You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ap...@apache.org on 2017/07/24 23:24:04 UTC

[3/4] phoenix git commit: PHOENIX-3997 UngroupedAggregateRegionObserver.commitBatchWithHTable() should not check the memstore size and wait for flush.

PHOENIX-3997 UngroupedAggregateRegionObserver.commitBatchWithHTable() should not check the memstore size and wait for flush.

Signed-off-by: Andrew Purtell <ap...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/e9498bf4
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/e9498bf4
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/e9498bf4

Branch: refs/heads/4.x-HBase-1.1
Commit: e9498bf4704d438969e67557c7d45b3a76c65458
Parents: bd11d86
Author: Geoffrey Jacoby <ge...@careerbuilder.com>
Authored: Thu Jul 13 13:34:54 2017 -0700
Committer: Andrew Purtell <ap...@apache.org>
Committed: Mon Jul 24 15:42:20 2017 -0700

----------------------------------------------------------------------
 .../UngroupedAggregateRegionObserver.java       | 127 ++++++++++---------
 1 file changed, 67 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/e9498bf4/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index a949058..a07b5d0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -200,24 +200,12 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
             InterRegionServerIndexRpcControllerFactory.class, RpcControllerFactory.class);
     }
 
-    private void commitBatch(Region region, List<Mutation> mutations, byte[] indexUUID, long blockingMemstoreSize,
-            byte[] indexMaintainersPtr, byte[] txState, boolean useIndexProto) throws IOException {
+    private void commitBatch(Region region, List<Mutation> mutations, long blockingMemstoreSize) throws IOException {
       if (mutations.isEmpty()) {
-    	  return;
+          return;
       }
-      for (Mutation m : mutations) {
-         if (indexMaintainersPtr != null) {
-             m.setAttribute(useIndexProto ? PhoenixIndexCodec.INDEX_PROTO_MD : PhoenixIndexCodec.INDEX_MD, indexMaintainersPtr);
-         }
-         if (indexUUID != null) {
-        	 m.setAttribute(PhoenixIndexCodec.INDEX_UUID, indexUUID);
-         }
-         if (txState != null) {
-             m.setAttribute(BaseScannerRegionObserver.TX_STATE, txState);
-         }
-      }
-      
-      Mutation[] mutationArray = new Mutation[mutations.size()];
+
+        Mutation[] mutationArray = new Mutation[mutations.size()];
       // When memstore size reaches blockingMemstoreSize we are waiting 3 seconds for the
       // flush happen which decrease the memstore size and then writes allowed on the region.
       for (int i = 0; region.getMemstoreSize() > blockingMemstoreSize && i < 30; i++) {
@@ -233,34 +221,26 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
       logger.debug("Committing bactch of " + mutations.size() + " mutations for " + region.getRegionInfo().getTable().getNameAsString());
       region.batchMutate(mutations.toArray(mutationArray), HConstants.NO_NONCE, HConstants.NO_NONCE);
     }
-    
-    private void commitBatchWithHTable(HTable table, Region region, List<Mutation> mutations, byte[] indexUUID,
-            long blockingMemstoreSize, byte[] indexMaintainersPtr, byte[] txState, boolean useIndexProto) throws IOException {
-    	if (mutations.isEmpty()) {
-      	  return;
-    	}
+
+    private void setIndexAndTransactionProperties(List<Mutation> mutations, byte[] indexUUID, byte[] indexMaintainersPtr, byte[] txState, boolean useIndexProto) {
         for (Mutation m : mutations) {
-            if (indexMaintainersPtr != null) {
-                m.setAttribute(useIndexProto ? PhoenixIndexCodec.INDEX_PROTO_MD : PhoenixIndexCodec.INDEX_MD, indexMaintainersPtr);
-            }
-            if (txState != null) {
-                m.setAttribute(BaseScannerRegionObserver.TX_STATE, txState);
-            }
-            if (indexUUID != null) {
-            	m.setAttribute(PhoenixIndexCodec.INDEX_UUID, indexUUID);
-            }
-        }
-        // When memstore size reaches blockingMemstoreSize we are waiting 3 seconds for the
-        // flush happen which decrease the memstore size and then writes allowed on the region.
-        for (int i = 0; region.getMemstoreSize() > blockingMemstoreSize && i < 30; i++) {
-            try {
-                checkForRegionClosing();
-                Thread.sleep(100);
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-                throw new IOException(e);
-            }
+           if (indexMaintainersPtr != null) {
+               m.setAttribute(useIndexProto ? PhoenixIndexCodec.INDEX_PROTO_MD : PhoenixIndexCodec.INDEX_MD, indexMaintainersPtr);
+           }
+           if (indexUUID != null) {
+             m.setAttribute(PhoenixIndexCodec.INDEX_UUID, indexUUID);
+           }
+           if (txState != null) {
+               m.setAttribute(BaseScannerRegionObserver.TX_STATE, txState);
+           }
         }
+    }
+
+    private void commitBatchWithHTable(HTable table, List<Mutation> mutations) throws IOException {
+      if (mutations.isEmpty()) {
+          return;
+      }
+
         logger.debug("Committing batch of " + mutations.size() + " mutations for " + table);
         try {
             table.batch(mutations);
@@ -404,7 +384,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
         byte[] deleteCF = null;
         byte[] emptyCF = null;
         HTable targetHTable = null;
-        boolean areMutationInSameRegion = true;
+        boolean isPKChanging = false;
         ImmutableBytesWritable ptr = new ImmutableBytesWritable();
         if (upsertSelectTable != null) {
             isUpsert = true;
@@ -412,10 +392,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
             targetHTable = new HTable(upsertSelectConfig, projectedTable.getPhysicalName().getBytes());
             selectExpressions = deserializeExpressions(scan.getAttribute(BaseScannerRegionObserver.UPSERT_SELECT_EXPRS));
             values = new byte[projectedTable.getPKColumns().size()][];
-            areMutationInSameRegion = Bytes.compareTo(targetHTable.getTableName(),
-                    region.getTableDesc().getTableName().getName()) == 0
-                    && !ExpressionUtil.isPkPositionChanging(new TableRef(projectedTable), selectExpressions);
-            
+            isPKChanging = ExpressionUtil.isPkPositionChanging(new TableRef(projectedTable), selectExpressions);
         } else {
             byte[] isDeleteAgg = scan.getAttribute(BaseScannerRegionObserver.DELETE_AGG);
             isDelete = isDeleteAgg != null && Bytes.compareTo(PDataType.TRUE_BYTES, isDeleteAgg) == 0;
@@ -724,15 +701,15 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                             }
                         }
                         if (ServerUtil.readyToCommit(mutations.size(), mutations.byteSize(), maxBatchSize, maxBatchSizeBytes)) {
-                            commit(region, mutations, indexUUID, blockingMemStoreSize, indexMaintainersPtr, txState,
-                                    areMutationInSameRegion, targetHTable, useIndexProto);
+                            commit(region, mutations, indexUUID, blockingMemStoreSize, indexMaintainersPtr,
+                                txState, targetHTable, useIndexProto, isPKChanging);
                             mutations.clear();
                         }
                         // Commit in batches based on UPSERT_BATCH_SIZE_BYTES_ATTRIB in config
 
                         if (ServerUtil.readyToCommit(indexMutations.size(), indexMutations.byteSize(), maxBatchSize, maxBatchSizeBytes)) {
-                            commitBatch(region, indexMutations, null, blockingMemStoreSize, null, txState,
-                                    useIndexProto);
+                            setIndexAndTransactionProperties(indexMutations, indexUUID, indexMaintainersPtr, txState, useIndexProto);
+                            commitBatch(region, indexMutations, blockingMemStoreSize);
                             indexMutations.clear();
                         }
                         aggregators.aggregate(rowAggregators, result);
@@ -741,12 +718,12 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                 } while (hasMore);
                 if (!mutations.isEmpty()) {
                     commit(region, mutations, indexUUID, blockingMemStoreSize, indexMaintainersPtr, txState,
-                            areMutationInSameRegion, targetHTable, useIndexProto);
+                        targetHTable, useIndexProto, isPKChanging);
                     mutations.clear();
                 }
 
                 if (!indexMutations.isEmpty()) {
-                    commitBatch(region, indexMutations, null, blockingMemStoreSize, indexMaintainersPtr, txState, useIndexProto);
+                    commitBatch(region, indexMutations, blockingMemStoreSize);
                     indexMutations.clear();
                 }
             }
@@ -802,18 +779,48 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
 
     }
 
