You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ap...@apache.org on 2017/07/24 23:24:05 UTC
[4/4] phoenix git commit: PHOENIX-3997 UngroupedAggregateRegionObserver.commitBatchWithHTable() should not check the memstore size and wait for flush.
PHOENIX-3997 UngroupedAggregateRegionObserver.commitBatchWithHTable() should not check the memstore size and wait for flush.
Signed-off-by: Andrew Purtell <ap...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/318dd230
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/318dd230
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/318dd230
Branch: refs/heads/4.x-HBase-0.98
Commit: 318dd23031382200d92781fb11913464c9d442a6
Parents: e18c393
Author: Geoffrey Jacoby <ge...@careerbuilder.com>
Authored: Thu Jul 13 13:34:54 2017 -0700
Committer: Andrew Purtell <ap...@apache.org>
Committed: Mon Jul 24 15:56:33 2017 -0700
----------------------------------------------------------------------
.../UngroupedAggregateRegionObserver.java | 101 ++++++++++---------
1 file changed, 54 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/318dd230/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 70ed5e5..1571822 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -200,21 +200,9 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
InterRegionServerIndexRpcControllerFactory.class, RpcControllerFactory.class);
}
- private void commitBatch(HRegion region, List<Mutation> mutations, byte[] indexUUID, long blockingMemstoreSize,
- byte[] indexMaintainersPtr, byte[] txState, boolean useIndexProto) throws IOException {
+ private void commitBatch(HRegion region, List<Mutation> mutations, long blockingMemstoreSize) throws IOException {
if (mutations.isEmpty()) {
- return;
- }
- for (Mutation m : mutations) {
- if (indexMaintainersPtr != null) {
- m.setAttribute(useIndexProto ? PhoenixIndexCodec.INDEX_PROTO_MD : PhoenixIndexCodec.INDEX_MD, indexMaintainersPtr);
- }
- if (indexUUID != null) {
- m.setAttribute(PhoenixIndexCodec.INDEX_UUID, indexUUID);
- }
- if (txState != null) {
- m.setAttribute(BaseScannerRegionObserver.TX_STATE, txState);
- }
+ return;
}
Mutation[] mutationArray = new Mutation[mutations.size()];
// When memstore size reaches blockingMemstoreSize we are waiting 3 seconds for the
@@ -233,11 +221,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
region.batchMutate(mutations.toArray(mutationArray), HConstants.NO_NONCE, HConstants.NO_NONCE);
}
- private void commitBatchWithHTable(HTable table, HRegion region, List<Mutation> mutations, byte[] indexUUID,
- long blockingMemstoreSize, byte[] indexMaintainersPtr, byte[] txState, boolean useIndexProto) throws IOException {
- if (mutations.isEmpty()) {
- return;
- }
+ private void setIndexAndTransactionProperties(List<Mutation> mutations, byte[] indexUUID, byte[] indexMaintainersPtr, byte[] txState, boolean useIndexProto) {
for (Mutation m : mutations) {
if (indexMaintainersPtr != null) {
m.setAttribute(useIndexProto ? PhoenixIndexCodec.INDEX_PROTO_MD : PhoenixIndexCodec.INDEX_MD, indexMaintainersPtr);
@@ -246,19 +230,14 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
m.setAttribute(BaseScannerRegionObserver.TX_STATE, txState);
}
if (indexUUID != null) {
- m.setAttribute(PhoenixIndexCodec.INDEX_UUID, indexUUID);
+ m.setAttribute(PhoenixIndexCodec.INDEX_UUID, indexUUID);
}
}
- // When memstore size reaches blockingMemstoreSize we are waiting 3 seconds for the
- // flush happen which decrease the memstore size and then writes allowed on the region.
- for (int i = 0; region.getMemstoreSize().get() > blockingMemstoreSize && i < 30; i++) {
- try {
- checkForRegionClosing();
- Thread.sleep(100);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new IOException(e);
- }
+ }
+
+ private void commitBatchWithHTable(HTable table, List<Mutation> mutations) throws IOException {
+ if (mutations.isEmpty()) {
+ return;
}
logger.debug("Committing batch of " + mutations.size() + " mutations for " + table);
try {
@@ -403,7 +382,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
byte[] deleteCF = null;
byte[] emptyCF = null;
HTable targetHTable = null;
- boolean areMutationInSameRegion = true;
+ boolean isPKChanging = false;
ImmutableBytesWritable ptr = new ImmutableBytesWritable();
if (upsertSelectTable != null) {
isUpsert = true;
@@ -411,9 +390,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
targetHTable = new HTable(upsertSelectConfig, projectedTable.getPhysicalName().getBytes());
selectExpressions = deserializeExpressions(scan.getAttribute(BaseScannerRegionObserver.UPSERT_SELECT_EXPRS));
values = new byte[projectedTable.getPKColumns().size()][];
- areMutationInSameRegion = Bytes.compareTo(targetHTable.getTableName(),
- region.getTableDesc().getTableName().getName()) == 0
- && !ExpressionUtil.isPkPositionChanging(new TableRef(projectedTable), selectExpressions);
+ isPKChanging = ExpressionUtil.isPkPositionChanging(new TableRef(projectedTable), selectExpressions);
} else {
byte[] isDeleteAgg = scan.getAttribute(BaseScannerRegionObserver.DELETE_AGG);
@@ -723,15 +700,15 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
}
}
if (ServerUtil.readyToCommit(mutations.size(), mutations.byteSize(), maxBatchSize, maxBatchSizeBytes)) {
- commit(region, mutations, indexUUID, blockingMemStoreSize, indexMaintainersPtr, txState,
- areMutationInSameRegion, targetHTable, useIndexProto);
+ commit(region, mutations, indexUUID, blockingMemStoreSize, indexMaintainersPtr,
+ txState, targetHTable, useIndexProto, isPKChanging);
mutations.clear();
}
// Commit in batches based on UPSERT_BATCH_SIZE_BYTES_ATTRIB in config
if (ServerUtil.readyToCommit(indexMutations.size(), indexMutations.byteSize(), maxBatchSize, maxBatchSizeBytes)) {
- commitBatch(region, indexMutations, null, blockingMemStoreSize, null, txState,
- useIndexProto);
+ setIndexAndTransactionProperties(indexMutations, indexUUID, indexMaintainersPtr, txState, useIndexProto);
+ commitBatch(region, indexMutations, blockingMemStoreSize);
indexMutations.clear();
}
aggregators.aggregate(rowAggregators, result);
@@ -740,12 +717,12 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
} while (hasMore);
if (!mutations.isEmpty()) {
commit(region, mutations, indexUUID, blockingMemStoreSize, indexMaintainersPtr, txState,
- areMutationInSameRegion, targetHTable, useIndexProto);
+ targetHTable, useIndexProto, isPKChanging);
mutations.clear();
}
if (!indexMutations.isEmpty()) {
- commitBatch(region, indexMutations, null, blockingMemStoreSize, indexMaintainersPtr, txState, useIndexProto);
+ commitBatch(region, indexMutations, blockingMemStoreSize);
indexMutations.clear();
}
}
@@ -801,18 +778,48 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
}
- private void commit(HRegion region, List<Mutation> mutations, byte[] indexUUID, long blockingMemstoreSize,
- byte[] indexMaintainersPtr, byte[] txState, boolean areMutationsForSameRegion, HTable hTable, boolean useIndexProto)
+ private void commit(HRegion region, List<Mutation> mutations, byte[] indexUUID, long blockingMemStoreSize,
+ byte[] indexMaintainersPtr, byte[] txState, HTable targetHTable, boolean useIndexProto,
+ boolean isPKChanging)
throws IOException {
- if (!areMutationsForSameRegion) {
- assert hTable != null;// table cannot be null
- commitBatchWithHTable(hTable, region, mutations, indexUUID, blockingMemstoreSize, indexMaintainersPtr,
- txState, useIndexProto);
+ List<Mutation> localRegionMutations = Lists.newArrayList();
+ List<Mutation> remoteRegionMutations = Lists.newArrayList();
+ setIndexAndTransactionProperties(mutations, indexUUID, indexMaintainersPtr, txState, useIndexProto);
+ separateLocalAndRemoteMutations(targetHTable, region, mutations, localRegionMutations, remoteRegionMutations,
+ isPKChanging);
+ commitBatch(region, localRegionMutations, blockingMemStoreSize);
+ commitBatchWithHTable(targetHTable, remoteRegionMutations);
+ localRegionMutations.clear();
+ remoteRegionMutations.clear();
+ }
+
+ private void separateLocalAndRemoteMutations(HTable targetHTable, HRegion region, List<Mutation> mutations,
+ List<Mutation> localRegionMutations, List<Mutation> remoteRegionMutations,
+ boolean isPKChanging){
+ boolean areMutationsInSameTable = areMutationsInSameTable(targetHTable, region);
+ //if we're writing to the same table, but the PK can change, that means that some
+ //mutations might be in our current region, and others in a different one.
+ if (areMutationsInSameTable && isPKChanging) {
+ HRegionInfo regionInfo = region.getRegionInfo();
+ for (Mutation mutation : mutations){
+ if (regionInfo.containsRow(mutation.getRow())){
+ localRegionMutations.add(mutation);
+ } else {
+ remoteRegionMutations.add(mutation);
+ }
+ }
+ } else if (areMutationsInSameTable && !isPKChanging) {
+ localRegionMutations.addAll(mutations);
} else {
- commitBatch(region, mutations, indexUUID, blockingMemstoreSize, indexMaintainersPtr, txState, useIndexProto);
+ remoteRegionMutations.addAll(mutations);
}
}
+ private boolean areMutationsInSameTable(HTable targetHTable, HRegion region) {
+ return (targetHTable == null || Bytes.compareTo(targetHTable.getTableName(),
+ region.getTableDesc().getTableName().getName()) == 0);
+ }
+
@Override
public InternalScanner preCompact(final ObserverContext<RegionCoprocessorEnvironment> c,
final Store store, final InternalScanner scanner, final ScanType scanType)