You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ap...@apache.org on 2017/07/24 23:24:03 UTC
[2/4] phoenix git commit: PHOENIX-3997
UngroupedAggregateRegionObserver.commitBatchWithHTable() should not check the
memstore size and wait for flush.
PHOENIX-3997 UngroupedAggregateRegionObserver.commitBatchWithHTable() should not check the memstore size and wait for flush.
Signed-off-by: Andrew Purtell <ap...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/54c28d19
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/54c28d19
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/54c28d19
Branch: refs/heads/4.x-HBase-1.2
Commit: 54c28d19698b18d2e5d86d61e37a104391ff3392
Parents: 2913e10
Author: Geoffrey Jacoby <ge...@careerbuilder.com>
Authored: Thu Jul 13 13:34:54 2017 -0700
Committer: Andrew Purtell <ap...@apache.org>
Committed: Mon Jul 24 15:41:42 2017 -0700
----------------------------------------------------------------------
.../UngroupedAggregateRegionObserver.java | 129 ++++++++++---------
1 file changed, 68 insertions(+), 61 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/54c28d19/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index dc2ae3f..a07b5d0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -200,24 +200,12 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
InterRegionServerIndexRpcControllerFactory.class, RpcControllerFactory.class);
}
- private void commitBatch(Region region, List<Mutation> mutations, byte[] indexUUID, long blockingMemstoreSize,
- byte[] indexMaintainersPtr, byte[] txState, boolean useIndexProto) throws IOException {
+ private void commitBatch(Region region, List<Mutation> mutations, long blockingMemstoreSize) throws IOException {
if (mutations.isEmpty()) {
- return;
+ return;
}
- for (Mutation m : mutations) {
- if (indexMaintainersPtr != null) {
- m.setAttribute(useIndexProto ? PhoenixIndexCodec.INDEX_PROTO_MD : PhoenixIndexCodec.INDEX_MD, indexMaintainersPtr);
- }
- if (indexUUID != null) {
- m.setAttribute(PhoenixIndexCodec.INDEX_UUID, indexUUID);
- }
- if (txState != null) {
- m.setAttribute(BaseScannerRegionObserver.TX_STATE, txState);
- }
- }
-
- Mutation[] mutationArray = new Mutation[mutations.size()];
+
+ Mutation[] mutationArray = new Mutation[mutations.size()];
// When memstore size reaches blockingMemstoreSize we are waiting 3 seconds for the
// flush happen which decrease the memstore size and then writes allowed on the region.
for (int i = 0; region.getMemstoreSize() > blockingMemstoreSize && i < 30; i++) {
@@ -233,34 +221,26 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
logger.debug("Committing bactch of " + mutations.size() + " mutations for " + region.getRegionInfo().getTable().getNameAsString());
region.batchMutate(mutations.toArray(mutationArray), HConstants.NO_NONCE, HConstants.NO_NONCE);
}
-
- private void commitBatchWithHTable(HTable table, Region region, List<Mutation> mutations, byte[] indexUUID,
- long blockingMemstoreSize, byte[] indexMaintainersPtr, byte[] txState, boolean useIndexProto) throws IOException {
- if (mutations.isEmpty()) {
- return;
- }
+
+ private void setIndexAndTransactionProperties(List<Mutation> mutations, byte[] indexUUID, byte[] indexMaintainersPtr, byte[] txState, boolean useIndexProto) {
for (Mutation m : mutations) {
- if (indexMaintainersPtr != null) {
- m.setAttribute(useIndexProto ? PhoenixIndexCodec.INDEX_PROTO_MD : PhoenixIndexCodec.INDEX_MD, indexMaintainersPtr);
- }
- if (txState != null) {
- m.setAttribute(BaseScannerRegionObserver.TX_STATE, txState);
- }
- if (indexUUID != null) {
- m.setAttribute(PhoenixIndexCodec.INDEX_UUID, indexUUID);
- }
- }
- // When memstore size reaches blockingMemstoreSize we are waiting 3 seconds for the
- // flush happen which decrease the memstore size and then writes allowed on the region.
- for (int i = 0; region.getMemstoreSize() > blockingMemstoreSize && i < 30; i++) {
- try {
- checkForRegionClosing();
- Thread.sleep(100);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new IOException(e);
- }
+ if (indexMaintainersPtr != null) {
+ m.setAttribute(useIndexProto ? PhoenixIndexCodec.INDEX_PROTO_MD : PhoenixIndexCodec.INDEX_MD, indexMaintainersPtr);
+ }
+ if (indexUUID != null) {
+ m.setAttribute(PhoenixIndexCodec.INDEX_UUID, indexUUID);
+ }
+ if (txState != null) {
+ m.setAttribute(BaseScannerRegionObserver.TX_STATE, txState);
+ }
}
+ }
+
+ private void commitBatchWithHTable(HTable table, List<Mutation> mutations) throws IOException {
+ if (mutations.isEmpty()) {
+ return;
+ }
+
logger.debug("Committing batch of " + mutations.size() + " mutations for " + table);
try {
table.batch(mutations);
@@ -404,7 +384,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
byte[] deleteCF = null;
byte[] emptyCF = null;
HTable targetHTable = null;
- boolean areMutationInSameRegion = true;
+ boolean isPKChanging = false;
ImmutableBytesWritable ptr = new ImmutableBytesWritable();
if (upsertSelectTable != null) {
isUpsert = true;
@@ -412,10 +392,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
targetHTable = new HTable(upsertSelectConfig, projectedTable.getPhysicalName().getBytes());
selectExpressions = deserializeExpressions(scan.getAttribute(BaseScannerRegionObserver.UPSERT_SELECT_EXPRS));
values = new byte[projectedTable.getPKColumns().size()][];
- areMutationInSameRegion = Bytes.compareTo(targetHTable.getTableName(),
- region.getTableDesc().getTableName().getName()) == 0
- && !ExpressionUtil.isPkPositionChanging(new TableRef(projectedTable), selectExpressions);
-
+ isPKChanging = ExpressionUtil.isPkPositionChanging(new TableRef(projectedTable), selectExpressions);
} else {
byte[] isDeleteAgg = scan.getAttribute(BaseScannerRegionObserver.DELETE_AGG);
isDelete = isDeleteAgg != null && Bytes.compareTo(PDataType.TRUE_BYTES, isDeleteAgg) == 0;
@@ -724,15 +701,15 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
}
}
if (ServerUtil.readyToCommit(mutations.size(), mutations.byteSize(), maxBatchSize, maxBatchSizeBytes)) {
- commit(region, mutations, indexUUID, blockingMemStoreSize, indexMaintainersPtr, txState,
- areMutationInSameRegion, targetHTable, useIndexProto);
+ commit(region, mutations, indexUUID, blockingMemStoreSize, indexMaintainersPtr,
+ txState, targetHTable, useIndexProto, isPKChanging);
mutations.clear();
}
// Commit in batches based on UPSERT_BATCH_SIZE_BYTES_ATTRIB in config
if (ServerUtil.readyToCommit(indexMutations.size(), indexMutations.byteSize(), maxBatchSize, maxBatchSizeBytes)) {
- commitBatch(region, indexMutations, null, blockingMemStoreSize, null, txState,
- useIndexProto);
+ setIndexAndTransactionProperties(indexMutations, indexUUID, indexMaintainersPtr, txState, useIndexProto);
+ commitBatch(region, indexMutations, blockingMemStoreSize);
indexMutations.clear();
}
aggregators.aggregate(rowAggregators, result);
@@ -741,12 +718,12 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
} while (hasMore);
if (!mutations.isEmpty()) {
commit(region, mutations, indexUUID, blockingMemStoreSize, indexMaintainersPtr, txState,
- areMutationInSameRegion, targetHTable, useIndexProto);
+ targetHTable, useIndexProto, isPKChanging);
mutations.clear();
}
if (!indexMutations.isEmpty()) {
- commitBatch(region, indexMutations, null, blockingMemStoreSize, indexMaintainersPtr, txState, useIndexProto);
+ commitBatch(region, indexMutations, blockingMemStoreSize);
indexMutations.clear();
}
}
@@ -802,18 +779,48 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
}
- private void commit(Region region, List<Mutation> mutations, byte[] indexUUID, long blockingMemstoreSize,
- byte[] indexMaintainersPtr, byte[] txState, boolean areMutationsForSameRegion, HTable hTable, boolean useIndexProto)
+ private void commit(Region region, List<Mutation> mutations, byte[] indexUUID, long blockingMemStoreSize,
+ byte[] indexMaintainersPtr, byte[] txState, HTable targetHTable, boolean useIndexProto,
+ boolean isPKChanging)
throws IOException {
- if (!areMutationsForSameRegion) {
- assert hTable != null;// table cannot be null
- commitBatchWithHTable(hTable, region, mutations, indexUUID, blockingMemstoreSize, indexMaintainersPtr,
- txState, useIndexProto);
+ List<Mutation> localRegionMutations = Lists.newArrayList();
+ List<Mutation> remoteRegionMutations = Lists.newArrayList();
+ setIndexAndTransactionProperties(mutations, indexUUID, indexMaintainersPtr, txState, useIndexProto);
+ separateLocalAndRemoteMutations(targetHTable, region, mutations, localRegionMutations, remoteRegionMutations,
+ isPKChanging);
+ commitBatch(region, localRegionMutations, blockingMemStoreSize);
+ commitBatchWithHTable(targetHTable, remoteRegionMutations);
+ localRegionMutations.clear();
+ remoteRegionMutations.clear();
+ }
+
+ private void separateLocalAndRemoteMutations(HTable targetHTable, Region region, List<Mutation> mutations,
+ List<Mutation> localRegionMutations, List<Mutation> remoteRegionMutations,
+ boolean isPKChanging){
+ boolean areMutationsInSameTable = areMutationsInSameTable(targetHTable, region);
+ //if we're writing to the same table, but the PK can change, that means that some
+ //mutations might be in our current region, and others in a different one.
+ if (areMutationsInSameTable && isPKChanging) {
+ HRegionInfo regionInfo = region.getRegionInfo();
+ for (Mutation mutation : mutations){
+ if (regionInfo.containsRow(mutation.getRow())){
+ localRegionMutations.add(mutation);
+ } else {
+ remoteRegionMutations.add(mutation);
+ }
+ }
+ } else if (areMutationsInSameTable && !isPKChanging) {
+ localRegionMutations.addAll(mutations);
} else {
- commitBatch(region, mutations, indexUUID, blockingMemstoreSize, indexMaintainersPtr, txState, useIndexProto);
+ remoteRegionMutations.addAll(mutations);
}
}
-
+
+ private boolean areMutationsInSameTable(HTable targetHTable, Region region) {
+ return (targetHTable == null || Bytes.compareTo(targetHTable.getTableName(),
+ region.getTableDesc().getTableName().getName()) == 0);
+ }
+
@Override
public InternalScanner preCompact(final ObserverContext<RegionCoprocessorEnvironment> c, final Store store,
final InternalScanner scanner, final ScanType scanType) throws IOException {