-    private void commit(Region region, List<Mutation> mutations, byte[] indexUUID, long blockingMemstoreSize,
-            byte[] indexMaintainersPtr, byte[] txState, boolean areMutationsForSameRegion, HTable hTable, boolean useIndexProto)
+    private void commit(Region region, List<Mutation> mutations, byte[] indexUUID, long blockingMemStoreSize,
+            byte[] indexMaintainersPtr, byte[] txState, HTable targetHTable, boolean useIndexProto,
+                        boolean isPKChanging)
             throws IOException {
-        if (!areMutationsForSameRegion) {
-            assert hTable != null;// table cannot be null
-            commitBatchWithHTable(hTable, region, mutations, indexUUID, blockingMemstoreSize, indexMaintainersPtr,
-                    txState, useIndexProto);
+        List<Mutation> localRegionMutations = Lists.newArrayList();
+        List<Mutation> remoteRegionMutations = Lists.newArrayList();
+        setIndexAndTransactionProperties(mutations, indexUUID, indexMaintainersPtr, txState, useIndexProto);
+        separateLocalAndRemoteMutations(targetHTable, region, mutations, localRegionMutations, remoteRegionMutations,
+            isPKChanging);
+        commitBatch(region, localRegionMutations, blockingMemStoreSize);
+        commitBatchWithHTable(targetHTable, remoteRegionMutations);
+        localRegionMutations.clear();
+        remoteRegionMutations.clear();
+    }
+
+    private void separateLocalAndRemoteMutations(HTable targetHTable, Region region, List<Mutation> mutations,
+                                                 List<Mutation> localRegionMutations, List<Mutation> remoteRegionMutations,
+                                                 boolean isPKChanging){
+        boolean areMutationsInSameTable = areMutationsInSameTable(targetHTable, region);
+        //if we're writing to the same table, but the PK can change, that means that some
+        //mutations might be in our current region, and others in a different one.
+        if (areMutationsInSameTable && isPKChanging) {
+            HRegionInfo regionInfo = region.getRegionInfo();
+            for (Mutation mutation : mutations){
+                if (regionInfo.containsRow(mutation.getRow())){
+                    localRegionMutations.add(mutation);
+                } else {
+                    remoteRegionMutations.add(mutation);
+                }
+            }
+        } else if (areMutationsInSameTable && !isPKChanging) {
+            localRegionMutations.addAll(mutations);
         } else {
-            commitBatch(region, mutations, indexUUID, blockingMemstoreSize, indexMaintainersPtr, txState, useIndexProto);
+            remoteRegionMutations.addAll(mutations);
         }
     }
 
+    private boolean areMutationsInSameTable(HTable targetHTable, Region region) {
+        return (targetHTable == null || Bytes.compareTo(targetHTable.getTableName(),
+                region.getTableDesc().getTableName().getName()) == 0);
+    }
+
     @Override
     public InternalScanner preCompact(final ObserverContext<RegionCoprocessorEnvironment> c, final Store store,
             final InternalScanner scanner, final ScanType scanType) throws IOException {