You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by td...@apache.org on 2017/07/06 23:10:27 UTC
phoenix git commit: PHOENIX-3986
UngroupedAggregateRegionObserver.commitBatch() should set the index metadata
as an attribute on every mutation
Repository: phoenix
Updated Branches:
refs/heads/4.11-HBase-0.98 92b497449 -> b86177548
PHOENIX-3986 UngroupedAggregateRegionObserver.commitBatch() should set the index metadata as an attribute on every mutation
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/b8617754
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/b8617754
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/b8617754
Branch: refs/heads/4.11-HBase-0.98
Commit: b86177548c62f31a580727da6f87d8a1f4fffd42
Parents: 92b4974
Author: Thomas <td...@salesforce.com>
Authored: Wed Jul 5 19:17:26 2017 -0700
Committer: Thomas <td...@salesforce.com>
Committed: Thu Jul 6 16:09:32 2017 -0700
----------------------------------------------------------------------
.../UpsertSelectOverlappingBatchesIT.java | 153 +++++++++++++++++++
.../UngroupedAggregateRegionObserver.java | 58 ++++---
.../org/apache/phoenix/util/ServerUtil.java | 4 +-
3 files changed, 183 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b8617754/phoenix-core/src/it/java/org/apache/phoenix/execute/UpsertSelectOverlappingBatchesIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/execute/UpsertSelectOverlappingBatchesIT.java b/phoenix-core/src/it/java/org/apache/phoenix/execute/UpsertSelectOverlappingBatchesIT.java
new file mode 100644
index 0000000..53346b9
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/execute/UpsertSelectOverlappingBatchesIT.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.execute;
+
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertTrue;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
+import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
+import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+public class UpsertSelectOverlappingBatchesIT extends BaseUniqueNamesOwnClusterIT {
+
+ @BeforeClass
+ public static void doSetup() throws Exception {
+ Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(3);
+ serverProps.put("hbase.coprocessor.region.classes", SlowBatchRegionObserver.class.getName());
+ serverProps.put("hbase.rowlock.wait.duration", "5000");
+ serverProps.put(QueryServices.MUTATE_BATCH_SIZE_ATTRIB, "100");
+ Map<String,String> clientProps = Maps.newHashMapWithExpectedSize(1);
+ setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator()));
+ }
+
+ private class UpsertSelectRunner implements Callable<Boolean> {
+ private final String dataTable;
+ private final int minIndex;
+ private final int maxIndex;
+ private final int numLoop;
+
+ public UpsertSelectRunner (String dataTable, int minIndex, int maxIndex, int numLoop) {
+ this.dataTable = dataTable;
+ this.minIndex = minIndex;
+ this.maxIndex = maxIndex;
+ this.numLoop = numLoop;
+ }
+
+ @Override
+ public Boolean call() throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ try (Connection myConn = DriverManager.getConnection(getUrl(), props)) {
+ myConn.setAutoCommit(true);
+ String time = String.valueOf(System.currentTimeMillis());
+ String dml = "UPSERT INTO " + dataTable + " SELECT k, v1 || '" + time + "', v2 || '" + time
+ + "' FROM " + dataTable + " WHERE k >= " + minIndex + " AND k < " + maxIndex;
+ myConn.setAutoCommit(true);
+ for (int j = 0; j < numLoop; ++j) {
+ myConn.createStatement().execute(dml);
+ }
+ return true;
+ }
+ }
+
+ }
+
+ @Test
+ public void testUpsertSelectSameBatchConcurrently() throws Exception {
+ final String dataTable = generateUniqueName();
+ final String index = "IDX_" + dataTable;
+ // create the table and ensure it's empty
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ Connection conn = driver.connect(url, props);
+ conn.createStatement()
+ .execute("CREATE TABLE " + dataTable + " (k INTEGER NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)");
+ // create the index and ensure it's empty as well
+ conn.createStatement().execute("CREATE INDEX " + index + " ON " + dataTable + " (v1)");
+
+ conn = DriverManager.getConnection(getUrl(), props);
+ PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + dataTable + " VALUES(?,?,?)");
+ conn.setAutoCommit(false);
+ for (int i = 0; i < 100; i++) {
+ stmt.setInt(1, i);
+ stmt.setString(2, "v1" + i);
+ stmt.setString(3, "v2" + i);
+ stmt.execute();
+ }
+ conn.commit();
+
+ int numUpsertSelectRunners = 5;
+ ExecutorService exec = Executors.newFixedThreadPool(numUpsertSelectRunners);
+ CompletionService<Boolean> completionService = new ExecutorCompletionService<Boolean>(exec);
+ List<Future<Boolean>> futures = Lists.newArrayListWithExpectedSize(numUpsertSelectRunners);
+ // run one UPSERT SELECT for 100 rows (that locks the rows for a long time)
+ futures.add(completionService.submit(new UpsertSelectRunner(dataTable, 0, 105, 1)));
+ // run four UPSERT SELECTs, each covering a 25-row key range and repeated 5 times (these overlap with the slow-running UPSERT SELECT)
+ for (int i = 0; i < 100; i += 25) {
+ futures.add(completionService.submit(new UpsertSelectRunner(dataTable, i, i+25, 5)));
+ }
+ int received = 0;
+ while (received < futures.size()) {
+ Future<Boolean> resultFuture = completionService.take();
+ Boolean result = resultFuture.get();
+ received++;
+ assertTrue(result);
+ }
+ exec.shutdownNow();
+ conn.close();
+ }
+
+ public static class SlowBatchRegionObserver extends SimpleRegionObserver {
+ @Override
+ public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws HBaseIOException {
+ // model a slow batch that takes a long time
+ if (miniBatchOp.size()==100) {
+ try {
+ Thread.sleep(6000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b8617754/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 9f3ac69..19d0e66 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -185,17 +185,19 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
private void commitBatch(HRegion region, List<Mutation> mutations, byte[] indexUUID, long blockingMemstoreSize,
byte[] indexMaintainersPtr, byte[] txState, boolean useIndexProto) throws IOException {
- if (indexMaintainersPtr != null) {
- mutations.get(0).setAttribute(useIndexProto ? PhoenixIndexCodec.INDEX_PROTO_MD : PhoenixIndexCodec.INDEX_MD, indexMaintainersPtr);
- }
-
- if (txState != null) {
- mutations.get(0).setAttribute(BaseScannerRegionObserver.TX_STATE, txState);
- }
- if (indexUUID != null) {
- for (Mutation m : mutations) {
- m.setAttribute(PhoenixIndexCodec.INDEX_UUID, indexUUID);
- }
+ if (mutations.isEmpty()) {
+ return;
+ }
+ for (Mutation m : mutations) {
+ if (indexMaintainersPtr != null) {
+ m.setAttribute(useIndexProto ? PhoenixIndexCodec.INDEX_PROTO_MD : PhoenixIndexCodec.INDEX_MD, indexMaintainersPtr);
+ }
+ if (indexUUID != null) {
+ m.setAttribute(PhoenixIndexCodec.INDEX_UUID, indexUUID);
+ }
+ if (txState != null) {
+ m.setAttribute(BaseScannerRegionObserver.TX_STATE, txState);
+ }
}
Mutation[] mutationArray = new Mutation[mutations.size()];
// When memstore size reaches blockingMemstoreSize we are waiting 3 seconds for the
@@ -216,17 +218,18 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
private void commitBatchWithHTable(HTable table, HRegion region, List<Mutation> mutations, byte[] indexUUID,
long blockingMemstoreSize, byte[] indexMaintainersPtr, byte[] txState, boolean useIndexProto) throws IOException {
-
- if (indexUUID != null) {
- // Need to add indexMaintainers for each mutation as table.batch can be distributed across servers
- for (Mutation m : mutations) {
- if (indexMaintainersPtr != null) {
- m.setAttribute(useIndexProto ? PhoenixIndexCodec.INDEX_PROTO_MD : PhoenixIndexCodec.INDEX_MD, indexMaintainersPtr);
- }
- if (txState != null) {
- m.setAttribute(BaseScannerRegionObserver.TX_STATE, txState);
- }
- m.setAttribute(PhoenixIndexCodec.INDEX_UUID, indexUUID);
+ if (mutations.isEmpty()) {
+ return;
+ }
+ for (Mutation m : mutations) {
+ if (indexMaintainersPtr != null) {
+ m.setAttribute(useIndexProto ? PhoenixIndexCodec.INDEX_PROTO_MD : PhoenixIndexCodec.INDEX_MD, indexMaintainersPtr);
+ }
+ if (txState != null) {
+ m.setAttribute(BaseScannerRegionObserver.TX_STATE, txState);
+ }
+ if (indexUUID != null) {
+ m.setAttribute(PhoenixIndexCodec.INDEX_UUID, indexUUID);
}
}
// When memstore size reaches blockingMemstoreSize we are waiting 3 seconds for the
@@ -702,14 +705,14 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
}
}
}
- if (ServerUtil.readyToCommit(rowCount, mutations.byteSize(), maxBatchSize, maxBatchSizeBytes)) {
+ if (ServerUtil.readyToCommit(mutations.size(), mutations.byteSize(), maxBatchSize, maxBatchSizeBytes)) {
commit(region, mutations, indexUUID, blockingMemStoreSize, indexMaintainersPtr, txState,
areMutationInSameRegion, targetHTable, useIndexProto);
mutations.clear();
}
// Commit in batches based on UPSERT_BATCH_SIZE_BYTES_ATTRIB in config
- if (ServerUtil.readyToCommit(rowCount, indexMutations.byteSize(), maxBatchSize, maxBatchSizeBytes)) {
+ if (ServerUtil.readyToCommit(indexMutations.size(), indexMutations.byteSize(), maxBatchSize, maxBatchSizeBytes)) {
commitBatch(region, indexMutations, null, blockingMemStoreSize, null, txState,
useIndexProto);
indexMutations.clear();
@@ -793,11 +796,6 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
}
}
- private boolean readyToCommit(int rowCount, long mutationSize, int maxBatchSize, long maxBatchSizeBytes) {
- return maxBatchSize > 0 && rowCount > maxBatchSize
- || (maxBatchSizeBytes > 0 && mutationSize > maxBatchSizeBytes);
- }
-
@Override
public InternalScanner preCompact(final ObserverContext<RegionCoprocessorEnvironment> c,
final Store store, final InternalScanner scanner, final ScanType scanType)
@@ -893,7 +891,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
del.addDeleteMarker(cell);
}
}
- if (readyToCommit(rowCount, mutations.byteSize(), maxBatchSize, maxBatchSizeBytes)) {
+ if (ServerUtil.readyToCommit(mutations.size(), mutations.byteSize(), maxBatchSize, maxBatchSizeBytes)) {
region.batchMutate(mutations.toArray(new Mutation[mutations.size()]), HConstants.NO_NONCE,
HConstants.NO_NONCE);
uuidValue = ServerCacheClient.generateId();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b8617754/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java
index 86d89bf..fe0937b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java
@@ -219,8 +219,8 @@ public class ServerUtil {
}
public static boolean readyToCommit(int rowCount, long mutationSize, int maxBatchSize, long maxBatchSizeBytes) {
- return maxBatchSize > 0 && rowCount > maxBatchSize
- || (maxBatchSizeBytes > 0 && mutationSize > maxBatchSizeBytes);
+ return maxBatchSize > 0 && rowCount >= maxBatchSize
+ || (maxBatchSizeBytes > 0 && mutationSize >= maxBatchSizeBytes);
}
}