You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by gj...@apache.org on 2020/12/31 15:21:39 UTC
[phoenix] branch 4.x updated: PHOENIX-6284 : De-flake
UpgradeIT#testConcurrentUpgradeThrowsUpgradeInProgressException
This is an automated email from the ASF dual-hosted git repository.
gjacoby pushed a commit to branch 4.x
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.x by this push:
new 24c78e9 PHOENIX-6284 : De-flake UpgradeIT#testConcurrentUpgradeThrowsUpgradeInProgressException
24c78e9 is described below
commit 24c78e95f19fd778c724ae91ce117bbc5dd8121b
Author: Viraj Jasani <vj...@apache.org>
AuthorDate: Wed Dec 30 19:05:56 2020 +0530
PHOENIX-6284 : De-flake UpgradeIT#testConcurrentUpgradeThrowsUpgradeInProgressException
---
.../java/org/apache/phoenix/end2end/UpgradeIT.java | 76 ++++++++++++----------
1 file changed, 41 insertions(+), 35 deletions(-)
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpgradeIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpgradeIT.java
index 81b634d..dd386bc 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpgradeIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpgradeIT.java
@@ -38,7 +38,6 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
@@ -50,44 +49,39 @@ import java.util.Properties;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.FutureTask;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.snapshot.SnapshotCreationException;
-import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.schema.types.PInteger;
import org.apache.phoenix.coprocessor.MetaDataProtocol;
import org.apache.phoenix.exception.UpgradeInProgressException;
import org.apache.phoenix.exception.UpgradeRequiredException;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
-import org.apache.phoenix.parse.PFunction;
import org.apache.phoenix.query.ConnectionQueryServices;
import org.apache.phoenix.query.ConnectionQueryServicesImpl;
import org.apache.phoenix.query.DelegateConnectionQueryServices;
-import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.query.QueryServices;
-import org.apache.phoenix.schema.PMetaData;
import org.apache.phoenix.schema.PName;
import org.apache.phoenix.schema.PNameFactory;
-import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTable.LinkType;
import org.apache.phoenix.schema.PTableType;
import org.apache.phoenix.schema.SequenceAllocation;
import org.apache.phoenix.schema.SequenceKey;
-import org.apache.phoenix.schema.types.PInteger;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.MetaDataUtil;
import org.apache.phoenix.util.PhoenixRuntime;
@@ -212,31 +206,44 @@ public class UpgradeIT extends ParallelStatsDisabledIT {
}
@Test
- public void testConcurrentUpgradeThrowsUprgadeInProgressException() throws Exception {
+ public void testConcurrentUpgradeThrowsUpgradeInProgressException() throws Exception {
final AtomicBoolean mutexStatus1 = new AtomicBoolean(false);
final AtomicBoolean mutexStatus2 = new AtomicBoolean(false);
+ final AtomicBoolean mutexStatus3 = new AtomicBoolean(true);
final CountDownLatch latch = new CountDownLatch(2);
final AtomicInteger numExceptions = new AtomicInteger(0);
- ConnectionQueryServices services = null;
- final byte[] mutexKey = Bytes.toBytes(generateUniqueName());
try (Connection conn = getConnection(false, null)) {
- services = conn.unwrap(PhoenixConnection.class).getQueryServices();
- FutureTask<Void> task1 = new FutureTask<>(new AcquireMutexRunnable(mutexStatus1, services, latch, numExceptions, mutexKey));
- FutureTask<Void> task2 = new FutureTask<>(new AcquireMutexRunnable(mutexStatus2, services, latch, numExceptions, mutexKey));
- Thread t1 = new Thread(task1);
- t1.setDaemon(true);
- Thread t2 = new Thread(task2);
- t2.setDaemon(true);
- t1.start();
- t2.start();
+ ConnectionQueryServices services = conn.unwrap(PhoenixConnection.class)
+ .getQueryServices();
+ Callable<Void> task1 = new AcquireMutexRunnable(
+ mutexStatus1, services, latch, numExceptions);
+ Callable<Void> task2 = new AcquireMutexRunnable(
+ mutexStatus2, services, latch, numExceptions);
+ Callable<Void> task3 = new AcquireMutexRunnable(
+ mutexStatus3, services, latch, numExceptions);
+ ExecutorService executorService = Executors.newFixedThreadPool(2,
+ new ThreadFactoryBuilder().setDaemon(true)
+ .setNameFormat("mutex-acquire-%d").build());
+ Future<?> futureTask1 = executorService.submit(task1);
+ Future<?> futureTask2 = executorService.submit(task2);
latch.await();
// make sure tasks didn't fail by calling get()
- task1.get();
- task2.get();
- assertTrue("One of the threads should have acquired the mutex", mutexStatus1.get() || mutexStatus2.get());
- assertNotEquals("One and only one thread should have acquired the mutex ", mutexStatus1.get(),
- mutexStatus2.get());
- assertEquals("One and only one thread should have caught UpgradeRequiredException ", 1, numExceptions.get());
+ futureTask1.get();
+ futureTask2.get();
+ executorService.submit(task3).get();
+ assertTrue("One of the threads should have acquired the mutex",
+ mutexStatus1.get() || mutexStatus2.get() || mutexStatus3.get());
+ assertNotEquals("One and only one thread should have acquired the mutex ",
+ mutexStatus1.get(), mutexStatus2.get());
+ assertFalse("mutexStatus3 should never be true ",
+ mutexStatus3.get());
+ assertEquals("One and only one thread should have caught UpgradeRequiredException ",
+ 2, numExceptions.get());
+ // release mutex only after all threads are done executing
+ // so as to avoid case where one thread releases lock and only
+ // after that another thread acquires lock (due to slow thread
+ // execution)
+ ((ConnectionQueryServicesImpl) services).releaseUpgradeMutex();
}
}
@@ -246,14 +253,15 @@ public class UpgradeIT extends ParallelStatsDisabledIT {
private final ConnectionQueryServices services;
private final CountDownLatch latch;
private final AtomicInteger numExceptions;
- private final byte[] mutexRowKey;
- public AcquireMutexRunnable(AtomicBoolean acquireStatus, ConnectionQueryServices services, CountDownLatch latch, AtomicInteger numExceptions, byte[] mutexKey) {
+
+ private AcquireMutexRunnable(AtomicBoolean acquireStatus,
+ ConnectionQueryServices services, CountDownLatch latch, AtomicInteger numExceptions) {
this.acquireStatus = acquireStatus;
this.services = services;
this.latch = latch;
this.numExceptions = numExceptions;
- this.mutexRowKey = mutexKey;
}
+
@Override
public Void call() throws Exception {
try {
@@ -261,12 +269,10 @@ public class UpgradeIT extends ParallelStatsDisabledIT {
MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0);
acquireStatus.set(true);
} catch (UpgradeInProgressException e) {
+ acquireStatus.set(false);
numExceptions.incrementAndGet();
} finally {
latch.countDown();
- if (acquireStatus.get()) {
- ((ConnectionQueryServicesImpl)services).releaseUpgradeMutex();
- }
}
return null;
}