You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tephra.apache.org by an...@apache.org on 2017/09/11 04:09:27 UTC

incubator-tephra git commit: (TEPHRA-253) Fix flaky TransactionProcessorTest

Repository: incubator-tephra
Updated Branches:
  refs/heads/master ae6ce2b5e -> 7cfe06125


(TEPHRA-253) Fix flaky TransactionProcessorTest

This closes #54 from GitHub.

Signed-off-by: anew <an...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-tephra/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tephra/commit/7cfe0612
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tephra/tree/7cfe0612
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tephra/diff/7cfe0612

Branch: refs/heads/master
Commit: 7cfe0612538ead57ec66e580ed1d2d9e950c5c73
Parents: ae6ce2b
Author: anew <an...@apache.org>
Authored: Fri Sep 8 23:43:55 2017 -0700
Committer: anew <an...@apache.org>
Committed: Sun Sep 10 21:08:51 2017 -0700

----------------------------------------------------------------------
 .../coprocessor/TransactionProcessorTest.java   | 22 ++++++++++++++---
 .../coprocessor/TransactionProcessorTest.java   | 23 ++++++++++++++---
 .../coprocessor/TransactionProcessorTest.java   | 23 ++++++++++++++---
 .../coprocessor/TransactionProcessorTest.java   | 23 ++++++++++++++---
 .../coprocessor/TransactionProcessorTest.java   | 26 ++++++++++++++++----
 .../coprocessor/TransactionProcessorTest.java   | 24 +++++++++++++++---
 6 files changed, 121 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/7cfe0612/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java b/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
index 1879116..3c7d1e2 100644
--- a/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
+++ b/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
@@ -74,6 +74,7 @@ import org.apache.tephra.snapshot.SnapshotCodecProvider;
 import org.apache.tephra.util.TxUtils;
 import org.apache.zookeeper.KeeperException;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
