Posted to issues@phoenix.apache.org by GitBox <gi...@apache.org> on 2020/06/01 17:40:09 UTC

[GitHub] [phoenix] gjacoby126 commented on a change in pull request #794: PHOENIX-5928 Index rebuilds without replaying data table mutations

gjacoby126 commented on a change in pull request #794:
URL: https://github.com/apache/phoenix/pull/794#discussion_r433371850



##########
File path: phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java
##########
@@ -82,16 +94,30 @@
     protected long maxLookBackInMills;
     protected IndexToolVerificationResult verificationResult;
     protected IndexVerificationResultRepository verificationResultRepository;
+    protected UngroupedAggregateRegionObserver ungroupedAggregateRegionObserver;

Review comment:
       This superclass does not appear to use the ungroupedAggregateRegionObserver pointer, so the field should live in the subclasses that need it. (Also, we shouldn't be passing coprocessors around; see the comments below.)
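A minimal sketch of that split. All names here are hypothetical (`MutationSink`, `BaseIndexScanner`, `RebuildScanner`, `VerifyOnlyScanner`); none of them are Phoenix classes. The base class keeps only configuration that every scanner shares, and only the subclass that writes holds a writer-like dependency:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical abstraction for "somewhere to send rebuilt mutations".
interface MutationSink {
    void write(List<String> mutations);
}

// The shared base keeps only configuration every scanner uses.
abstract class BaseIndexScanner {
    protected final int pageSizeInRows;

    BaseIndexScanner(int pageSizeInRows) {
        this.pageSizeInRows = pageSizeInRows;
    }

    // Processes one page of rows and returns how many rows were handled.
    abstract int process(List<String> rows);
}

// Only the rebuild path writes, so only it owns a sink.
final class RebuildScanner extends BaseIndexScanner {
    private final MutationSink sink;

    RebuildScanner(int pageSizeInRows, MutationSink sink) {
        super(pageSizeInRows);
        this.sink = sink;
    }

    @Override
    int process(List<String> rows) {
        List<String> page = rows.subList(0, Math.min(rows.size(), pageSizeInRows));
        sink.write(new ArrayList<>(page));
        return page.size();
    }
}

// The verify-only path never writes, so it has no sink field at all.
final class VerifyOnlyScanner extends BaseIndexScanner {
    VerifyOnlyScanner(int pageSizeInRows) {
        super(pageSizeInRows);
    }

    @Override
    int process(List<String> rows) {
        return Math.min(rows.size(), pageSizeInRows);
    }
}
```

With this shape, a verify-only scanner cannot accidentally commit, because it has nothing to commit through.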

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexerRegionScanner.java
##########
@@ -305,42 +320,96 @@ private void verifyIndex() throws IOException {
         verificationResult.add(nextVerificationResult);
     }
 
+    private void setMutationAttributes(Mutation m, byte[] uuidValue) {
+        m.setAttribute(useProto ? PhoenixIndexCodec.INDEX_PROTO_MD : PhoenixIndexCodec.INDEX_MD, indexMetaData);
+        m.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
+        m.setAttribute(BaseScannerRegionObserver.REPLAY_WRITES,
+                BaseScannerRegionObserver.REPLAY_INDEX_REBUILD_WRITES);
+        m.setAttribute(BaseScannerRegionObserver.CLIENT_VERSION, clientVersionBytes);
+        // Since we're replaying existing mutations, it makes no sense to write them to the wal
+        m.setDurability(Durability.SKIP_WAL);
+    }
+
+    private byte[] commitIfReady(byte[] uuidValue, UngroupedAggregateRegionObserver.MutationList mutationList) throws IOException {
+        if (ServerUtil.readyToCommit(mutationList.size(), mutationList.byteSize(), maxBatchSize, maxBatchSizeBytes)) {
+            ungroupedAggregateRegionObserver.checkForRegionClosing();

Review comment:
       Should this also go in a util class along with the getBlockingMemstoreSize method?
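Because the call site invokes `checkForRegionClosing()` on the observer instance, the check presumably reads state the observer owns, so a purely static utility would need that state passed in. A minimal sketch under that assumption: a hypothetical `RegionClosingGuard` that the observer would flip from its close/split hooks and hand to scanners. The class, its method names, and the exception message are illustrative only:

```java
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical holder for "is this region closing or splitting?" state,
// shared between the coprocessor and the scanners it creates.
final class RegionClosingGuard {
    private final AtomicBoolean closingOrSplitting = new AtomicBoolean(false);

    // Would be called from the coprocessor's pre-close / pre-split hooks (assumed).
    void markClosingOrSplitting() {
        closingOrSplitting.set(true);
    }

    // Called by scanners before each batch commit; fails fast once the region is going away.
    void checkForRegionClosing() throws IOException {
        if (closingOrSplitting.get()) {
            throw new IOException("Region is closing or splitting; aborting batch commit");
        }
    }
}
```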

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java
##########
@@ -82,16 +94,30 @@
     protected long maxLookBackInMills;
     protected IndexToolVerificationResult verificationResult;
     protected IndexVerificationResultRepository verificationResultRepository;
+    protected UngroupedAggregateRegionObserver ungroupedAggregateRegionObserver;
+    protected Map<byte[], NavigableSet<byte[]>> familyMap;
+    protected IndexTool.IndexVerifyType verifyType = IndexTool.IndexVerifyType.NONE;
+    protected boolean verify = false;
 
     public GlobalIndexRegionScanner(RegionScanner innerScanner, final Region region, final Scan scan,
-            final RegionCoprocessorEnvironment env) throws IOException {
+                                    final RegionCoprocessorEnvironment env,
+                                    UngroupedAggregateRegionObserver ungroupedAggregateRegionObserver) throws IOException {
         super(innerScanner);
         final Configuration config = env.getConfiguration();
         if (scan.getAttribute(BaseScannerRegionObserver.INDEX_REBUILD_PAGING) != null) {
             pageSizeInRows = config.getLong(INDEX_REBUILD_PAGE_SIZE_IN_ROWS,
                     QueryServicesOptions.DEFAULT_INDEX_REBUILD_PAGE_SIZE_IN_ROWS);
         }
         maxBatchSize = config.getInt(MUTATE_BATCH_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE);
+        maxBatchSizeBytes = config.getLong(MUTATE_BATCH_SIZE_BYTES_ATTRIB,
+                QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE_BYTES);
+        blockingMemstoreSize = UngroupedAggregateRegionObserver.getBlockingMemstoreSize(region, config);

Review comment:
       If this method is needed outside UngroupedAggregateRegionObserver (UARO), perhaps it should live in a static utility class.
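A minimal sketch of such a utility, assuming a hypothetical `IndexRebuildUtil` class. To stay self-contained it takes plain `long` values instead of a `Region` and `Configuration`. The formula (fall back to a default flush size when the table sets none, then multiply by the memstore block multiplier, which is where HBase starts blocking updates) is an assumption and may not match Phoenix's actual implementation:

```java
// Hypothetical static utility; the class name, signature, and formula are illustrative.
final class IndexRebuildUtil {
    private IndexRebuildUtil() {
        // static helpers only
    }

    /**
     * Returns the memstore size at which batch commits should back off.
     * Assumption: use the table's flush size if positive, otherwise the default,
     * and treat flushSize * blockMultiplier as the blocking threshold.
     */
    static long getBlockingMemstoreSize(long tableFlushSize, long defaultFlushSize, long blockMultiplier) {
        long flushSize = tableFlushSize > 0 ? tableFlushSize : defaultFlushSize;
        return flushSize * blockMultiplier;
    }
}
```

Keeping the helper free of coprocessor state means both UARO and the index scanners can call it without holding a reference to each other.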

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexerRegionScanner.java
##########
@@ -305,42 +320,96 @@ private void verifyIndex() throws IOException {
         verificationResult.add(nextVerificationResult);
     }
 
+    private void setMutationAttributes(Mutation m, byte[] uuidValue) {
+        m.setAttribute(useProto ? PhoenixIndexCodec.INDEX_PROTO_MD : PhoenixIndexCodec.INDEX_MD, indexMetaData);
+        m.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
+        m.setAttribute(BaseScannerRegionObserver.REPLAY_WRITES,
+                BaseScannerRegionObserver.REPLAY_INDEX_REBUILD_WRITES);
+        m.setAttribute(BaseScannerRegionObserver.CLIENT_VERSION, clientVersionBytes);
+        // Since we're replaying existing mutations, it makes no sense to write them to the wal
+        m.setDurability(Durability.SKIP_WAL);
+    }
+
+    private byte[] commitIfReady(byte[] uuidValue, UngroupedAggregateRegionObserver.MutationList mutationList) throws IOException {
+        if (ServerUtil.readyToCommit(mutationList.size(), mutationList.byteSize(), maxBatchSize, maxBatchSizeBytes)) {
+            ungroupedAggregateRegionObserver.checkForRegionClosing();
+            ungroupedAggregateRegionObserver.commitBatchWithRetries(region, mutationList, blockingMemstoreSize);
+            uuidValue = ServerCacheClient.generateId();
+            mutationList.clear();
+        }
+        return uuidValue;
+    }
+
     @Override
     public boolean next(List<Cell> results) throws IOException {
         Cell lastCell = null;
         int rowCount = 0;
         region.startRegionOperation();
         try {
             synchronized (innerScanner) {
+                byte[] uuidValue = ServerCacheClient.generateId();
                 do {
                     List<Cell> row = new ArrayList<>();
                     hasMore = innerScanner.nextRaw(row);
                     if (!row.isEmpty()) {
-                        lastCell = row.get(0);
+                        lastCell = row.get(0); // lastCell is any cell from the last visited row
                         Put put = null;
+                        Delete del = null;
                         for (Cell cell : row) {
                             if (KeyValue.Type.codeToType(cell.getTypeByte()) == KeyValue.Type.Put) {
+                                if (!partialRebuild && familyMap != null && !isColumnIncluded(cell)) {
+                                    continue;
+                                }
                                 if (put == null) {
                                     put = new Put(CellUtil.cloneRow(cell));
                                 }
                                 put.add(cell);
                             } else {
-                                throw new DoNotRetryIOException("Scan without raw found a deleted cell");
+                                if (del == null) {
+                                    del = new Delete(CellUtil.cloneRow(cell));
+                                }
+                                del.addDeleteMarker(cell);
+                            }
+                        }
+                        if (put == null && del == null) {
+                            continue;
+                        }
+                        if (!verify) {
+                            if (put != null) {
+                                setMutationAttributes(put, uuidValue);
+                                mutations.add(put);
                             }
+                            if (del != null) {
+                                setMutationAttributes(del, uuidValue);
+                                mutations.add(del);
+                            }
+                            uuidValue = commitIfReady(uuidValue, mutations);
+                        } else {
+                            indexKeyToDataPutMap
+                                    .put(getIndexRowKey(indexMaintainer, put), put);
                         }
                         rowCount++;
-                        indexKeyToDataPutMap
-                                .put(getIndexRowKey(indexMaintainer, put), put);
+
                     }
                 } while (hasMore && rowCount < pageSizeInRows);
-                verifyIndex();
+                if (verify) {
+                    verifyIndex();
+                } else if (!mutations.isEmpty()) {
+                    ungroupedAggregateRegionObserver.checkForRegionClosing();

Review comment:
       I'm a bit confused: I thought IndexerRegionScanner was only used for old-style indexes during ONLY-mode index verification, so that the old indexes get the new, correct verification features. Why are we committing mutations here?
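To pin the question down, here is a hypothetical mapping from verify mode to behavior. The constant names follow `IndexTool.IndexVerifyType`, but `VerifyMode`, `rebuilds()`, and `verifies()` are illustrative, and the semantics (ONLY verifies without rebuilding, NONE rebuilds without verifying) are assumptions:

```java
// Hypothetical mirror of the verify modes; names follow IndexTool.IndexVerifyType,
// but the behavior encoded here is an assumption for illustration.
enum VerifyMode {
    NONE, BEFORE, AFTER, BOTH, ONLY;

    // Every mode except ONLY rewrites index rows, i.e. commits mutations.
    boolean rebuilds() {
        return this != ONLY;
    }

    // Every mode except NONE compares data rows against index rows.
    boolean verifies() {
        return this != NONE;
    }
}
```

Under these assumptions, a scanner used only in ONLY mode would never need a commit path.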

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexerRegionScanner.java
##########
@@ -305,42 +320,96 @@ private void verifyIndex() throws IOException {
         verificationResult.add(nextVerificationResult);
     }
 
+    private void setMutationAttributes(Mutation m, byte[] uuidValue) {
+        m.setAttribute(useProto ? PhoenixIndexCodec.INDEX_PROTO_MD : PhoenixIndexCodec.INDEX_MD, indexMetaData);
+        m.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
+        m.setAttribute(BaseScannerRegionObserver.REPLAY_WRITES,
+                BaseScannerRegionObserver.REPLAY_INDEX_REBUILD_WRITES);
+        m.setAttribute(BaseScannerRegionObserver.CLIENT_VERSION, clientVersionBytes);
+        // Since we're replaying existing mutations, it makes no sense to write them to the wal
+        m.setDurability(Durability.SKIP_WAL);
+    }
+
+    private byte[] commitIfReady(byte[] uuidValue, UngroupedAggregateRegionObserver.MutationList mutationList) throws IOException {
+        if (ServerUtil.readyToCommit(mutationList.size(), mutationList.byteSize(), maxBatchSize, maxBatchSizeBytes)) {
+            ungroupedAggregateRegionObserver.checkForRegionClosing();
+            ungroupedAggregateRegionObserver.commitBatchWithRetries(region, mutationList, blockingMemstoreSize);

Review comment:
       It seems like it would be fairly straightforward to factor the UARO.commitWith* functions out into a dedicated Committer class that takes a Region in its constructor and can be called from either UARO or here, avoiding the need to pass the UARO itself around. That is more in keeping with the Single Responsibility Principle.
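A minimal sketch of such a Committer, assuming a hypothetical `BatchTarget` interface in place of the HBase `Region` and plain strings in place of `Mutation`s. The row-count/byte thresholds mirror the role `ServerUtil.readyToCommit` plays in the diff, and the fixed-attempt retry loop stands in for `commitBatchWithRetries`; the exact semantics of both are assumptions:

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the Region the real committer would wrap.
interface BatchTarget {
    void applyBatch(List<String> mutations) throws IOException;
}

// Hypothetical committer: owns batching thresholds and retries, so neither
// the coprocessor nor the scanner has to.
final class Committer {
    private final BatchTarget region;
    private final int maxBatchSize;
    private final long maxBatchSizeBytes;
    private final int maxAttempts;
    private final List<String> pending = new ArrayList<>();
    private long pendingBytes = 0;

    Committer(BatchTarget region, int maxBatchSize, long maxBatchSizeBytes, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        this.region = region;
        this.maxBatchSize = maxBatchSize;
        this.maxBatchSizeBytes = maxBatchSizeBytes;
        this.maxAttempts = maxAttempts;
    }

    // Buffers one mutation and commits once either threshold is reached.
    // Returns true if this call triggered a commit.
    boolean addAndCommitIfReady(String mutation, long sizeBytes) throws IOException {
        pending.add(mutation);
        pendingBytes += sizeBytes;
        if (pending.size() >= maxBatchSize || pendingBytes >= maxBatchSizeBytes) {
            flush();
            return true;
        }
        return false;
    }

    // Commits whatever is buffered, retrying up to maxAttempts times.
    void flush() throws IOException {
        if (pending.isEmpty()) {
            return;
        }
        IOException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                region.applyBatch(new ArrayList<>(pending));
                pending.clear();
                pendingBytes = 0;
                return;
            } catch (IOException e) {
                last = e;
            }
        }
        throw last;
    }
}
```

With this in place, the scanner would hold a `Committer` for its region rather than a reference to the observer.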
   
   Over time, we should ideally whittle UARO down to just a dispatcher that catches Scan events and calls the appropriate objects to serve them based on the Scan attribute. That's out of scope here, but we can make a good start.
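A rough sketch of that dispatcher shape, using a hypothetical `ScanDispatcher` that looks for a marker attribute on the scan (modeled here as a `Map<String, byte[]>`) and delegates to the registered `ScanHandler`. Any attribute names used at registration are placeholders, not Phoenix's real scan attributes:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical handler for one kind of server-side scan.
interface ScanHandler {
    String handle(Map<String, byte[]> scanAttributes);
}

// Hypothetical dispatcher: the observer would only choose a handler based on
// which marker attribute the scan carries, then delegate all the work.
final class ScanDispatcher {
    // LinkedHashMap keeps registration order, so earlier registrations win ties.
    private final Map<String, ScanHandler> handlersByAttribute = new LinkedHashMap<>();

    ScanDispatcher register(String attribute, ScanHandler handler) {
        handlersByAttribute.put(attribute, handler);
        return this;
    }

    // Returns the first registered handler whose attribute is present on the scan.
    Optional<ScanHandler> route(Map<String, byte[]> scanAttributes) {
        for (Map.Entry<String, ScanHandler> entry : handlersByAttribute.entrySet()) {
            if (scanAttributes.containsKey(entry.getKey())) {
                return Optional.of(entry.getValue());
            }
        }
        return Optional.empty();
    }
}
```

A scan with no recognized marker falls through to `Optional.empty()`, which the observer could treat as "let the normal scan path handle it".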

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationResultRepository.java
##########
@@ -83,7 +83,7 @@
             "BeforeRebuildUnknownIndexRowCount";
     public final static byte[] BEFORE_REBUILD_UNKNOWN_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(BEFORE_REBUILD_UNKNOWN_INDEX_ROW_COUNT);
     public final static String AFTER_REBUILD_VALID_INDEX_ROW_COUNT =
-        "AfterValidExpiredIndexRowCount";
+        "AfterRebuildValidIndexRowCount";

Review comment:
       Oops, thanks for the fix. 

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
##########
@@ -241,6 +243,16 @@ public void start(CoprocessorEnvironment e) throws IOException {
             e.getConfiguration().getInt(QueryServices.INDEX_REBUILD_RPC_RETRIES_COUNTER,
                 QueryServicesOptions.DEFAULT_INDEX_REBUILD_RPC_RETRIES_COUNTER));
         indexWriteProps = new ReadOnlyProps(indexWriteConfig.iterator());
+        final RegionCoprocessorEnvironment env = (RegionCoprocessorEnvironment) e;
+        String serverName = env.getRegionServerServices().getServerName().getServerName();
+        DelegateRegionCoprocessorEnvironment indexWriterEnv = new
+                DelegateRegionCoprocessorEnvironment(env, ConnectionType.INDEX_WRITER_CONNECTION);
+        // setup the global index writer
+        this.globalIndexRebuildWriter = new IndexWriter(indexWriterEnv, serverName + "-globalIndexRebuildWriter", false);
+    }
+
+    public IndexWriter getGlobalIndexRebuildWriter() {

Review comment:
       If other classes need access to this writer, it's better to pass it to them directly rather than pass a UARO and make them call this getter.
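A minimal sketch of the suggested wiring, with hypothetical `RebuildWriter`, `RebuildingObserver`, and `RebuildingRegionScanner` types standing in for `IndexWriter`, UARO, and the scanner. The `start(RebuildWriter)` signature is a simplification for illustration, not the coprocessor's real `start(CoprocessorEnvironment)`:

```java
import java.util.List;

// Hypothetical stand-in for the global index rebuild writer.
interface RebuildWriter {
    void write(List<String> indexMutations);
}

// The scanner depends on the writer itself, not on the coprocessor that owns it.
final class RebuildingRegionScanner {
    private final RebuildWriter writer;

    RebuildingRegionScanner(RebuildWriter writer) {
        this.writer = writer;
    }

    void rebuild(List<String> indexMutations) {
        writer.write(indexMutations);
    }
}

// The coprocessor builds the writer once and hands it to each scanner it creates,
// so no public getter on the coprocessor is needed.
final class RebuildingObserver {
    private RebuildWriter globalIndexRebuildWriter;

    void start(RebuildWriter writer) {
        this.globalIndexRebuildWriter = writer;
    }

    RebuildingRegionScanner newScanner() {
        return new RebuildingRegionScanner(globalIndexRebuildWriter);
    }
}
```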

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexerRegionScanner.java
##########
@@ -305,42 +320,96 @@ private void verifyIndex() throws IOException {
         verificationResult.add(nextVerificationResult);
     }
 
+    private void setMutationAttributes(Mutation m, byte[] uuidValue) {
+        m.setAttribute(useProto ? PhoenixIndexCodec.INDEX_PROTO_MD : PhoenixIndexCodec.INDEX_MD, indexMetaData);
+        m.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
+        m.setAttribute(BaseScannerRegionObserver.REPLAY_WRITES,
+                BaseScannerRegionObserver.REPLAY_INDEX_REBUILD_WRITES);
+        m.setAttribute(BaseScannerRegionObserver.CLIENT_VERSION, clientVersionBytes);
+        // Since we're replaying existing mutations, it makes no sense to write them to the wal
+        m.setDurability(Durability.SKIP_WAL);
+    }
+
+    private byte[] commitIfReady(byte[] uuidValue, UngroupedAggregateRegionObserver.MutationList mutationList) throws IOException {
+        if (ServerUtil.readyToCommit(mutationList.size(), mutationList.byteSize(), maxBatchSize, maxBatchSizeBytes)) {
+            ungroupedAggregateRegionObserver.checkForRegionClosing();
+            ungroupedAggregateRegionObserver.commitBatchWithRetries(region, mutationList, blockingMemstoreSize);
+            uuidValue = ServerCacheClient.generateId();
+            mutationList.clear();
+        }
+        return uuidValue;
+    }
+
     @Override
     public boolean next(List<Cell> results) throws IOException {
         Cell lastCell = null;
         int rowCount = 0;
         region.startRegionOperation();
         try {
             synchronized (innerScanner) {
+                byte[] uuidValue = ServerCacheClient.generateId();
                 do {
                     List<Cell> row = new ArrayList<>();
                     hasMore = innerScanner.nextRaw(row);
                     if (!row.isEmpty()) {
-                        lastCell = row.get(0);
+                        lastCell = row.get(0); // lastCell is any cell from the last visited row
                         Put put = null;
+                        Delete del = null;
                         for (Cell cell : row) {
                             if (KeyValue.Type.codeToType(cell.getTypeByte()) == KeyValue.Type.Put) {
+                                if (!partialRebuild && familyMap != null && !isColumnIncluded(cell)) {
+                                    continue;
+                                }
                                 if (put == null) {
                                     put = new Put(CellUtil.cloneRow(cell));
                                 }
                                 put.add(cell);
                             } else {
-                                throw new DoNotRetryIOException("Scan without raw found a deleted cell");
+                                if (del == null) {
+                                    del = new Delete(CellUtil.cloneRow(cell));
+                                }
+                                del.addDeleteMarker(cell);
+                            }
+                        }
+                        if (put == null && del == null) {
+                            continue;
+                        }
+                        if (!verify) {
+                            if (put != null) {
+                                setMutationAttributes(put, uuidValue);
+                                mutations.add(put);
                             }
+                            if (del != null) {
+                                setMutationAttributes(del, uuidValue);
+                                mutations.add(del);
+                            }
+                            uuidValue = commitIfReady(uuidValue, mutations);
+                        } else {
+                            indexKeyToDataPutMap
+                                    .put(getIndexRowKey(indexMaintainer, put), put);
                         }
                         rowCount++;
-                        indexKeyToDataPutMap
-                                .put(getIndexRowKey(indexMaintainer, put), put);
+
                     }
                 } while (hasMore && rowCount < pageSizeInRows);
-                verifyIndex();
+                if (verify) {
+                    verifyIndex();
+                } else if (!mutations.isEmpty()) {
+                    ungroupedAggregateRegionObserver.checkForRegionClosing();

Review comment:
       OK, I see below that we're now using IndexerRegionScanner more broadly. Is this necessary? I'm just curious whether it's worth doing a lot of refactoring, and the testing that requires, on an index framework we know we want to deprecate soon. If it's actually harder to avoid the refactoring while touching UARO and the new indexing classes, that's fine.



