You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tephra.apache.org by an...@apache.org on 2017/09/11 04:09:27 UTC
incubator-tephra git commit: (TEPHRA-253) Fix flaky
TransactionProcessorTest
Repository: incubator-tephra
Updated Branches:
refs/heads/master ae6ce2b5e -> 7cfe06125
(TEPHRA-253) Fix flaky TransactionProcessorTest
This closes #54 from GitHub.
Signed-off-by: anew <an...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/incubator-tephra/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tephra/commit/7cfe0612
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tephra/tree/7cfe0612
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tephra/diff/7cfe0612
Branch: refs/heads/master
Commit: 7cfe0612538ead57ec66e580ed1d2d9e950c5c73
Parents: ae6ce2b
Author: anew <an...@apache.org>
Authored: Fri Sep 8 23:43:55 2017 -0700
Committer: anew <an...@apache.org>
Committed: Sun Sep 10 21:08:51 2017 -0700
----------------------------------------------------------------------
.../coprocessor/TransactionProcessorTest.java | 22 ++++++++++++++---
.../coprocessor/TransactionProcessorTest.java | 23 ++++++++++++++---
.../coprocessor/TransactionProcessorTest.java | 23 ++++++++++++++---
.../coprocessor/TransactionProcessorTest.java | 23 ++++++++++++++---
.../coprocessor/TransactionProcessorTest.java | 26 ++++++++++++++++----
.../coprocessor/TransactionProcessorTest.java | 24 +++++++++++++++---
6 files changed, 121 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/7cfe0612/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java b/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
index 1879116..3c7d1e2 100644
--- a/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
+++ b/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
@@ -74,6 +74,7 @@ import org.apache.tephra.snapshot.SnapshotCodecProvider;
import org.apache.tephra.util.TxUtils;
import org.apache.zookeeper.KeeperException;
import org.junit.AfterClass;
+import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
@@ -175,7 +176,7 @@ public class TransactionProcessorTest {
try {
region.initialize();
TransactionStateCache cache = new TransactionStateCacheSupplier(conf).get();
- LOG.info("Coprocessor is using transaction state: " + cache.getLatestState());
+ LOG.info("Coprocessor is using transaction state: " + waitForTransactionState(cache));
for (int i = 1; i <= 8; i++) {
for (int k = 1; k <= i; k++) {
@@ -190,7 +191,7 @@ public class TransactionProcessorTest {
// force a flush to clear the data
// during flush, the coprocessor should drop all KeyValues with timestamps in the invalid set
LOG.info("Flushing region " + region.getRegionNameAsString());
- region.flushcache();
+ region.flushcache(); // in 0.96, there is no indication of success
// now a normal scan should only return the valid rows - testing that cleanup works on flush
Scan scan = new Scan();
@@ -231,7 +232,7 @@ public class TransactionProcessorTest {
try {
region.initialize();
TransactionStateCache cache = new TransactionStateCacheSupplier(conf).get();
- LOG.info("Coprocessor is using transaction state: " + cache.getLatestState());
+ LOG.info("Coprocessor is using transaction state: " + waitForTransactionState(cache));
byte[] row = Bytes.toBytes(1);
for (int i = 4; i < V.length; i++) {
@@ -620,6 +621,21 @@ public class TransactionProcessorTest {
cache.stopAndWait();
}
+ private TransactionVisibilityState waitForTransactionState(TransactionStateCache cache) throws InterruptedException {
+ long timeout = 5000; // ms
+ do {
+ TransactionVisibilityState state = cache.getLatestState();
+ if (state != null) {
+ return state;
+ }
+ TimeUnit.MILLISECONDS.sleep(100);
+ timeout -= 100;
+ } while (timeout > 0L);
+ LOG.error("Timed out waiting foe transaction state cache");
+ Assert.fail("Timed out waiting foe transaction state cache");
+ return null;
+ }
+
private static class MockRegionServerServices implements RegionServerServices {
private final Configuration hConf;
private final ZooKeeperWatcher zookeeper;
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/7cfe0612/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java b/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
index abe375d..b8e051b 100644
--- a/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
+++ b/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
@@ -80,6 +80,7 @@ import org.apache.tephra.snapshot.SnapshotCodecProvider;
import org.apache.tephra.util.TxUtils;
import org.apache.zookeeper.KeeperException;
import org.junit.AfterClass;
+import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
@@ -180,7 +181,7 @@ public class TransactionProcessorTest {
try {
region.initialize();
TransactionStateCache cache = new TransactionStateCacheSupplier(conf).get();
- LOG.info("Coprocessor is using transaction state: " + cache.getLatestState());
+ LOG.info("Coprocessor is using transaction state: " + waitForTransactionState(cache));
for (int i = 1; i <= 8; i++) {
for (int k = 1; k <= i; k++) {
@@ -195,7 +196,8 @@ public class TransactionProcessorTest {
// force a flush to clear the data
// during flush, the coprocessor should drop all KeyValues with timestamps in the invalid set
LOG.info("Flushing region " + region.getRegionNameAsString());
- region.flushcache();
+ HRegion.FlushResult flushResult = region.flushcache();
+ Assert.assertTrue("Unexpected flush result: " + flushResult, flushResult.isFlushSucceeded());
// now a normal scan should only return the valid rows
// do not use a filter here to test that cleanup works on flush
@@ -237,7 +239,7 @@ public class TransactionProcessorTest {
try {
region.initialize();
TransactionStateCache cache = new TransactionStateCacheSupplier(conf).get();
- LOG.info("Coprocessor is using transaction state: " + cache.getLatestState());
+ LOG.info("Coprocessor is using transaction state: " + waitForTransactionState(cache));
byte[] row = Bytes.toBytes(1);
for (int i = 4; i < V.length; i++) {
@@ -624,6 +626,21 @@ public class TransactionProcessorTest {
cache.stopAndWait();
}
+ private TransactionVisibilityState waitForTransactionState(TransactionStateCache cache) throws InterruptedException {
+ long timeout = 5000; // ms
+ do {
+ TransactionVisibilityState state = cache.getLatestState();
+ if (state != null) {
+ return state;
+ }
+ TimeUnit.MILLISECONDS.sleep(100);
+ timeout -= 100;
+ } while (timeout > 0L);
+ LOG.error("Timed out waiting foe transaction state cache");
+ Assert.fail("Timed out waiting foe transaction state cache");
+ return null;
+ }
+
private static class MockRegionServerServices implements RegionServerServices {
private final Configuration hConf;
private final ZooKeeperWatcher zookeeper;
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/7cfe0612/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java b/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
index f6d8e2d..9ce30b5 100644
--- a/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
+++ b/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
@@ -62,6 +62,7 @@ import org.apache.tephra.snapshot.DefaultSnapshotCodec;
import org.apache.tephra.snapshot.SnapshotCodecProvider;
import org.apache.tephra.util.TxUtils;
import org.junit.AfterClass;
+import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
@@ -160,7 +161,7 @@ public class TransactionProcessorTest {
try {
region.initialize();
TransactionStateCache cache = new TransactionStateCacheSupplier(conf).get();
- LOG.info("Coprocessor is using transaction state: " + cache.getLatestState());
+ LOG.info("Coprocessor is using transaction state: " + waitForTransactionState(cache));
for (int i = 1; i <= 8; i++) {
for (int k = 1; k <= i; k++) {
@@ -175,7 +176,8 @@ public class TransactionProcessorTest {
// force a flush to clear the data
// during flush, the coprocessor should drop all KeyValues with timestamps in the invalid set
LOG.info("Flushing region " + region.getRegionNameAsString());
- region.flushcache();
+ HRegion.FlushResult flushResult = region.flushcache();
+ Assert.assertTrue("Unexpected flush result: " + flushResult.toString(), flushResult.isFlushSucceeded());
// now a normal scan should only return the valid rows
// do not use a filter here to test that cleanup works on flush
@@ -217,7 +219,7 @@ public class TransactionProcessorTest {
try {
region.initialize();
TransactionStateCache cache = new TransactionStateCacheSupplier(conf).get();
- LOG.info("Coprocessor is using transaction state: " + cache.getLatestState());
+ LOG.info("Coprocessor is using transaction state: " + waitForTransactionState(cache));
byte[] row = Bytes.toBytes(1);
for (int i = 4; i < V.length; i++) {
@@ -606,6 +608,21 @@ public class TransactionProcessorTest {
cache.stopAndWait();
}
+ private TransactionVisibilityState waitForTransactionState(TransactionStateCache cache) throws InterruptedException {
+ long timeout = 5000; // ms
+ do {
+ TransactionVisibilityState state = cache.getLatestState();
+ if (state != null) {
+ return state;
+ }
+ TimeUnit.MILLISECONDS.sleep(100);
+ timeout -= 100;
+ } while (timeout > 0L);
+ LOG.error("Timed out waiting foe transaction state cache");
+ Assert.fail("Timed out waiting foe transaction state cache");
+ return null;
+ }
+
private static class LocalRegionServerServices extends MockRegionServerServices {
private final ServerName serverName;
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/7cfe0612/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java b/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
index 8dfce32..0ec3b46 100644
--- a/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
+++ b/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
@@ -62,6 +62,7 @@ import org.apache.tephra.snapshot.DefaultSnapshotCodec;
import org.apache.tephra.snapshot.SnapshotCodecProvider;
import org.apache.tephra.util.TxUtils;
import org.junit.AfterClass;
+import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
@@ -160,7 +161,7 @@ public class TransactionProcessorTest {
try {
region.initialize();
TransactionStateCache cache = new TransactionStateCacheSupplier(conf).get();
- LOG.info("Coprocessor is using transaction state: " + cache.getLatestState());
+ LOG.info("Coprocessor is using transaction state: " + waitForTransactionState(cache));
for (int i = 1; i <= 8; i++) {
for (int k = 1; k <= i; k++) {
@@ -175,7 +176,8 @@ public class TransactionProcessorTest {
// force a flush to clear the data
// during flush, the coprocessor should drop all KeyValues with timestamps in the invalid set
LOG.info("Flushing region " + region.getRegionNameAsString());
- region.flushcache();
+ HRegion.FlushResult flushResult = region.flushcache();
+ Assert.assertTrue("Unexpected flush result: " + flushResult, flushResult.isFlushSucceeded());
// now a normal scan should only return the valid rows
// do not use a filter here to test that cleanup works on flush
@@ -217,7 +219,7 @@ public class TransactionProcessorTest {
try {
region.initialize();
TransactionStateCache cache = new TransactionStateCacheSupplier(conf).get();
- LOG.info("Coprocessor is using transaction state: " + cache.getLatestState());
+ LOG.info("Coprocessor is using transaction state: " + waitForTransactionState(cache));
byte[] row = Bytes.toBytes(1);
for (int i = 4; i < V.length; i++) {
@@ -606,6 +608,21 @@ public class TransactionProcessorTest {
cache.stopAndWait();
}
+ private TransactionVisibilityState waitForTransactionState(TransactionStateCache cache) throws InterruptedException {
+ long timeout = 5000; // ms
+ do {
+ TransactionVisibilityState state = cache.getLatestState();
+ if (state != null) {
+ return state;
+ }
+ TimeUnit.MILLISECONDS.sleep(100);
+ timeout -= 100;
+ } while (timeout > 0L);
+ LOG.error("Timed out waiting foe transaction state cache");
+ Assert.fail("Timed out waiting foe transaction state cache");
+ return null;
+ }
+
private static class LocalRegionServerServices extends MockRegionServerServices {
private final ServerName serverName;
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/7cfe0612/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
index 9f7206d..f133735 100644
--- a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
+++ b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
+import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.util.Bytes;
@@ -62,6 +63,7 @@ import org.apache.tephra.snapshot.DefaultSnapshotCodec;
import org.apache.tephra.snapshot.SnapshotCodecProvider;
import org.apache.tephra.util.TxUtils;
import org.junit.AfterClass;
+import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
@@ -160,7 +162,7 @@ public class TransactionProcessorTest {
try {
region.initialize();
TransactionStateCache cache = new TransactionStateCacheSupplier(conf).get();
- LOG.info("Coprocessor is using transaction state: " + cache.getLatestState());
+ LOG.info("Coprocessor is using transaction state: " + waitForTransactionState(cache));
for (int i = 1; i <= 8; i++) {
for (int k = 1; k <= i; k++) {
@@ -174,10 +176,10 @@ public class TransactionProcessorTest {
// force a flush to clear the data
// during flush, the coprocessor should drop all KeyValues with timestamps in the invalid set
-
LOG.info("Flushing region " + region.getRegionInfo().getRegionNameAsString());
- region.flushcache(true, false);
-
+ Region.FlushResult flushResult = region.flushcache(true, false);
+ Assert.assertTrue("Unexpected flush result: " + flushResult, flushResult.isFlushSucceeded());
+
// now a normal scan should only return the valid rows
// do not use a filter here to test that cleanup works on flush
Scan scan = new Scan();
@@ -218,7 +220,7 @@ public class TransactionProcessorTest {
try {
region.initialize();
TransactionStateCache cache = new TransactionStateCacheSupplier(conf).get();
- LOG.info("Coprocessor is using transaction state: " + cache.getLatestState());
+ LOG.info("Coprocessor is using transaction state: " + waitForTransactionState(cache));
byte[] row = Bytes.toBytes(1);
for (int i = 4; i < V.length; i++) {
@@ -608,6 +610,20 @@ public class TransactionProcessorTest {
cache.stopAndWait();
}
+ private TransactionVisibilityState waitForTransactionState(TransactionStateCache cache) throws InterruptedException {
+ long timeout = 5000; // ms
+ do {
+ TransactionVisibilityState state = cache.getLatestState();
+ if (state != null) {
+ return state;
+ }
+ TimeUnit.MILLISECONDS.sleep(100);
+ timeout -= 100;
+ } while (timeout > 0L);
+ Assert.fail("Timed out waiting foe transaction state cache");
+ return null;
+ }
+
private static class LocalRegionServerServices extends MockRegionServerServices {
private final ServerName serverName;
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/7cfe0612/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java b/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
index 15842a3..4c8fa64 100644
--- a/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
+++ b/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
+import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.util.Bytes;
@@ -62,6 +63,7 @@ import org.apache.tephra.snapshot.DefaultSnapshotCodec;
import org.apache.tephra.snapshot.SnapshotCodecProvider;
import org.apache.tephra.util.TxUtils;
import org.junit.AfterClass;
+import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
@@ -160,7 +162,7 @@ public class TransactionProcessorTest {
try {
region.initialize();
TransactionStateCache cache = new TransactionStateCacheSupplier(conf).get();
- LOG.info("Coprocessor is using transaction state: " + cache.getLatestState());
+ LOG.info("Coprocessor is using transaction state: " + waitForTransactionState(cache));
for (int i = 1; i <= 8; i++) {
for (int k = 1; k <= i; k++) {
@@ -176,7 +178,8 @@ public class TransactionProcessorTest {
// during flush, the coprocessor should drop all KeyValues with timestamps in the invalid set
LOG.info("Flushing region " + region.getRegionInfo().getRegionNameAsString());
- region.flushcache(true, false);
+ Region.FlushResult flushResult = region.flushcache(true, false);
+ Assert.assertTrue("Unexpected flush result: " + flushResult, flushResult.isFlushSucceeded());
// now a normal scan should only return the valid rows
// do not use a filter here to test that cleanup works on flush
@@ -218,7 +221,7 @@ public class TransactionProcessorTest {
try {
region.initialize();
TransactionStateCache cache = new TransactionStateCacheSupplier(conf).get();
- LOG.info("Coprocessor is using transaction state: " + cache.getLatestState());
+ LOG.info("Coprocessor is using transaction state: " + waitForTransactionState(cache));
byte[] row = Bytes.toBytes(1);
for (int i = 4; i < V.length; i++) {
@@ -608,6 +611,21 @@ public class TransactionProcessorTest {
cache.stopAndWait();
}
+ private TransactionVisibilityState waitForTransactionState(TransactionStateCache cache) throws InterruptedException {
+ long timeout = 5000; // ms
+ do {
+ TransactionVisibilityState state = cache.getLatestState();
+ if (state != null) {
+ return state;
+ }
+ TimeUnit.MILLISECONDS.sleep(100);
+ timeout -= 100;
+ } while (timeout > 0L);
+ LOG.error("Timed out waiting foe transaction state cache");
+ Assert.fail("Timed out waiting foe transaction state cache");
+ return null;
+ }
+
private static class LocalRegionServerServices extends MockRegionServerServices {
private final ServerName serverName;