You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by td...@apache.org on 2017/07/06 23:10:27 UTC

phoenix git commit: PHOENIX-3986 UngroupedAggregateRegionObserver.commitBatch() should set the index metadata as an attribute on every mutation

Repository: phoenix
Updated Branches:
  refs/heads/4.11-HBase-0.98 92b497449 -> b86177548


PHOENIX-3986 UngroupedAggregateRegionObserver.commitBatch() should set the index metadata as an attribute on every mutation


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/b8617754
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/b8617754
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/b8617754

Branch: refs/heads/4.11-HBase-0.98
Commit: b86177548c62f31a580727da6f87d8a1f4fffd42
Parents: 92b4974
Author: Thomas <td...@salesforce.com>
Authored: Wed Jul 5 19:17:26 2017 -0700
Committer: Thomas <td...@salesforce.com>
Committed: Thu Jul 6 16:09:32 2017 -0700

----------------------------------------------------------------------
 .../UpsertSelectOverlappingBatchesIT.java       | 153 +++++++++++++++++++
 .../UngroupedAggregateRegionObserver.java       |  58 ++++---
 .../org/apache/phoenix/util/ServerUtil.java     |   4 +-
 3 files changed, 183 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/b8617754/phoenix-core/src/it/java/org/apache/phoenix/execute/UpsertSelectOverlappingBatchesIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/execute/UpsertSelectOverlappingBatchesIT.java b/phoenix-core/src/it/java/org/apache/phoenix/execute/UpsertSelectOverlappingBatchesIT.java
