You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ya...@apache.org on 2020/12/11 21:28:43 UTC
[phoenix] branch 4.x updated: PHOENIX-6251 : Remove flakes from
ConcurrentMutationsExtendedIT
This is an automated email from the ASF dual-hosted git repository.
yanxinyi pushed a commit to branch 4.x
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.x by this push:
new 0641bef PHOENIX-6251 : Remove flakes from ConcurrentMutationsExtendedIT
0641bef is described below
commit 0641bef2e650f6d25456151eb48e40f51af1f047
Author: Viraj Jasani <vj...@apache.org>
AuthorDate: Fri Dec 11 12:58:01 2020 +0530
PHOENIX-6251 : Remove flakes from ConcurrentMutationsExtendedIT
Signed-off-by: Xinyi Yan <ya...@apache.org>
---
.../end2end/ConcurrentMutationsExtendedIT.java | 64 +---------
.../ConcurrentUpsertsWithoutIndexedColsIT.java | 134 +++++++++++++++++++++
2 files changed, 136 insertions(+), 62 deletions(-)
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java
index c2e0dc4..52ba058 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java
@@ -68,7 +68,8 @@ public class ConcurrentMutationsExtendedIT extends ParallelStatsDisabledIT {
private final Object lock = new Object();
- private long verifyIndexTable(String tableName, String indexName, Connection conn) throws Exception {
+ static long verifyIndexTable(String tableName, String indexName,
+ Connection conn) throws Exception {
// This checks the state of every raw index row without rebuilding any row
IndexTool indexTool = IndexToolIT.runIndexTool(true, false, "", tableName,
indexName, null, 0, IndexTool.IndexVerifyType.ONLY);
@@ -319,67 +320,6 @@ public class ConcurrentMutationsExtendedIT extends ParallelStatsDisabledIT {
}
@Test
- public void testConcurrentUpsertsWithNoIndexedColumns() throws Exception {
- int nThreads = 4;
- final int batchSize = 100;
- final int nRows = 997;
- final String tableName = generateUniqueName();
- final String indexName = generateUniqueName();
- Connection conn = DriverManager.getConnection(getUrl());
- conn.createStatement().execute("CREATE TABLE " + tableName
- + "(k1 INTEGER NOT NULL, k2 INTEGER NOT NULL, a.v1 INTEGER, b.v2 INTEGER, c.v3 INTEGER, d.v4 INTEGER," +
- "CONSTRAINT pk PRIMARY KEY (k1,k2)) COLUMN_ENCODED_BYTES = 0, VERSIONS=1");
- TestUtil.addCoprocessor(conn, tableName, DelayingRegionObserver.class);
- conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + tableName + "(v1) INCLUDE(v2, v3)");
- final CountDownLatch doneSignal = new CountDownLatch(nThreads);
- Runnable[] runnables = new Runnable[nThreads];
- for (int i = 0; i < nThreads; i++) {
- runnables[i] = new Runnable() {
-
- @Override public void run() {
- try {
- Connection conn = DriverManager.getConnection(getUrl());
- for (int i = 0; i < 1000; i++) {
- if (RAND.nextInt() % 1000 < 10) {
- // Do not include the indexed column in upserts
- conn.createStatement().execute(
- "UPSERT INTO " + tableName + " (k1, k2, b.v2, c.v3, d.v4) VALUES ("
- + (RAND.nextInt() % nRows) + ", 0, "
- + (RAND.nextBoolean() ? null : RAND.nextInt()) + ", "
- + (RAND.nextBoolean() ? null : RAND.nextInt()) + ", "
- + (RAND.nextBoolean() ? null : RAND.nextInt()) + ")");
- } else {
- conn.createStatement().execute(
- "UPSERT INTO " + tableName + " VALUES (" + (i % nRows) + ", 0, "
- + (RAND.nextBoolean() ? null : RAND.nextInt()) + ", "
- + (RAND.nextBoolean() ? null : RAND.nextInt()) + ", "
- + (RAND.nextBoolean() ? null : RAND.nextInt()) + ", "
- + (RAND.nextBoolean() ? null : RAND.nextInt()) + ")");
- }
- if ((i % batchSize) == 0) {
- conn.commit();
- }
- }
- conn.commit();
- } catch (SQLException e) {
- throw new RuntimeException(e);
- } finally {
- doneSignal.countDown();
- }
- }
-
- };
- }
- for (int i = 0; i < nThreads; i++) {
- Thread t = new Thread(runnables[i]);
- t.start();
- }
-
- assertTrue("Ran out of time", doneSignal.await(120, TimeUnit.SECONDS));
- verifyIndexTable(tableName, indexName, conn);
- }
-
- @Test
public void testRowLockDuringPreBatchMutateWhenIndexed() throws Exception {
final String tableName = LOCK_TEST_TABLE_PREFIX + generateUniqueName();
final String indexName = generateUniqueName();
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentUpsertsWithoutIndexedColsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentUpsertsWithoutIndexedColsIT.java
new file mode 100644
index 0000000..43f7388
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentUpsertsWithoutIndexedColsIT.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.end2end;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.phoenix.util.RunUntilFailure;
+import org.apache.phoenix.util.TestUtil;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.phoenix.end2end.ConcurrentMutationsExtendedIT
+ .verifyIndexTable;
+import static org.junit.Assert.assertTrue;
+
+
+/**
+ * Integration test that issues concurrent upserts against a table that has
+ * a covered index, where part of the upserts leave out the indexed column,
+ * and then checks the index with {@code verifyIndexTable}.
+ */
+@RunWith(RunUntilFailure.class)
+@Category(NeedsOwnMiniClusterTest.class)
+public class ConcurrentUpsertsWithoutIndexedColsIT
+        extends ParallelStatsDisabledIT {
+
+    // Shared by all worker threads; seeded, but the interleaving of draws
+    // between threads is not fixed.
+    private static final Random RANDOM = new Random(5);
+    private static final Logger LOGGER =
+            LoggerFactory.getLogger(ConcurrentUpsertsWithoutIndexedColsIT.class);
+
+    /**
+     * Creates a table with an index on {@code v1} that includes {@code v2}
+     * and {@code v3}, attaches {@code DelayingRegionObserver} to the table,
+     * runs four workers that each issue 1000 upserts, and finally verifies
+     * the index table.
+     *
+     * <p>The test fails if the workers do not finish within the timeout or
+     * if any worker hit an error while upserting.
+     *
+     * @throws Exception if setup, the upsert workers or index verification fail
+     */
+    @Test
+    public void testConcurrentUpsertsWithoutIndexedColumns() throws Exception {
+        int nThreads = 4;
+        final int batchSize = 100;
+        final int nRows = 997;
+        final String tableName = generateUniqueName();
+        final String indexName = generateUniqueName();
+        // try-with-resources so the connection is released even when an
+        // assertion or the index verification throws.
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            conn.createStatement().execute("CREATE TABLE " + tableName
+                    + "(k1 INTEGER NOT NULL, k2 INTEGER NOT NULL, a.v1 INTEGER, "
+                    + "b.v2 INTEGER, c.v3 INTEGER, d.v4 INTEGER,"
+                    + "CONSTRAINT pk PRIMARY KEY (k1,k2)) COLUMN_ENCODED_BYTES = 0, VERSIONS=1");
+            TestUtil.addCoprocessor(conn, tableName,
+                    ConcurrentMutationsExtendedIT.DelayingRegionObserver.class);
+            conn.createStatement().execute("CREATE INDEX " + indexName + " ON "
+                    + tableName + "(v1) INCLUDE(v2, v3)");
+            final CountDownLatch doneSignal = new CountDownLatch(nThreads);
+            ExecutorService executorService = Executors.newFixedThreadPool(nThreads,
+                    new ThreadFactoryBuilder().setDaemon(true)
+                            .setNameFormat("test-concurrent-upsert-%d").build());
+            TestRunnable[] workers = new TestRunnable[nThreads];
+            boolean finished = false;
+            try {
+                for (int i = 0; i < nThreads; i++) {
+                    workers[i] = new TestRunnable(tableName, nRows,
+                            batchSize, doneSignal);
+                    executorService.submit(workers[i]);
+                }
+                finished = doneSignal.await(1300, TimeUnit.SECONDS);
+            } finally {
+                if (finished) {
+                    executorService.shutdown();
+                } else {
+                    // Timed out or interrupted: do not leave workers running
+                    // behind the test's back.
+                    executorService.shutdownNow();
+                }
+            }
+            assertTrue("Ran out of time", finished);
+            executorService.awaitTermination(5, TimeUnit.SECONDS);
+            // An exception thrown inside a submitted task is captured by the
+            // executor and would otherwise never reach the test, so surface
+            // the failure each worker recorded.
+            for (TestRunnable worker : workers) {
+                if (worker.failure != null) {
+                    throw new AssertionError("Concurrent upsert worker failed",
+                            worker.failure);
+                }
+            }
+            verifyIndexTable(tableName, indexName, conn);
+        }
+    }
+
+    /**
+     * Worker that opens its own connection, issues 1000 upserts into the
+     * given table, commits every {@code batchSize} iterations and once more
+     * at the end, and counts down {@code doneSignal} when it is finished,
+     * whether it succeeded or not.
+     */
+    private static class TestRunnable implements Runnable {
+        private final String tableName;
+        private final int nRows;
+        private final int batchSize;
+        private final CountDownLatch doneSignal;
+        // Set when the worker fails; read by the test thread after the latch.
+        private volatile Throwable failure;
+
+        public TestRunnable(String tableName, int nRows, int batchSize,
+                CountDownLatch doneSignal) {
+            this.tableName = tableName;
+            this.nRows = nRows;
+            this.batchSize = batchSize;
+            this.doneSignal = doneSignal;
+        }
+
+        @Override
+        public void run() {
+            // The connection is closed before the catch/finally clauses run,
+            // i.e. before the latch is counted down.
+            try (Connection conn = DriverManager.getConnection(getUrl())) {
+                for (int i = 0; i < 1000; i++) {
+                    // NOTE(review): nextInt() can be negative and % keeps the
+                    // sign of the dividend, so this condition holds for about
+                    // half of the iterations, not 1% - confirm that is the
+                    // intended mix.
+                    if (RANDOM.nextInt() % 1000 < 10) {
+                        // Do not include the indexed column in upserts
+                        conn.createStatement().execute(
+                                "UPSERT INTO " + tableName + " (k1, k2, b.v2, c.v3, d.v4) VALUES ("
+                                        + (RANDOM.nextInt() % nRows) + ", 0, "
+                                        + (RANDOM.nextBoolean() ? null : RANDOM.nextInt()) + ", "
+                                        + (RANDOM.nextBoolean() ? null : RANDOM.nextInt()) + ", "
+                                        + (RANDOM.nextBoolean() ? null : RANDOM.nextInt()) + ")");
+                    } else {
+                        conn.createStatement().execute(
+                                "UPSERT INTO " + tableName + " VALUES (" + (i % nRows) + ", 0, "
+                                        + (RANDOM.nextBoolean() ? null : RANDOM.nextInt()) + ", "
+                                        + (RANDOM.nextBoolean() ? null : RANDOM.nextInt()) + ", "
+                                        + (RANDOM.nextBoolean() ? null : RANDOM.nextInt()) + ", "
+                                        + (RANDOM.nextBoolean() ? null : RANDOM.nextInt()) + ")");
+                    }
+                    if ((i % batchSize) == 0) {
+                        conn.commit();
+                        LOGGER.info("Committed batch no: {}", i);
+                    }
+                }
+                conn.commit();
+            } catch (SQLException | RuntimeException e) {
+                LOGGER.error("Error during concurrent upserts. ", e);
+                // Rethrowing here would only be captured by the executor's
+                // Future; record the failure so the test thread can report it.
+                failure = e;
+            } finally {
+                doneSignal.countDown();
+            }
+        }
+    }
+}