You are viewing a plain-text version of this content. The canonical link to the original is not included in this copy.
Posted to commits@hbase.apache.org by ch...@apache.org on 2016/02/22 22:30:37 UTC

hbase git commit: HBASE-15169 Backport HBASE-14362 'TestWALProcedureStoreOnHDFS is super duper flaky' to branch-1.1

Repository: hbase
Updated Branches:
  refs/heads/branch-1.1 ff65cba33 -> 10d607717


HBASE-15169 Backport HBASE-14362 'TestWALProcedureStoreOnHDFS is super duper flaky' to branch-1.1


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/10d60771
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/10d60771
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/10d60771

Branch: refs/heads/branch-1.1
Commit: 10d607717257af8e83368e34bc32f98f98389dc8
Parents: ff65cba
Author: chenheng <ch...@apache.org>
Authored: Tue Feb 23 05:04:34 2016 +0800
Committer: chenheng <ch...@apache.org>
Committed: Tue Feb 23 05:04:34 2016 +0800

----------------------------------------------------------------------
 .../procedure/TestWALProcedureStoreOnHDFS.java  | 189 +++++++++----------
 1 file changed, 92 insertions(+), 97 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/10d60771/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestWALProcedureStoreOnHDFS.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestWALProcedureStoreOnHDFS.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestWALProcedureStoreOnHDFS.java
index e26418d..a5a3620 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestWALProcedureStoreOnHDFS.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestWALProcedureStoreOnHDFS.java
@@ -18,45 +18,24 @@
 
 package org.apache.hadoop.hbase.master.procedure;
 
-import java.io.IOException;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.CountDownLatch;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.MiniHBaseCluster;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.master.HMaster;
-import org.apache.hadoop.hbase.procedure2.Procedure;
-import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
 import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
 import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure;
 import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
 import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
