You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by gj...@apache.org on 2020/12/31 15:21:39 UTC

[phoenix] branch 4.x updated: PHOENIX-6284 : De-flake UpgradeIT#testConcurrentUpgradeThrowsUpgradeInProgressException

This is an automated email from the ASF dual-hosted git repository.

gjacoby pushed a commit to branch 4.x
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/4.x by this push:
     new 24c78e9  PHOENIX-6284 : De-flake UpgradeIT#testConcurrentUpgradeThrowsUpgradeInProgressException
24c78e9 is described below

commit 24c78e95f19fd778c724ae91ce117bbc5dd8121b
Author: Viraj Jasani <vj...@apache.org>
AuthorDate: Wed Dec 30 19:05:56 2020 +0530

    PHOENIX-6284 : De-flake UpgradeIT#testConcurrentUpgradeThrowsUpgradeInProgressException
---
 .../java/org/apache/phoenix/end2end/UpgradeIT.java | 76 ++++++++++++----------
 1 file changed, 41 insertions(+), 35 deletions(-)

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpgradeIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpgradeIT.java
index 81b634d..dd386bc 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpgradeIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpgradeIT.java
@@ -38,7 +38,6 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-import java.io.IOException;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.PreparedStatement;
@@ -50,44 +49,39 @@ import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.FutureTask;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.snapshot.SnapshotCreationException;
-import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.schema.types.PInteger;
 import org.apache.phoenix.coprocessor.MetaDataProtocol;
 import org.apache.phoenix.exception.UpgradeInProgressException;
 import org.apache.phoenix.exception.UpgradeRequiredException;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
-import org.apache.phoenix.parse.PFunction;
 import org.apache.phoenix.query.ConnectionQueryServices;
 import org.apache.phoenix.query.ConnectionQueryServicesImpl;
 import org.apache.phoenix.query.DelegateConnectionQueryServices;
-import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServices;
-import org.apache.phoenix.schema.PMetaData;
 import org.apache.phoenix.schema.PName;
 import org.apache.phoenix.schema.PNameFactory;
-import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTable.LinkType;
 import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.SequenceAllocation;
 import org.apache.phoenix.schema.SequenceKey;
-import org.apache.phoenix.schema.types.PInteger;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.util.MetaDataUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
@@ -212,31 +206,44 @@ public class UpgradeIT extends ParallelStatsDisabledIT {
     }
     
     @Test
-    public void testConcurrentUpgradeThrowsUprgadeInProgressException() throws Exception {
+    public void testConcurrentUpgradeThrowsUpgradeInProgressException() throws Exception {
         final AtomicBoolean mutexStatus1 = new AtomicBoolean(false);
         final AtomicBoolean mutexStatus2 = new AtomicBoolean(false);
+        final AtomicBoolean mutexStatus3 = new AtomicBoolean(true);
         final CountDownLatch latch = new CountDownLatch(2);
         final AtomicInteger numExceptions = new AtomicInteger(0);
-        ConnectionQueryServices services = null;
-        final byte[] mutexKey = Bytes.toBytes(generateUniqueName());
         try (Connection conn = getConnection(false, null)) {
-            services = conn.unwrap(PhoenixConnection.class).getQueryServices();
-            FutureTask<Void> task1 = new FutureTask<>(new AcquireMutexRunnable(mutexStatus1, services, latch, numExceptions, mutexKey));
-            FutureTask<Void> task2 = new FutureTask<>(new AcquireMutexRunnable(mutexStatus2, services, latch, numExceptions, mutexKey));
-            Thread t1 = new Thread(task1);
-            t1.setDaemon(true);
-            Thread t2 = new Thread(task2);
-            t2.setDaemon(true);
-            t1.start();
-            t2.start();
+            ConnectionQueryServices services = conn.unwrap(PhoenixConnection.class)
+                .getQueryServices();
+            Callable<Void> task1 = new AcquireMutexRunnable(
+                mutexStatus1, services, latch, numExceptions);
+            Callable<Void> task2 = new AcquireMutexRunnable(
+                mutexStatus2, services, latch, numExceptions);
+            Callable<Void> task3 = new AcquireMutexRunnable(
+                mutexStatus3, services, latch, numExceptions);
+            ExecutorService executorService = Executors.newFixedThreadPool(2,
+                new ThreadFactoryBuilder().setDaemon(true)
+                    .setNameFormat("mutex-acquire-%d").build());
+            Future<?> futureTask1 = executorService.submit(task1);
+            Future<?> futureTask2 = executorService.submit(task2);
             latch.await();
             // make sure tasks didn't fail by calling get()
-            task1.get();
-            task2.get();
-            assertTrue("One of the threads should have acquired the mutex", mutexStatus1.get() || mutexStatus2.get());
-            assertNotEquals("One and only one thread should have acquired the mutex ", mutexStatus1.get(),
-                    mutexStatus2.get());
-            assertEquals("One and only one thread should have caught UpgradeRequiredException ", 1, numExceptions.get());
+            futureTask1.get();
+            futureTask2.get();
+            executorService.submit(task3).get();
+            assertTrue("One of the threads should have acquired the mutex",
+                mutexStatus1.get() || mutexStatus2.get() || mutexStatus3.get());
+            assertNotEquals("One and only one thread should have acquired the mutex ",
+                mutexStatus1.get(), mutexStatus2.get());
+            assertFalse("mutexStatus3 should never be true ",
+                mutexStatus3.get());
+            assertEquals("One and only one thread should have caught UpgradeRequiredException ",
+                2, numExceptions.get());
+            // release mutex only after all threads are done executing
+            // so as to avoid case where one thread releases lock and only
+            // after that another thread acquires lock (due to slow thread
+            // execution)
+            ((ConnectionQueryServicesImpl) services).releaseUpgradeMutex();
         }
     }
     
@@ -246,14 +253,15 @@ public class UpgradeIT extends ParallelStatsDisabledIT {
         private final ConnectionQueryServices services;
         private final CountDownLatch latch;
         private final AtomicInteger numExceptions;
-        private final byte[] mutexRowKey;
-        public AcquireMutexRunnable(AtomicBoolean acquireStatus, ConnectionQueryServices services, CountDownLatch latch, AtomicInteger numExceptions, byte[] mutexKey) {
+
+        private AcquireMutexRunnable(AtomicBoolean acquireStatus,
+                ConnectionQueryServices services, CountDownLatch latch, AtomicInteger numExceptions) {
             this.acquireStatus = acquireStatus;
             this.services = services;
             this.latch = latch;
             this.numExceptions = numExceptions;
-            this.mutexRowKey = mutexKey;
         }
+
         @Override
         public Void call() throws Exception {
             try {
@@ -261,12 +269,10 @@ public class UpgradeIT extends ParallelStatsDisabledIT {
                         MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0);
                 acquireStatus.set(true);
             } catch (UpgradeInProgressException e) {
+                acquireStatus.set(false);
                 numExceptions.incrementAndGet();
             } finally {
                 latch.countDown();
-                if (acquireStatus.get()) {
-                    ((ConnectionQueryServicesImpl)services).releaseUpgradeMutex();
-                }
             }
             return null;
         }