You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ya...@apache.org on 2020/12/11 21:28:47 UTC

[phoenix] branch master updated: PHOENIX-6251 : Remove flakes from ConcurrentMutationsExtendedIT

This is an automated email from the ASF dual-hosted git repository.

yanxinyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/master by this push:
     new 5f25f24  PHOENIX-6251 : Remove flakes from ConcurrentMutationsExtendedIT
5f25f24 is described below

commit 5f25f24eef9c820d7052c2c2188360993cc7fa5a
Author: Viraj Jasani <vj...@apache.org>
AuthorDate: Fri Dec 11 13:08:24 2020 +0530

    PHOENIX-6251 : Remove flakes from ConcurrentMutationsExtendedIT
    
    Signed-off-by: Xinyi Yan <ya...@apache.org>
---
 .../end2end/ConcurrentMutationsExtendedIT.java     |  63 +---------
 .../ConcurrentUpsertsWithoutIndexedColsIT.java     | 134 +++++++++++++++++++++
 2 files changed, 136 insertions(+), 61 deletions(-)

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java
index 67dd3ed..ddcccf0 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java
@@ -82,7 +82,8 @@ public class ConcurrentMutationsExtendedIT extends ParallelStatsDisabledIT {
         setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
     }
 
-    private long verifyIndexTable(String tableName, String indexName, Connection conn) throws Exception {
+    static long verifyIndexTable(String tableName, String indexName,
+            Connection conn) throws Exception {
         // This checks the state of every raw index row without rebuilding any row
         IndexTool indexTool = IndexToolIT.runIndexTool(true, false, "", tableName,
                 indexName, null, 0, IndexTool.IndexVerifyType.ONLY);
@@ -326,66 +327,6 @@ public class ConcurrentMutationsExtendedIT extends ParallelStatsDisabledIT {
     }
 
     @Test
-    public void testConcurrentUpsertsWithNoIndexedColumns() throws Exception {
-        int nThreads = 4;
-        final int batchSize = 100;
-        final int nRows = 997;
-        final String tableName = generateUniqueName();
-        final String indexName = generateUniqueName();
-        Connection conn = DriverManager.getConnection(getUrl());
-        conn.createStatement().execute("CREATE TABLE " + tableName
-                + "(k1 INTEGER NOT NULL, k2 INTEGER NOT NULL, a.v1 INTEGER, b.v2 INTEGER, c.v3 INTEGER, d.v4 INTEGER," +
-                "CONSTRAINT pk PRIMARY KEY (k1,k2))  COLUMN_ENCODED_BYTES = 0, VERSIONS=1");
-        conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + tableName + "(v1) INCLUDE(v2, v3)");
-        final CountDownLatch doneSignal = new CountDownLatch(nThreads);
-        Runnable[] runnables = new Runnable[nThreads];
-        for (int i = 0; i < nThreads; i++) {
-            runnables[i] = new Runnable() {
-
-                @Override public void run() {
-                    try {
-                        Connection conn = DriverManager.getConnection(getUrl());
-                        for (int i = 0; i < 1000; i++) {
-                            if (RAND.nextInt() % 1000 < 10) {
-                                // Do not include the indexed column in upserts
-                                conn.createStatement().execute(
-                                        "UPSERT INTO " + tableName + " (k1, k2, b.v2, c.v3, d.v4) VALUES ("
-                                                + (RAND.nextInt() % nRows) + ", 0, "
-                                                + (RAND.nextBoolean() ? null : RAND.nextInt()) + ", "
-                                                + (RAND.nextBoolean() ? null : RAND.nextInt()) + ", "
-                                                + (RAND.nextBoolean() ? null : RAND.nextInt()) + ")");
-                            } else {
-                                conn.createStatement().execute(
-                                        "UPSERT INTO " + tableName + " VALUES (" + (i % nRows) + ", 0, "
-                                                + (RAND.nextBoolean() ? null : RAND.nextInt()) + ", "
-                                                + (RAND.nextBoolean() ? null : RAND.nextInt()) + ", "
-                                                + (RAND.nextBoolean() ? null : RAND.nextInt()) + ", "
-                                                + (RAND.nextBoolean() ? null : RAND.nextInt()) + ")");
-                            }
-                            if ((i % batchSize) == 0) {
-                                conn.commit();
-                            }
-                        }
-                        conn.commit();
-                    } catch (SQLException e) {
-                        throw new RuntimeException(e);
-                    } finally {
-                        doneSignal.countDown();
-                    }
-                }
-
-            };
-        }
-        for (int i = 0; i < nThreads; i++) {
-            Thread t = new Thread(runnables[i]);
-            t.start();
-        }
-
-        assertTrue("Ran out of time", doneSignal.await(240, TimeUnit.SECONDS));
-        verifyIndexTable(tableName, indexName, conn);
-    }
-
-    @Test
     public void testRowLockDuringPreBatchMutateWhenIndexed() throws Exception {
         final String tableName = LOCK_TEST_TABLE_PREFIX + generateUniqueName();
         final String indexName = generateUniqueName();
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentUpsertsWithoutIndexedColsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentUpsertsWithoutIndexedColsIT.java
new file mode 100644
index 0000000..bec7e28
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentUpsertsWithoutIndexedColsIT.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.end2end;
+
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.phoenix.util.RunUntilFailure;
+import org.apache.phoenix.util.TestUtil;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.phoenix.end2end.ConcurrentMutationsExtendedIT
+    .verifyIndexTable;
+import static org.junit.Assert.assertTrue;
+
+
+@RunWith(RunUntilFailure.class)
+@Category(NeedsOwnMiniClusterTest.class)
+public class ConcurrentUpsertsWithoutIndexedColsIT
+        extends ParallelStatsDisabledIT {
+
+    private static final Random RANDOM = new Random(5);
+    private static final Logger LOGGER =
+        LoggerFactory.getLogger(ConcurrentUpsertsWithoutIndexedColsIT.class);
+
+    @Test
+    public void testConcurrentUpsertsWithoutIndexedColumns() throws Exception {
+        int nThreads = 4;
+        final int batchSize = 100;
+        final int nRows = 997;
+        final String tableName = generateUniqueName();
+        final String indexName = generateUniqueName();
+        Connection conn = DriverManager.getConnection(getUrl());
+        conn.createStatement().execute("CREATE TABLE " + tableName
+            + "(k1 INTEGER NOT NULL, k2 INTEGER NOT NULL, a.v1 INTEGER, "
+            + "b.v2 INTEGER, c.v3 INTEGER, d.v4 INTEGER,"
+            + "CONSTRAINT pk PRIMARY KEY (k1,k2))  COLUMN_ENCODED_BYTES = 0, VERSIONS=1");
+        TestUtil.addCoprocessor(conn, tableName,
+            ConcurrentMutationsExtendedIT.DelayingRegionObserver.class);
+        conn.createStatement().execute("CREATE INDEX " + indexName + " ON "
+            + tableName + "(v1) INCLUDE(v2, v3)");
+        final CountDownLatch doneSignal = new CountDownLatch(nThreads);
+        ExecutorService executorService = Executors.newFixedThreadPool(nThreads,
+            new ThreadFactoryBuilder().setDaemon(true)
+                .setNameFormat("test-concurrent-upsert-%d").build());
+        for (int i = 0; i < nThreads; i++) {
+            TestRunnable testRunnable = new TestRunnable(tableName, nRows,
+                batchSize, doneSignal);
+            executorService.submit(testRunnable);
+        }
+
+        assertTrue("Ran out of time", doneSignal.await(1300, TimeUnit.SECONDS));
+        executorService.shutdown();
+        executorService.awaitTermination(5, TimeUnit.SECONDS);
+        verifyIndexTable(tableName, indexName, conn);
+    }
+
+    private static class TestRunnable implements Runnable {
+        private final String tableName;
+        private final int nRows;
+        private final int batchSize;
+        private final CountDownLatch doneSignal;
+
+        public TestRunnable(String tableName, int nRows, int batchSize,
+                CountDownLatch doneSignal) {
+            this.tableName = tableName;
+            this.nRows = nRows;
+            this.batchSize = batchSize;
+            this.doneSignal = doneSignal;
+        }
+
+        @Override
+        public void run() {
+            try {
+                Connection conn = DriverManager.getConnection(getUrl());
+                for (int i = 0; i < 1000; i++) {
+                    if (RANDOM.nextInt() % 1000 < 10) {
+                        // Do not include the indexed column in upserts
+                        conn.createStatement().execute(
+                            "UPSERT INTO " + tableName + " (k1, k2, b.v2, c.v3, d.v4) VALUES ("
+                                + (RANDOM.nextInt() % nRows) + ", 0, "
+                                + (RANDOM.nextBoolean() ? null : RANDOM.nextInt()) + ", "
+                                + (RANDOM.nextBoolean() ? null : RANDOM.nextInt()) + ", "
+                                + (RANDOM.nextBoolean() ? null : RANDOM.nextInt()) + ")");
+                    } else {
+                        conn.createStatement().execute(
+                            "UPSERT INTO " + tableName + " VALUES (" + (i % nRows) + ", 0, "
+                                + (RANDOM.nextBoolean() ? null : RANDOM.nextInt()) + ", "
+                                + (RANDOM.nextBoolean() ? null : RANDOM.nextInt()) + ", "
+                                + (RANDOM.nextBoolean() ? null : RANDOM.nextInt()) + ", "
+                                + (RANDOM.nextBoolean() ? null : RANDOM.nextInt()) + ")");
+                    }
+                    if ((i % batchSize) == 0) {
+                        conn.commit();
+                        LOGGER.info("Committed batch no: {}", i);
+                    }
+                }
+                conn.commit();
+            } catch (SQLException e) {
+                LOGGER.error("Error during concurrent upserts. ", e);
+                throw new RuntimeException(e);
+            } finally {
+                doneSignal.countDown();
+            }
+        }
+    }
+}