-import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.CreateTableState;
-import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.DeleteTableState;
-import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.DisableTableState;
-import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.EnableTableState;
-import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.TruncateTableState;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.hbase.util.ModifyRegionUtils;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 
-import org.junit.After;
-import org.junit.Before;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
-import org.mockito.Mockito;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -71,7 +50,18 @@ public class TestWALProcedureStoreOnHDFS {
 
   private WALProcedureStore store;
 
-  private static void setupConf(Configuration conf) {
+  private ProcedureStore.ProcedureStoreListener stopProcedureListener = new ProcedureStore.ProcedureStoreListener() {
+    @Override
+    public void postSync() {}
+
+    @Override
+    public void abortProcess() {
+      LOG.fatal("Abort the Procedure Store");
+      store.stop(true);
+    }
+  };
+
+  private static void initConfig(Configuration conf) {
     conf.setInt("dfs.replication", 3);
     conf.setInt("dfs.namenode.replication.min", 3);
 
@@ -81,29 +71,16 @@ public class TestWALProcedureStoreOnHDFS {
     conf.setInt("hbase.procedure.store.wal.sync.failure.roll.max", 10);
   }
 
-  @Before
   public void setup() throws Exception {
-    setupConf(UTIL.getConfiguration());
     MiniDFSCluster dfs = UTIL.startMiniDFSCluster(3);
-
     Path logDir = new Path(new Path(dfs.getFileSystem().getUri()), "/test-logs");
-    store = ProcedureTestingUtility.createWalStore(
-      UTIL.getConfiguration(), dfs.getFileSystem(), logDir);
-    store.registerListener(new ProcedureStore.ProcedureStoreListener() {
-      @Override
-      public void postSync() {}
-
-      @Override
-      public void abortProcess() {
-        LOG.fatal("Abort the Procedure Store");
-        store.stop(true);
-      }
-    });
+    store = ProcedureTestingUtility.createWalStore(UTIL.getConfiguration(), dfs.getFileSystem(),
+      logDir);
+    store.registerListener(stopProcedureListener);
     store.start(8);
     store.recoverLease();
   }
 
-  @After
   public void tearDown() throws Exception {
     store.stop(false);
     UTIL.getDFSCluster().getFileSystem().delete(store.getLogDir(), true);
@@ -117,84 +94,102 @@ public class TestWALProcedureStoreOnHDFS {
 
   @Test(timeout=60000, expected=RuntimeException.class)
   public void testWalAbortOnLowReplication() throws Exception {
-    assertEquals(3, UTIL.getDFSCluster().getDataNodes().size());
-
-    LOG.info("Stop DataNode");
-    UTIL.getDFSCluster().stopDataNode(0);
-    assertEquals(2, UTIL.getDFSCluster().getDataNodes().size());
+    initConfig(UTIL.getConfiguration());
+    setup();
+    try {
+      assertEquals(3, UTIL.getDFSCluster().getDataNodes().size());
 
-    store.insert(new TestProcedure(1, -1), null);
-    for (long i = 2; store.isRunning(); ++i) {
+      LOG.info("Stop DataNode");
+      UTIL.getDFSCluster().stopDataNode(0);
       assertEquals(2, UTIL.getDFSCluster().getDataNodes().size());
-      store.insert(new TestProcedure(i, -1), null);
-      Thread.sleep(100);
+
+      store.insert(new TestProcedure(1, -1), null);
+      for (long i = 2; store.isRunning(); ++i) {
+        assertEquals(2, UTIL.getDFSCluster().getDataNodes().size());
+        store.insert(new TestProcedure(i, -1), null);
+        Thread.sleep(100);
+      }
+      assertFalse(store.isRunning());
+      fail("The store.insert() should throw an exeption");
+    } finally {
+      tearDown();
     }
-    assertFalse(store.isRunning());
-    fail("The store.insert() should throw an exeption");
   }
 
   @Test(timeout=60000)
   public void testWalAbortOnLowReplicationWithQueuedWriters() throws Exception {
-    assertEquals(3, UTIL.getDFSCluster().getDataNodes().size());
+    initConfig(UTIL.getConfiguration());
+    setup();
+    try {
+      assertEquals(3, UTIL.getDFSCluster().getDataNodes().size());
+      store.registerListener(new ProcedureStore.ProcedureStoreListener() {
+        @Override
+        public void postSync() {
+          Threads.sleepWithoutInterrupt(2000);
+        }
 
-    store.registerListener(new ProcedureStore.ProcedureStoreListener() {
-      @Override
-      public void postSync() {
-        Threads.sleepWithoutInterrupt(2000);
+        @Override
+        public void abortProcess() {}
+      });
+
+      final AtomicInteger reCount = new AtomicInteger(0);
+      Thread[] thread = new Thread[store.getNumThreads() * 2 + 1];
+      for (int i = 0; i < thread.length; ++i) {
+        final long procId = i + 1;
+        thread[i] = new Thread() {
+          public void run() {
+            try {
+              LOG.debug("[S] INSERT " + procId);
+              store.insert(new TestProcedure(procId, -1), null);
+              LOG.debug("[E] INSERT " + procId);
+            } catch (RuntimeException e) {
+              reCount.incrementAndGet();
+              LOG.debug("[F] INSERT " + procId + ": " + e.getMessage());
+            }
+          }
+        };
+        thread[i].start();
       }
 
-      @Override
-      public void abortProcess() {}
-    });
-
-    final AtomicInteger reCount = new AtomicInteger(0);
-    Thread[] thread = new Thread[store.getNumThreads() * 2 + 1];
-    for (int i = 0; i < thread.length; ++i) {
-      final long procId = i + 1;
-      thread[i] = new Thread() {
-        public void run() {
-          try {
-            LOG.debug("[S] INSERT " + procId);
-            store.insert(new TestProcedure(procId, -1), null);
-            LOG.debug("[E] INSERT " + procId);
-          } catch (RuntimeException e) {
-            reCount.incrementAndGet();
-            LOG.debug("[F] INSERT " + procId + ": " + e.getMessage());
-          }
-        }
-      };
-      thread[i].start();
-    }
+      Thread.sleep(1000);
+      LOG.info("Stop DataNode");
+      UTIL.getDFSCluster().stopDataNode(0);
+      assertEquals(2, UTIL.getDFSCluster().getDataNodes().size());
 
-    Thread.sleep(1000);
-    LOG.info("Stop DataNode");
-    UTIL.getDFSCluster().stopDataNode(0);
-    assertEquals(2, UTIL.getDFSCluster().getDataNodes().size());
+      for (int i = 0; i < thread.length; ++i) {
+        thread[i].join();
+      }
 
-    for (int i = 0; i < thread.length; ++i) {
-      thread[i].join();
+      assertFalse(store.isRunning());
+      assertTrue(reCount.toString(), reCount.get() >= store.getNumThreads() &&
+                                     reCount.get() < thread.length);
+    } finally {
+      tearDown();
     }
-
-    assertFalse(store.isRunning());
-    assertTrue(reCount.toString(), reCount.get() >= store.getNumThreads() &&
-                                   reCount.get() < thread.length);
   }
 
   @Test(timeout=60000)
   public void testWalRollOnLowReplication() throws Exception {
-    int dnCount = 0;
-    store.insert(new TestProcedure(1, -1), null);
-    UTIL.getDFSCluster().restartDataNode(dnCount);
-    for (long i = 2; i < 100; ++i) {
-      store.insert(new TestProcedure(i, -1), null);
-      waitForNumReplicas(3);
-      Thread.sleep(100);
-      if ((i % 30) == 0) {
-        LOG.info("Restart Data Node");
-        UTIL.getDFSCluster().restartDataNode(++dnCount % 3);
+    initConfig(UTIL.getConfiguration());
+    UTIL.getConfiguration().setInt("dfs.namenode.replication.min", 1);
+    setup();
+    try {
+      int dnCount = 0;
+      store.insert(new TestProcedure(1, -1), null);
+      UTIL.getDFSCluster().restartDataNode(dnCount);
+      for (long i = 2; i < 100; ++i) {
+        store.insert(new TestProcedure(i, -1), null);
+        waitForNumReplicas(3);
+        Thread.sleep(100);
+        if ((i % 30) == 0) {
+          LOG.info("Restart Data Node");
+          UTIL.getDFSCluster().restartDataNode(++dnCount % 3);
+        }
       }
+      assertTrue(store.isRunning());
+    } finally {
+      tearDown();
     }
-    assertTrue(store.isRunning());
   }
 
   public void waitForNumReplicas(int numReplicas) throws Exception {