You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by vi...@apache.org on 2019/02/25 19:49:57 UTC
[phoenix] branch 4.x-HBase-1.2 updated: PHOENIX-5137 check region
close before commiting a batch for index rebuild
This is an automated email from the ASF dual-hosted git repository.
vincentpoon pushed a commit to branch 4.x-HBase-1.2
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.x-HBase-1.2 by this push:
new 1f493ff PHOENIX-5137 check region close before commiting a batch for index rebuild
1f493ff is described below
commit 1f493ff38b2b52d5d6a268c8ee2a1a1ef9d3103b
Author: Kiran Kumar Maturi <ma...@gmail.com>
AuthorDate: Fri Feb 22 09:45:13 2019 +0530
PHOENIX-5137 check region close before commiting a batch for index rebuild
---
.../UngroupedAggregateRegionObserver.java | 30 +++++++++++++---------
1 file changed, 18 insertions(+), 12 deletions(-)
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index cab13f1..5923a75 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -263,7 +263,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
return;
}
- Mutation[] mutationArray = new Mutation[mutations.size()];
+ Mutation[] mutationArray = new Mutation[mutations.size()];
// When memstore size reaches blockingMemstoreSize we are waiting 3 seconds for the
// flush happen which decrease the memstore size and then writes allowed on the region.
for (int i = 0; blockingMemstoreSize > 0 && region.getMemstoreSize() > blockingMemstoreSize && i < 30; i++) {
@@ -374,6 +374,17 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
super.clear();
}
}
+
+ private long getBlockingMemstoreSize(Region region, Configuration conf) {
+ long flushSize = region.getTableDesc().getMemStoreFlushSize();
+
+ if (flushSize <= 0) {
+ flushSize = conf.getLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE,
+ HTableDescriptor.DEFAULT_MEMSTORE_FLUSH_SIZE);
+ }
+ return flushSize * (conf.getLong(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER,
+ HConstants.DEFAULT_HREGION_MEMSTORE_BLOCK_MULTIPLIER)-1);
+ }
@Override
protected RegionScanner doPostScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, final Scan scan, final RegionScanner s) throws IOException, SQLException {
@@ -499,12 +510,6 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
MutationList mutations = new MutationList();
boolean needToWrite = false;
Configuration conf = env.getConfiguration();
- long flushSize = region.getTableDesc().getMemStoreFlushSize();
-
- if (flushSize <= 0) {
- flushSize = conf.getLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE,
- HTableDescriptor.DEFAULT_MEMSTORE_FLUSH_SIZE);
- }
/**
* Slow down the writes if the memstore size more than
@@ -512,9 +517,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
* bytes. This avoids flush storm to hdfs for cases like index building where reads and
* write happen to all the table regions in the server.
*/
- final long blockingMemStoreSize = flushSize * (
- conf.getLong(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER,
- HConstants.DEFAULT_HREGION_MEMSTORE_BLOCK_MULTIPLIER)-1) ;
+ final long blockingMemStoreSize = getBlockingMemstoreSize(region, conf) ;
boolean buildLocalIndex = indexMaintainers != null && dataColumns==null && !localIndexScan;
if(buildLocalIndex) {
@@ -1060,6 +1063,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
int maxBatchSize = config.getInt(MUTATE_BATCH_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE);
long maxBatchSizeBytes = config.getLong(MUTATE_BATCH_SIZE_BYTES_ATTRIB,
QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE_BYTES);
+ final long blockingMemstoreSize = getBlockingMemstoreSize(region, config);
MutationList mutations = new MutationList(maxBatchSize);
region.startRegionOperation();
byte[] uuidValue = ServerCacheClient.generateId();
@@ -1101,7 +1105,8 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
}
}
if (ServerUtil.readyToCommit(mutations.size(), mutations.byteSize(), maxBatchSize, maxBatchSizeBytes)) {
- commitBatchWithRetries(region, mutations, -1);
+ checkForRegionClosingOrSplitting();
+ commitBatchWithRetries(region, mutations, blockingMemstoreSize);
uuidValue = ServerCacheClient.generateId();
mutations.clear();
}
@@ -1110,7 +1115,8 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
} while (hasMore);
if (!mutations.isEmpty()) {
- commitBatchWithRetries(region, mutations, -1);
+ checkForRegionClosingOrSplitting();
+ commitBatchWithRetries(region, mutations, blockingMemstoreSize);
}
}
} catch (IOException e) {