You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ap...@apache.org on 2017/07/24 23:24:05 UTC

[4/4] phoenix git commit: PHOENIX-3997 UngroupedAggregateRegionObserver.commitBatchWithHTable() should not check the memstore size and wait for flush.

PHOENIX-3997 UngroupedAggregateRegionObserver.commitBatchWithHTable() should not check the memstore size and wait for flush.

Signed-off-by: Andrew Purtell <ap...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/318dd230
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/318dd230
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/318dd230

Branch: refs/heads/4.x-HBase-0.98
Commit: 318dd23031382200d92781fb11913464c9d442a6
Parents: e18c393
Author: Geoffrey Jacoby <ge...@careerbuilder.com>
Authored: Thu Jul 13 13:34:54 2017 -0700
Committer: Andrew Purtell <ap...@apache.org>
Committed: Mon Jul 24 15:56:33 2017 -0700

----------------------------------------------------------------------
 .../UngroupedAggregateRegionObserver.java       | 101 ++++++++++---------
 1 file changed, 54 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/318dd230/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 70ed5e5..1571822 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -200,21 +200,9 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
             InterRegionServerIndexRpcControllerFactory.class, RpcControllerFactory.class);
     }
 
-    private void commitBatch(HRegion region, List<Mutation> mutations, byte[] indexUUID, long blockingMemstoreSize,
-            byte[] indexMaintainersPtr, byte[] txState, boolean useIndexProto) throws IOException {
+    private void commitBatch(HRegion region, List<Mutation> mutations, long blockingMemstoreSize) throws IOException {
       if (mutations.isEmpty()) {
-    	  return;
-      }
-      for (Mutation m : mutations) {
-         if (indexMaintainersPtr != null) {
-             m.setAttribute(useIndexProto ? PhoenixIndexCodec.INDEX_PROTO_MD : PhoenixIndexCodec.INDEX_MD, indexMaintainersPtr);
-         }
-         if (indexUUID != null) {
-        	 m.setAttribute(PhoenixIndexCodec.INDEX_UUID, indexUUID);
-         }
-         if (txState != null) {
-             m.setAttribute(BaseScannerRegionObserver.TX_STATE, txState);
-         }
+          return;
       }
       Mutation[] mutationArray = new Mutation[mutations.size()];
       // When memstore size reaches blockingMemstoreSize we are waiting 3 seconds for the
@@ -233,11 +221,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
       region.batchMutate(mutations.toArray(mutationArray), HConstants.NO_NONCE, HConstants.NO_NONCE);
     }
     
-    private void commitBatchWithHTable(HTable table, HRegion region, List<Mutation> mutations, byte[] indexUUID,
-            long blockingMemstoreSize, byte[] indexMaintainersPtr, byte[] txState, boolean useIndexProto) throws IOException {
-    	if (mutations.isEmpty()) {
-      	  return;
-    	}
+    private void setIndexAndTransactionProperties(List<Mutation> mutations, byte[] indexUUID, byte[] indexMaintainersPtr, byte[] txState, boolean useIndexProto) {
         for (Mutation m : mutations) {
             if (indexMaintainersPtr != null) {
                 m.setAttribute(useIndexProto ? PhoenixIndexCodec.INDEX_PROTO_MD : PhoenixIndexCodec.INDEX_MD, indexMaintainersPtr);
@@ -246,19 +230,14 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                 m.setAttribute(BaseScannerRegionObserver.TX_STATE, txState);
             }
             if (indexUUID != null) {
-            	m.setAttribute(PhoenixIndexCodec.INDEX_UUID, indexUUID);
+                m.setAttribute(PhoenixIndexCodec.INDEX_UUID, indexUUID);
             }
         }
-        // When memstore size reaches blockingMemstoreSize we are waiting 3 seconds for the
-        // flush happen which decrease the memstore size and then writes allowed on the region.
-        for (int i = 0; region.getMemstoreSize().get() > blockingMemstoreSize && i < 30; i++) {
-            try {
-                checkForRegionClosing();
-                Thread.sleep(100);
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-                throw new IOException(e);
-            }
+    }
+
+    private void commitBatchWithHTable(HTable table, List<Mutation> mutations) throws IOException {
+        if (mutations.isEmpty()) {
+            return;
         }
         logger.debug("Committing batch of " + mutations.size() + " mutations for " + table);
         try {
@@ -403,7 +382,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
         byte[] deleteCF = null;
         byte[] emptyCF = null;
         HTable targetHTable = null;
-        boolean areMutationInSameRegion = true;
+        boolean isPKChanging = false;
         ImmutableBytesWritable ptr = new ImmutableBytesWritable();
         if (upsertSelectTable != null) {
             isUpsert = true;
@@ -411,9 +390,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
             targetHTable = new HTable(upsertSelectConfig, projectedTable.getPhysicalName().getBytes());
             selectExpressions = deserializeExpressions(scan.getAttribute(BaseScannerRegionObserver.UPSERT_SELECT_EXPRS));
             values = new byte[projectedTable.getPKColumns().size()][];
-            areMutationInSameRegion = Bytes.compareTo(targetHTable.getTableName(),
-                    region.getTableDesc().getTableName().getName()) == 0
-                    && !ExpressionUtil.isPkPositionChanging(new TableRef(projectedTable), selectExpressions);
+            isPKChanging = ExpressionUtil.isPkPositionChanging(new TableRef(projectedTable), selectExpressions);
             
         } else {
             byte[] isDeleteAgg = scan.getAttribute(BaseScannerRegionObserver.DELETE_AGG);
@@ -723,15 +700,15 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                             }
                         }
                         if (ServerUtil.readyToCommit(mutations.size(), mutations.byteSize(), maxBatchSize, maxBatchSizeBytes)) {
-                            commit(region, mutations, indexUUID, blockingMemStoreSize, indexMaintainersPtr, txState,
-                                    areMutationInSameRegion, targetHTable, useIndexProto);
+                            commit(region, mutations, indexUUID, blockingMemStoreSize, indexMaintainersPtr,
+                                txState, targetHTable, useIndexProto, isPKChanging);
                             mutations.clear();
                         }
                         // Commit in batches based on UPSERT_BATCH_SIZE_BYTES_ATTRIB in config
 
                         if (ServerUtil.readyToCommit(indexMutations.size(), indexMutations.byteSize(), maxBatchSize, maxBatchSizeBytes)) {
-                            commitBatch(region, indexMutations, null, blockingMemStoreSize, null, txState,
-                                    useIndexProto);
+                            setIndexAndTransactionProperties(indexMutations, indexUUID, indexMaintainersPtr, txState, useIndexProto);
+                            commitBatch(region, indexMutations, blockingMemStoreSize);
                             indexMutations.clear();
                         }
                         aggregators.aggregate(rowAggregators, result);
@@ -740,12 +717,12 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                 } while (hasMore);
                 if (!mutations.isEmpty()) {
                     commit(region, mutations, indexUUID, blockingMemStoreSize, indexMaintainersPtr, txState,
-                            areMutationInSameRegion, targetHTable, useIndexProto);
+                        targetHTable, useIndexProto, isPKChanging);
                     mutations.clear();
                 }
 
                 if (!indexMutations.isEmpty()) {
-                    commitBatch(region, indexMutations, null, blockingMemStoreSize, indexMaintainersPtr, txState, useIndexProto);
+                    commitBatch(region, indexMutations, blockingMemStoreSize);
                     indexMutations.clear();
                 }
             }
@@ -801,18 +778,48 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
 
     }
 