@@ -175,7 +176,7 @@ public class TransactionProcessorTest {
     try {
       region.initialize();
       TransactionStateCache cache = new TransactionStateCacheSupplier(conf).get();
-      LOG.info("Coprocessor is using transaction state: " + cache.getLatestState());
+      LOG.info("Coprocessor is using transaction state: " + waitForTransactionState(cache));
 
       for (int i = 1; i <= 8; i++) {
         for (int k = 1; k <= i; k++) {
@@ -190,7 +191,7 @@ public class TransactionProcessorTest {
       // force a flush to clear the data
       // during flush, the coprocessor should drop all KeyValues with timestamps in the invalid set
       LOG.info("Flushing region " + region.getRegionNameAsString());
-      region.flushcache();
+      region.flushcache(); // in 0.96, there is no indication of success
 
       // now a normal scan should only return the valid rows - testing that cleanup works on flush
       Scan scan = new Scan();
@@ -231,7 +232,7 @@ public class TransactionProcessorTest {
     try {
       region.initialize();
       TransactionStateCache cache = new TransactionStateCacheSupplier(conf).get();
-      LOG.info("Coprocessor is using transaction state: " + cache.getLatestState());
+      LOG.info("Coprocessor is using transaction state: " + waitForTransactionState(cache));
 
       byte[] row = Bytes.toBytes(1);
       for (int i = 4; i < V.length; i++) {
@@ -620,6 +621,21 @@ public class TransactionProcessorTest {
     cache.stopAndWait();
   }
 
+  private TransactionVisibilityState waitForTransactionState(TransactionStateCache cache) throws InterruptedException {
+    long timeout = 5000; // ms; total wait budget, consumed in 100 ms steps below
+    do {
+      TransactionVisibilityState state = cache.getLatestState();
+      if (state != null) { // the cache has a state available, so stop polling
+        return state;
+      }
+      TimeUnit.MILLISECONDS.sleep(100);
+      timeout -= 100;
+    } while (timeout > 0L);
+    LOG.error("Timed out waiting for transaction state cache");
+    Assert.fail("Timed out waiting for transaction state cache");
+    return null; // never reached: Assert.fail throws; present only to satisfy the compiler
+  }
+
   private static class MockRegionServerServices implements RegionServerServices {
     private final Configuration hConf;
     private final ZooKeeperWatcher zookeeper;

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/7cfe0612/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java b/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
index abe375d..b8e051b 100644
--- a/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
+++ b/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
@@ -80,6 +80,7 @@ import org.apache.tephra.snapshot.SnapshotCodecProvider;
 import org.apache.tephra.util.TxUtils;
 import org.apache.zookeeper.KeeperException;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
@@ -180,7 +181,7 @@ public class TransactionProcessorTest {
     try {
       region.initialize();
       TransactionStateCache cache = new TransactionStateCacheSupplier(conf).get();
-      LOG.info("Coprocessor is using transaction state: " + cache.getLatestState());
+      LOG.info("Coprocessor is using transaction state: " + waitForTransactionState(cache));
 
       for (int i = 1; i <= 8; i++) {
         for (int k = 1; k <= i; k++) {
@@ -195,7 +196,8 @@ public class TransactionProcessorTest {
       // force a flush to clear the data
       // during flush, the coprocessor should drop all KeyValues with timestamps in the invalid set
       LOG.info("Flushing region " + region.getRegionNameAsString());
-      region.flushcache();
+      HRegion.FlushResult flushResult = region.flushcache();
+      Assert.assertTrue("Unexpected flush result: " + flushResult, flushResult.isFlushSucceeded());
 
       // now a normal scan should only return the valid rows
       // do not use a filter here to test that cleanup works on flush
@@ -237,7 +239,7 @@ public class TransactionProcessorTest {
     try {
       region.initialize();
       TransactionStateCache cache = new TransactionStateCacheSupplier(conf).get();
-      LOG.info("Coprocessor is using transaction state: " + cache.getLatestState());
+      LOG.info("Coprocessor is using transaction state: " + waitForTransactionState(cache));
 
       byte[] row = Bytes.toBytes(1);
       for (int i = 4; i < V.length; i++) {
@@ -624,6 +626,21 @@ public class TransactionProcessorTest {
     cache.stopAndWait();
   }
 
+  private TransactionVisibilityState waitForTransactionState(TransactionStateCache cache) throws InterruptedException {
+    long timeout = 5000; // ms; total wait budget, consumed in 100 ms steps below
+    do {
+      TransactionVisibilityState state = cache.getLatestState();
+      if (state != null) { // the cache has a state available, so stop polling
+        return state;
+      }
+      TimeUnit.MILLISECONDS.sleep(100);
+      timeout -= 100;
+    } while (timeout > 0L);
+    LOG.error("Timed out waiting for transaction state cache");
+    Assert.fail("Timed out waiting for transaction state cache");
+    return null; // never reached: Assert.fail throws; present only to satisfy the compiler
+  }
+
   private static class MockRegionServerServices implements RegionServerServices {
     private final Configuration hConf;
     private final ZooKeeperWatcher zookeeper;

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/7cfe0612/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java b/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
index f6d8e2d..9ce30b5 100644
--- a/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
+++ b/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
@@ -62,6 +62,7 @@ import org.apache.tephra.snapshot.DefaultSnapshotCodec;
 import org.apache.tephra.snapshot.SnapshotCodecProvider;
 import org.apache.tephra.util.TxUtils;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
@@ -160,7 +161,7 @@ public class TransactionProcessorTest {
     try {
       region.initialize();
       TransactionStateCache cache = new TransactionStateCacheSupplier(conf).get();
-      LOG.info("Coprocessor is using transaction state: " + cache.getLatestState());
+      LOG.info("Coprocessor is using transaction state: " + waitForTransactionState(cache));
 
       for (int i = 1; i <= 8; i++) {
         for (int k = 1; k <= i; k++) {
@@ -175,7 +176,8 @@ public class TransactionProcessorTest {
       // force a flush to clear the data
       // during flush, the coprocessor should drop all KeyValues with timestamps in the invalid set
       LOG.info("Flushing region " + region.getRegionNameAsString());
-      region.flushcache();
+      HRegion.FlushResult flushResult = region.flushcache();
+      Assert.assertTrue("Unexpected flush result: " + flushResult.toString(), flushResult.isFlushSucceeded());
 
       // now a normal scan should only return the valid rows
       // do not use a filter here to test that cleanup works on flush
@@ -217,7 +219,7 @@ public class TransactionProcessorTest {
     try {
       region.initialize();
       TransactionStateCache cache = new TransactionStateCacheSupplier(conf).get();
-      LOG.info("Coprocessor is using transaction state: " + cache.getLatestState());
+      LOG.info("Coprocessor is using transaction state: " + waitForTransactionState(cache));
 
       byte[] row = Bytes.toBytes(1);
       for (int i = 4; i < V.length; i++) {
@@ -606,6 +608,21 @@ public class TransactionProcessorTest {
     cache.stopAndWait();
   }
 
+  private TransactionVisibilityState waitForTransactionState(TransactionStateCache cache) throws InterruptedException {
+    long timeout = 5000; // ms; total wait budget, consumed in 100 ms steps below
+    do {
+      TransactionVisibilityState state = cache.getLatestState();
+      if (state != null) { // the cache has a state available, so stop polling
+        return state;
+      }
+      TimeUnit.MILLISECONDS.sleep(100);
+      timeout -= 100;
+    } while (timeout > 0L);
+    LOG.error("Timed out waiting for transaction state cache");
+    Assert.fail("Timed out waiting for transaction state cache");
+    return null; // never reached: Assert.fail throws; present only to satisfy the compiler
+  }
+
   private static class LocalRegionServerServices extends MockRegionServerServices {
     private final ServerName serverName;
 

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/7cfe0612/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java b/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
index 8dfce32..0ec3b46 100644
--- a/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
+++ b/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
@@ -62,6 +62,7 @@ import org.apache.tephra.snapshot.DefaultSnapshotCodec;
 import org.apache.tephra.snapshot.SnapshotCodecProvider;
 import org.apache.tephra.util.TxUtils;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
@@ -160,7 +161,7 @@ public class TransactionProcessorTest {
     try {
       region.initialize();
       TransactionStateCache cache = new TransactionStateCacheSupplier(conf).get();
-      LOG.info("Coprocessor is using transaction state: " + cache.getLatestState());
+      LOG.info("Coprocessor is using transaction state: " + waitForTransactionState(cache));
 
       for (int i = 1; i <= 8; i++) {
         for (int k = 1; k <= i; k++) {
@@ -175,7 +176,8 @@ public class TransactionProcessorTest {
       // force a flush to clear the data
       // during flush, the coprocessor should drop all KeyValues with timestamps in the invalid set
       LOG.info("Flushing region " + region.getRegionNameAsString());
-      region.flushcache();
+      HRegion.FlushResult flushResult = region.flushcache();
+      Assert.assertTrue("Unexpected flush result: " + flushResult, flushResult.isFlushSucceeded());
 
       // now a normal scan should only return the valid rows
       // do not use a filter here to test that cleanup works on flush
@@ -217,7 +219,7 @@ public class TransactionProcessorTest {
     try {
       region.initialize();
       TransactionStateCache cache = new TransactionStateCacheSupplier(conf).get();
-      LOG.info("Coprocessor is using transaction state: " + cache.getLatestState());
+      LOG.info("Coprocessor is using transaction state: " + waitForTransactionState(cache));
 
       byte[] row = Bytes.toBytes(1);
       for (int i = 4; i < V.length; i++) {
@@ -606,6 +608,21 @@ public class TransactionProcessorTest {
     cache.stopAndWait();
   }
 
+  private TransactionVisibilityState waitForTransactionState(TransactionStateCache cache) throws InterruptedException {
+    long timeout = 5000; // ms; total wait budget, consumed in 100 ms steps below
+    do {
+      TransactionVisibilityState state = cache.getLatestState();
+      if (state != null) { // the cache has a state available, so stop polling
+        return state;
+      }
+      TimeUnit.MILLISECONDS.sleep(100);
+      timeout -= 100;
+    } while (timeout > 0L);
+    LOG.error("Timed out waiting for transaction state cache");
+    Assert.fail("Timed out waiting for transaction state cache");
+    return null; // never reached: Assert.fail throws; present only to satisfy the compiler
+  }
+
   private static class LocalRegionServerServices extends MockRegionServerServices {
     private final ServerName serverName;
 

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/7cfe0612/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
index 9f7206d..f133735 100644
--- a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
+++ b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
+import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.regionserver.ScanType;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -62,6 +63,7 @@ import org.apache.tephra.snapshot.DefaultSnapshotCodec;
 import org.apache.tephra.snapshot.SnapshotCodecProvider;
 import org.apache.tephra.util.TxUtils;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
@@ -160,7 +162,7 @@ public class TransactionProcessorTest {
     try {
       region.initialize();
       TransactionStateCache cache = new TransactionStateCacheSupplier(conf).get();
-      LOG.info("Coprocessor is using transaction state: " + cache.getLatestState());
+      LOG.info("Coprocessor is using transaction state: " + waitForTransactionState(cache));
 
       for (int i = 1; i <= 8; i++) {
         for (int k = 1; k <= i; k++) {
@@ -174,10 +176,10 @@ public class TransactionProcessorTest {
 
       // force a flush to clear the data
       // during flush, the coprocessor should drop all KeyValues with timestamps in the invalid set
-
       LOG.info("Flushing region " + region.getRegionInfo().getRegionNameAsString());
-      region.flushcache(true, false);
-
+      Region.FlushResult flushResult = region.flushcache(true, false);
+      Assert.assertTrue("Unexpected flush result: " + flushResult, flushResult.isFlushSucceeded());
+      
       // now a normal scan should only return the valid rows
       // do not use a filter here to test that cleanup works on flush
       Scan scan = new Scan();
@@ -218,7 +220,7 @@ public class TransactionProcessorTest {
     try {
       region.initialize();
       TransactionStateCache cache = new TransactionStateCacheSupplier(conf).get();
-      LOG.info("Coprocessor is using transaction state: " + cache.getLatestState());
+      LOG.info("Coprocessor is using transaction state: " + waitForTransactionState(cache));
 
       byte[] row = Bytes.toBytes(1);
       for (int i = 4; i < V.length; i++) {
@@ -608,6 +610,20 @@ public class TransactionProcessorTest {
     cache.stopAndWait();
   }
 
+  private TransactionVisibilityState waitForTransactionState(TransactionStateCache cache) throws InterruptedException {
+    long timeout = 5000; // ms; total wait budget, consumed in 100 ms steps below
+    do {
+      TransactionVisibilityState state = cache.getLatestState();
+      if (state != null) { // the cache has a state available, so stop polling
+        return state;
+      }
+      TimeUnit.MILLISECONDS.sleep(100);
+      timeout -= 100;
+    } while (timeout > 0L);
+    Assert.fail("Timed out waiting for transaction state cache");
+    return null; // never reached: Assert.fail throws; present only to satisfy the compiler
+  }
+
   private static class LocalRegionServerServices extends MockRegionServerServices {
     private final ServerName serverName;
 

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/7cfe0612/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java b/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
index 15842a3..4c8fa64 100644
--- a/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
+++ b/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
+import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.regionserver.ScanType;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -62,6 +63,7 @@ import org.apache.tephra.snapshot.DefaultSnapshotCodec;
 import org.apache.tephra.snapshot.SnapshotCodecProvider;
 import org.apache.tephra.util.TxUtils;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
@@ -160,7 +162,7 @@ public class TransactionProcessorTest {
     try {
       region.initialize();
       TransactionStateCache cache = new TransactionStateCacheSupplier(conf).get();
-      LOG.info("Coprocessor is using transaction state: " + cache.getLatestState());
+      LOG.info("Coprocessor is using transaction state: " + waitForTransactionState(cache));
 
       for (int i = 1; i <= 8; i++) {
         for (int k = 1; k <= i; k++) {
@@ -176,7 +178,8 @@ public class TransactionProcessorTest {
       // during flush, the coprocessor should drop all KeyValues with timestamps in the invalid set
 
       LOG.info("Flushing region " + region.getRegionInfo().getRegionNameAsString());
-      region.flushcache(true, false);
+      Region.FlushResult flushResult = region.flushcache(true, false);
+      Assert.assertTrue("Unexpected flush result: " + flushResult, flushResult.isFlushSucceeded());
 
       // now a normal scan should only return the valid rows
       // do not use a filter here to test that cleanup works on flush
@@ -218,7 +221,7 @@ public class TransactionProcessorTest {
     try {
       region.initialize();
       TransactionStateCache cache = new TransactionStateCacheSupplier(conf).get();
-      LOG.info("Coprocessor is using transaction state: " + cache.getLatestState());
+      LOG.info("Coprocessor is using transaction state: " + waitForTransactionState(cache));
 
       byte[] row = Bytes.toBytes(1);
       for (int i = 4; i < V.length; i++) {
@@ -608,6 +611,21 @@ public class TransactionProcessorTest {
     cache.stopAndWait();
   }
 
+  private TransactionVisibilityState waitForTransactionState(TransactionStateCache cache) throws InterruptedException {
+    long timeout = 5000; // ms; total wait budget, consumed in 100 ms steps below
+    do {
+      TransactionVisibilityState state = cache.getLatestState();
+      if (state != null) { // the cache has a state available, so stop polling
+        return state;
+      }
+      TimeUnit.MILLISECONDS.sleep(100);
+      timeout -= 100;
+    } while (timeout > 0L);
+    LOG.error("Timed out waiting for transaction state cache");
+    Assert.fail("Timed out waiting for transaction state cache");
+    return null; // never reached: Assert.fail throws; present only to satisfy the compiler
+  }
+
   private static class LocalRegionServerServices extends MockRegionServerServices {
     private final ServerName serverName;