You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by vi...@apache.org on 2019/02/25 19:49:57 UTC

[phoenix] branch 4.x-HBase-1.2 updated: PHOENIX-5137 check region close before committing a batch for index rebuild

This is an automated email from the ASF dual-hosted git repository.

vincentpoon pushed a commit to branch 4.x-HBase-1.2
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/4.x-HBase-1.2 by this push:
     new 1f493ff  PHOENIX-5137 check region close before committing a batch for index rebuild
1f493ff is described below

commit 1f493ff38b2b52d5d6a268c8ee2a1a1ef9d3103b
Author: Kiran Kumar Maturi <ma...@gmail.com>
AuthorDate: Fri Feb 22 09:45:13 2019 +0530

    PHOENIX-5137 check region close before committing a batch for index rebuild
---
 .../UngroupedAggregateRegionObserver.java          | 30 +++++++++++++---------
 1 file changed, 18 insertions(+), 12 deletions(-)

diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index cab13f1..5923a75 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -263,7 +263,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
           return;
       }
 
-        Mutation[] mutationArray = new Mutation[mutations.size()];
+       Mutation[] mutationArray = new Mutation[mutations.size()];
       // When memstore size reaches blockingMemstoreSize we are waiting 3 seconds for the
       // flush happen which decrease the memstore size and then writes allowed on the region.
       for (int i = 0; blockingMemstoreSize > 0 && region.getMemstoreSize() > blockingMemstoreSize && i < 30; i++) {
@@ -374,6 +374,17 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
             super.clear();
         }
     }
+
+   private long getBlockingMemstoreSize(Region region, Configuration conf) {
+       long flushSize = region.getTableDesc().getMemStoreFlushSize();
+
+       if (flushSize <= 0) {
+           flushSize = conf.getLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE,
+                   HTableDescriptor.DEFAULT_MEMSTORE_FLUSH_SIZE);
+       }
+       return flushSize * (conf.getLong(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER,
+                   HConstants.DEFAULT_HREGION_MEMSTORE_BLOCK_MULTIPLIER)-1);
+   }
     
     @Override
     protected RegionScanner doPostScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, final Scan scan, final RegionScanner s) throws IOException, SQLException {
@@ -499,12 +510,6 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
         MutationList mutations = new MutationList();
         boolean needToWrite = false;
         Configuration conf = env.getConfiguration();
-        long flushSize = region.getTableDesc().getMemStoreFlushSize();
-
-        if (flushSize <= 0) {
-            flushSize = conf.getLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE,
-                    HTableDescriptor.DEFAULT_MEMSTORE_FLUSH_SIZE);
-        }
 
         /**
          * Slow down the writes if the memstore size more than
@@ -512,9 +517,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
          * bytes. This avoids flush storm to hdfs for cases like index building where reads and
          * write happen to all the table regions in the server.
          */
-        final long blockingMemStoreSize = flushSize * (
-                conf.getLong(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER,
-                        HConstants.DEFAULT_HREGION_MEMSTORE_BLOCK_MULTIPLIER)-1) ;
+        final long blockingMemStoreSize = getBlockingMemstoreSize(region, conf) ;
 
         boolean buildLocalIndex = indexMaintainers != null && dataColumns==null && !localIndexScan;
         if(buildLocalIndex) {
@@ -1060,6 +1063,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
             int maxBatchSize = config.getInt(MUTATE_BATCH_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE);
             long maxBatchSizeBytes = config.getLong(MUTATE_BATCH_SIZE_BYTES_ATTRIB,
                 QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE_BYTES);
+            final long blockingMemstoreSize = getBlockingMemstoreSize(region, config);
             MutationList mutations = new MutationList(maxBatchSize);
             region.startRegionOperation();
             byte[] uuidValue = ServerCacheClient.generateId();
@@ -1101,7 +1105,8 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                             }
                         }
                         if (ServerUtil.readyToCommit(mutations.size(), mutations.byteSize(), maxBatchSize, maxBatchSizeBytes)) {
-                            commitBatchWithRetries(region, mutations, -1);
+                            checkForRegionClosingOrSplitting();
+                            commitBatchWithRetries(region, mutations, blockingMemstoreSize);
                             uuidValue = ServerCacheClient.generateId();
                             mutations.clear();
                         }
@@ -1110,7 +1115,8 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                     
                 } while (hasMore);
                 if (!mutations.isEmpty()) {
-                    commitBatchWithRetries(region, mutations, -1);
+                    checkForRegionClosingOrSplitting();
+                    commitBatchWithRetries(region, mutations, blockingMemstoreSize);
                 }
             }
         } catch (IOException e) {