-    private void commit(HRegion region, List<Mutation> mutations, byte[] indexUUID, long blockingMemstoreSize,
-            byte[] indexMaintainersPtr, byte[] txState, boolean areMutationsForSameRegion, HTable hTable, boolean useIndexProto)
+    private void commit(HRegion region, List<Mutation> mutations, byte[] indexUUID, long blockingMemStoreSize,
+            byte[] indexMaintainersPtr, byte[] txState, HTable targetHTable, boolean useIndexProto,
+            boolean isPKChanging)
             throws IOException {
-        if (!areMutationsForSameRegion) {
-            assert hTable != null;// table cannot be null
-            commitBatchWithHTable(hTable, region, mutations, indexUUID, blockingMemstoreSize, indexMaintainersPtr,
-                    txState, useIndexProto);
+        List<Mutation> localRegionMutations = Lists.newArrayList();
+        List<Mutation> remoteRegionMutations = Lists.newArrayList();
+        setIndexAndTransactionProperties(mutations, indexUUID, indexMaintainersPtr, txState, useIndexProto);
+        separateLocalAndRemoteMutations(targetHTable, region, mutations, localRegionMutations, remoteRegionMutations,
+            isPKChanging);
+        commitBatch(region, localRegionMutations, blockingMemStoreSize);
+        commitBatchWithHTable(targetHTable, remoteRegionMutations);
+        localRegionMutations.clear();
+        remoteRegionMutations.clear();
+    }
+
+    private void separateLocalAndRemoteMutations(HTable targetHTable, HRegion region, List<Mutation> mutations,
+            List<Mutation> localRegionMutations, List<Mutation> remoteRegionMutations,
+            boolean isPKChanging){
+        boolean areMutationsInSameTable = areMutationsInSameTable(targetHTable, region);
+        //if we're writing to the same table, but the PK can change, that means that some
+        //mutations might be in our current region, and others in a different one.
+        if (areMutationsInSameTable && isPKChanging) {
+            HRegionInfo regionInfo = region.getRegionInfo();
+            for (Mutation mutation : mutations){
+                if (regionInfo.containsRow(mutation.getRow())){
+                    localRegionMutations.add(mutation);
+                } else {
+                    remoteRegionMutations.add(mutation);
+                }
+            }
+        } else if (areMutationsInSameTable && !isPKChanging) {
+            localRegionMutations.addAll(mutations);
         } else {
-            commitBatch(region, mutations, indexUUID, blockingMemstoreSize, indexMaintainersPtr, txState, useIndexProto);
+            remoteRegionMutations.addAll(mutations);
         }
     }
 
+    private boolean areMutationsInSameTable(HTable targetHTable, HRegion region) {
+        return (targetHTable == null || Bytes.compareTo(targetHTable.getTableName(),
+            region.getTableDesc().getTableName().getName()) == 0);
+    }
+
     @Override
     public InternalScanner preCompact(final ObserverContext<RegionCoprocessorEnvironment> c,
             final Store store, final InternalScanner scanner, final ScanType scanType)