new file mode 100644
index 0000000..53346b9
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/execute/UpsertSelectOverlappingBatchesIT.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you maynot use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicablelaw or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.execute;
+
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertTrue;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
+import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
+import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+public class UpsertSelectOverlappingBatchesIT extends BaseUniqueNamesOwnClusterIT {
+    
+    @BeforeClass
+    public static void doSetup() throws Exception {
+        Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(3);
+        serverProps.put("hbase.coprocessor.region.classes", SlowBatchRegionObserver.class.getName());
+        serverProps.put("hbase.rowlock.wait.duration", "5000");
+        serverProps.put(QueryServices.MUTATE_BATCH_SIZE_ATTRIB, "100");
+        Map<String,String> clientProps = Maps.newHashMapWithExpectedSize(1);
+        setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator()));
+    }
+    
+    private class UpsertSelectRunner implements Callable<Boolean> {
+    	private final String dataTable;
+    	private final int minIndex;
+    	private final int maxIndex;
+    	private final int numLoop;
+    	
+    	public UpsertSelectRunner (String dataTable, int minIndex, int maxIndex, int numLoop) {
+    		this.dataTable = dataTable;
+    		this.minIndex = minIndex;
+    		this.maxIndex = maxIndex;
+    		this.numLoop = numLoop;
+    	}
+
+		@Override
+		public Boolean call() throws Exception {
+			Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+			try (Connection myConn = DriverManager.getConnection(getUrl(), props)) {
+				myConn.setAutoCommit(true);
+				String time = String.valueOf(System.currentTimeMillis());
+				String dml = "UPSERT INTO " + dataTable + " SELECT k, v1 || '" + time + "', v2 || '" + time
+						+ "' FROM " + dataTable + " WHERE k >= " + minIndex + " AND k < " + maxIndex;
+				myConn.setAutoCommit(true);
+				for (int j = 0; j < numLoop; ++j) {
+					myConn.createStatement().execute(dml);
+				}
+				return true;
+			}
+		}
+    	
+    }
+    
+	@Test
+	public void testUpsertSelectSameBatchConcurrently() throws Exception {
+		final String dataTable = generateUniqueName();
+		final String index = "IDX_" + dataTable;
+		// create the table and ensure its empty
+		Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+		Connection conn = driver.connect(url, props);
+		conn.createStatement()
+				.execute("CREATE TABLE " + dataTable + " (k INTEGER NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)");
+		// create the index and ensure its empty as well
+		conn.createStatement().execute("CREATE INDEX " + index + " ON " + dataTable + " (v1)");
+
+		conn = DriverManager.getConnection(getUrl(), props);
+		PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + dataTable + " VALUES(?,?,?)");
+		conn.setAutoCommit(false);
+		for (int i = 0; i < 100; i++) {
+			stmt.setInt(1, i);
+			stmt.setString(2, "v1" + i);
+			stmt.setString(3, "v2" + i);
+			stmt.execute();
+		}
+		conn.commit();
+
+		int numUpsertSelectRunners = 5;
+		ExecutorService exec = Executors.newFixedThreadPool(numUpsertSelectRunners);
+		CompletionService<Boolean> completionService = new ExecutorCompletionService<Boolean>(exec);
+		List<Future<Boolean>> futures = Lists.newArrayListWithExpectedSize(numUpsertSelectRunners);
+		// run one UPSERT SELECT for 100 rows (that locks the rows for a long time)
+		futures.add(completionService.submit(new UpsertSelectRunner(dataTable, 0, 105, 1)));
+		// run four UPSERT SELECTS for 5 rows (that overlap with slow running UPSERT SELECT)
+		for (int i = 0; i < 100; i += 25) {
+			futures.add(completionService.submit(new UpsertSelectRunner(dataTable, i, i+25, 5)));
+		}
+		int received = 0;
+		while (received < futures.size()) {
+			Future<Boolean> resultFuture = completionService.take(); 
+			Boolean result = resultFuture.get();
+			received++;
+			assertTrue(result);
+		}
+		exec.shutdownNow();
+		conn.close();
+	}
+    
+    public static class SlowBatchRegionObserver extends SimpleRegionObserver {
+        @Override
+        public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws HBaseIOException {
+        	// model a slow batch that takes a long time
+            if (miniBatchOp.size()==100) {
+            	try {
+					Thread.sleep(6000);
+				} catch (InterruptedException e) {
+					e.printStackTrace();
+				}
+            }
+        }
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b8617754/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 9f3ac69..19d0e66 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -185,17 +185,19 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
 
     private void commitBatch(HRegion region, List<Mutation> mutations, byte[] indexUUID, long blockingMemstoreSize,
             byte[] indexMaintainersPtr, byte[] txState, boolean useIndexProto) throws IOException {
-        if (indexMaintainersPtr != null) {
-            mutations.get(0).setAttribute(useIndexProto ? PhoenixIndexCodec.INDEX_PROTO_MD : PhoenixIndexCodec.INDEX_MD, indexMaintainersPtr);
-        }
-
-        if (txState != null) {
-            mutations.get(0).setAttribute(BaseScannerRegionObserver.TX_STATE, txState);
-        }
-      if (indexUUID != null) {
-          for (Mutation m : mutations) {
-              m.setAttribute(PhoenixIndexCodec.INDEX_UUID, indexUUID);
-          }
+      if (mutations.isEmpty()) {
+    	  return;
+      }
+      for (Mutation m : mutations) {
+         if (indexMaintainersPtr != null) {
+             m.setAttribute(useIndexProto ? PhoenixIndexCodec.INDEX_PROTO_MD : PhoenixIndexCodec.INDEX_MD, indexMaintainersPtr);
+         }
+         if (indexUUID != null) {
+        	 m.setAttribute(PhoenixIndexCodec.INDEX_UUID, indexUUID);
+         }
+         if (txState != null) {
+             m.setAttribute(BaseScannerRegionObserver.TX_STATE, txState);
+         }
       }
       Mutation[] mutationArray = new Mutation[mutations.size()];
       // When memstore size reaches blockingMemstoreSize we are waiting 3 seconds for the
@@ -216,17 +218,18 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
     
     private void commitBatchWithHTable(HTable table, HRegion region, List<Mutation> mutations, byte[] indexUUID,
             long blockingMemstoreSize, byte[] indexMaintainersPtr, byte[] txState, boolean useIndexProto) throws IOException {
-
-        if (indexUUID != null) {
-            // Need to add indexMaintainers for each mutation as table.batch can be distributed across servers
-            for (Mutation m : mutations) {
-                if (indexMaintainersPtr != null) {
-                    m.setAttribute(useIndexProto ? PhoenixIndexCodec.INDEX_PROTO_MD : PhoenixIndexCodec.INDEX_MD, indexMaintainersPtr);
-                }
-                if (txState != null) {
-                    m.setAttribute(BaseScannerRegionObserver.TX_STATE, txState);
-                }
-                m.setAttribute(PhoenixIndexCodec.INDEX_UUID, indexUUID);
+    	if (mutations.isEmpty()) {
+      	  return;
+    	}
+        for (Mutation m : mutations) {
+            if (indexMaintainersPtr != null) {
+                m.setAttribute(useIndexProto ? PhoenixIndexCodec.INDEX_PROTO_MD : PhoenixIndexCodec.INDEX_MD, indexMaintainersPtr);
+            }
+            if (txState != null) {
+                m.setAttribute(BaseScannerRegionObserver.TX_STATE, txState);
+            }
+            if (indexUUID != null) {
+            	m.setAttribute(PhoenixIndexCodec.INDEX_UUID, indexUUID);
             }
         }
         // When memstore size reaches blockingMemstoreSize we are waiting 3 seconds for the
@@ -702,14 +705,14 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                                 }
                             }
                         }
-                        if (ServerUtil.readyToCommit(rowCount, mutations.byteSize(), maxBatchSize, maxBatchSizeBytes)) {
+                        if (ServerUtil.readyToCommit(mutations.size(), mutations.byteSize(), maxBatchSize, maxBatchSizeBytes)) {
                             commit(region, mutations, indexUUID, blockingMemStoreSize, indexMaintainersPtr, txState,
                                     areMutationInSameRegion, targetHTable, useIndexProto);
                             mutations.clear();
                         }
                         // Commit in batches based on UPSERT_BATCH_SIZE_BYTES_ATTRIB in config
 
-                        if (ServerUtil.readyToCommit(rowCount, indexMutations.byteSize(), maxBatchSize, maxBatchSizeBytes)) {
+                        if (ServerUtil.readyToCommit(indexMutations.size(), indexMutations.byteSize(), maxBatchSize, maxBatchSizeBytes)) {
                             commitBatch(region, indexMutations, null, blockingMemStoreSize, null, txState,
                                     useIndexProto);
                             indexMutations.clear();
@@ -793,11 +796,6 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
         }
     }
 
-    private boolean readyToCommit(int rowCount, long mutationSize, int maxBatchSize, long maxBatchSizeBytes) {
-        return maxBatchSize > 0 && rowCount > maxBatchSize
-                || (maxBatchSizeBytes > 0 && mutationSize > maxBatchSizeBytes);
-    }
-
     @Override
     public InternalScanner preCompact(final ObserverContext<RegionCoprocessorEnvironment> c,
             final Store store, final InternalScanner scanner, final ScanType scanType)
@@ -893,7 +891,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                                 del.addDeleteMarker(cell);
                             }
                         }
-                        if (readyToCommit(rowCount, mutations.byteSize(), maxBatchSize, maxBatchSizeBytes)) {
+                        if (ServerUtil.readyToCommit(mutations.size(), mutations.byteSize(), maxBatchSize, maxBatchSizeBytes)) {
                             region.batchMutate(mutations.toArray(new Mutation[mutations.size()]), HConstants.NO_NONCE,
                                     HConstants.NO_NONCE);
                             uuidValue = ServerCacheClient.generateId();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b8617754/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java
index 86d89bf..fe0937b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java
@@ -219,8 +219,8 @@ public class ServerUtil {
     }
     
     public static boolean readyToCommit(int rowCount, long mutationSize, int maxBatchSize, long maxBatchSizeBytes) {
-        return maxBatchSize > 0 && rowCount > maxBatchSize
-                || (maxBatchSizeBytes > 0 && mutationSize > maxBatchSizeBytes);
+        return maxBatchSize > 0 && rowCount >= maxBatchSize
+                || (maxBatchSizeBytes > 0 && mutationSize >= maxBatchSizeBytes);
     }
     
 }