You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by sy...@apache.org on 2015/10/26 18:34:18 UTC

[12/14] hbase git commit: HBASE-14648 Reenable TestWALProcedureStoreOnHDFS#testWalRollOnLowReplication (Heng Chen)

HBASE-14648 Reenable TestWALProcedureStoreOnHDFS#testWalRollOnLowReplication (Heng Chen)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/46c646d6
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/46c646d6
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/46c646d6

Branch: refs/heads/hbase-12439
Commit: 46c646d6154d6a5291c308e065802248f4e4e7b9
Parents: 9597847
Author: stack <st...@apache.org>
Authored: Sat Oct 24 20:55:37 2015 -0700
Committer: stack <st...@apache.org>
Committed: Sat Oct 24 20:55:47 2015 -0700

----------------------------------------------------------------------
 .../procedure/TestWALProcedureStoreOnHDFS.java  | 167 +++++++++----------
 1 file changed, 79 insertions(+), 88 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/46c646d6/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestWALProcedureStoreOnHDFS.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestWALProcedureStoreOnHDFS.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestWALProcedureStoreOnHDFS.java
index b360966..2e80b69 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestWALProcedureStoreOnHDFS.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestWALProcedureStoreOnHDFS.java
@@ -66,7 +66,7 @@ public class TestWALProcedureStoreOnHDFS {
     }
   };
 
-  private static void setupConf(Configuration conf) {
+  private static void initConfig(Configuration conf) {
     conf.setInt("dfs.replication", 3);
     conf.setInt("dfs.namenode.replication.min", 3);
 
@@ -76,20 +76,16 @@ public class TestWALProcedureStoreOnHDFS {
     conf.setInt("hbase.procedure.store.wal.sync.failure.roll.max", 10);
   }
 
-  @Before
   public void setup() throws Exception {
-    setupConf(UTIL.getConfiguration());
     MiniDFSCluster dfs = UTIL.startMiniDFSCluster(3);
 
     Path logDir = new Path(new Path(dfs.getFileSystem().getUri()), "/test-logs");
-    store = ProcedureTestingUtility.createWalStore(
-      UTIL.getConfiguration(), dfs.getFileSystem(), logDir);
+    store = ProcedureTestingUtility.createWalStore(UTIL.getConfiguration(), dfs.getFileSystem(), logDir);
     store.registerListener(stopProcedureListener);
     store.start(8);
     store.recoverLease();
   }
 
-  @After
   public void tearDown() throws Exception {
     store.stop(false);
     UTIL.getDFSCluster().getFileSystem().delete(store.getLogDir(), true);
@@ -103,107 +99,102 @@ public class TestWALProcedureStoreOnHDFS {
 
   @Test(timeout=60000, expected=RuntimeException.class)
   public void testWalAbortOnLowReplication() throws Exception {
-    assertEquals(3, UTIL.getDFSCluster().getDataNodes().size());
-
-    LOG.info("Stop DataNode");
-    UTIL.getDFSCluster().stopDataNode(0);
-    assertEquals(2, UTIL.getDFSCluster().getDataNodes().size());
+    initConfig(UTIL.getConfiguration());
+    setup();
+    try {
+      assertEquals(3, UTIL.getDFSCluster().getDataNodes().size());
 
-    store.insert(new TestProcedure(1, -1), null);
-    for (long i = 2; store.isRunning(); ++i) {
+      LOG.info("Stop DataNode");
+      UTIL.getDFSCluster().stopDataNode(0);
       assertEquals(2, UTIL.getDFSCluster().getDataNodes().size());
-      store.insert(new TestProcedure(i, -1), null);
-      Thread.sleep(100);
+
+      store.insert(new TestProcedure(1, -1), null);
+      for (long i = 2; store.isRunning(); ++i) {
+        assertEquals(2, UTIL.getDFSCluster().getDataNodes().size());
+        store.insert(new TestProcedure(i, -1), null);
+        Thread.sleep(100);
+      }
+      assertFalse(store.isRunning());
+      fail("The store.insert() should throw an exeption");
+    } finally {
+      tearDown();
     }
-    assertFalse(store.isRunning());
-    fail("The store.insert() should throw an exeption");
   }
 
   @Test(timeout=60000)
   public void testWalAbortOnLowReplicationWithQueuedWriters() throws Exception {
-    assertEquals(3, UTIL.getDFSCluster().getDataNodes().size());
-    store.registerListener(new ProcedureStore.ProcedureStoreListener() {
-      @Override
-      public void postSync() {
-        Threads.sleepWithoutInterrupt(2000);
-      }
+    initConfig(UTIL.getConfiguration());
+    setup();
+    try {
+      assertEquals(3, UTIL.getDFSCluster().getDataNodes().size());
+      store.registerListener(new ProcedureStore.ProcedureStoreListener() {
+        @Override
+        public void postSync() {
+          Threads.sleepWithoutInterrupt(2000);
+        }
 
-      @Override
-      public void abortProcess() {}
-    });
-
-    final AtomicInteger reCount = new AtomicInteger(0);
-    Thread[] thread = new Thread[store.getNumThreads() * 2 + 1];
-    for (int i = 0; i < thread.length; ++i) {
-      final long procId = i + 1;
-      thread[i] = new Thread() {
-        public void run() {
-          try {
-            LOG.debug("[S] INSERT " + procId);
-            store.insert(new TestProcedure(procId, -1), null);
-            LOG.debug("[E] INSERT " + procId);
-          } catch (RuntimeException e) {
-            reCount.incrementAndGet();
-            LOG.debug("[F] INSERT " + procId + ": " + e.getMessage());
+        @Override
+        public void abortProcess() {}
+      });
+
+      final AtomicInteger reCount = new AtomicInteger(0);
+      Thread[] thread = new Thread[store.getNumThreads() * 2 + 1];
+      for (int i = 0; i < thread.length; ++i) {
+        final long procId = i + 1;
+        thread[i] = new Thread() {
+          public void run() {
+            try {
+              LOG.debug("[S] INSERT " + procId);
+              store.insert(new TestProcedure(procId, -1), null);
+              LOG.debug("[E] INSERT " + procId);
+            } catch (RuntimeException e) {
+              reCount.incrementAndGet();
+              LOG.debug("[F] INSERT " + procId + ": " + e.getMessage());
+            }
           }
-        }
-      };
-      thread[i].start();
-    }
+        };
+        thread[i].start();
+      }
 
-    Thread.sleep(1000);
-    LOG.info("Stop DataNode");
-    UTIL.getDFSCluster().stopDataNode(0);
-    assertEquals(2, UTIL.getDFSCluster().getDataNodes().size());
+      Thread.sleep(1000);
+      LOG.info("Stop DataNode");
+      UTIL.getDFSCluster().stopDataNode(0);
+      assertEquals(2, UTIL.getDFSCluster().getDataNodes().size());
 
-    for (int i = 0; i < thread.length; ++i) {
-      thread[i].join();
-    }
+      for (int i = 0; i < thread.length; ++i) {
+        thread[i].join();
+      }
 
-    assertFalse(store.isRunning());
-    assertTrue(reCount.toString(), reCount.get() >= store.getNumThreads() &&
-                                   reCount.get() < thread.length);
+      assertFalse(store.isRunning());
+      assertTrue(reCount.toString(), reCount.get() >= store.getNumThreads() &&
+                                     reCount.get() < thread.length);
+    } finally {
+      tearDown();
+    }
   }
 
-  @Ignore ("Needs work") @Test(timeout=60000)
+  @Test(timeout=60000)
   public void testWalRollOnLowReplication() throws Exception {
-    store.unregisterListener(stopProcedureListener);
-    store.registerListener(new ProcedureStore.ProcedureStoreListener() {
-      @Override
-      public void postSync() {}
-
-      @Override
-      public void abortProcess() {
-        LOG.info("Aborted!!!!");
-      }
-    });
-    int dnCount = 0;
-    store.insert(new TestProcedure(1, -1), null);
-    UTIL.getDFSCluster().restartDataNode(dnCount);
-    for (long i = 2; i < 100; ++i) {
-      try {
+    initConfig(UTIL.getConfiguration());
+    UTIL.getConfiguration().setInt("dfs.namenode.replication.min", 1);
+    setup();
+    try {
+      int dnCount = 0;
+      store.insert(new TestProcedure(1, -1), null);
+      UTIL.getDFSCluster().restartDataNode(dnCount);
+      for (long i = 2; i < 100; ++i) {
         store.insert(new TestProcedure(i, -1), null);
-      } catch (RuntimeException re) {
-        String msg = re.getMessage();
-        // We could get a sync failed here...if the test cluster is crawling such that DN recovery
-        // is taking a long time. If we've done enough passes, just finish up the test as a 'pass'
-        if (msg != null && msg.toLowerCase().contains("sync aborted")) {
-          LOG.info("i=" + i, re);
-          if (i > 50) {
-            LOG.info("Returning early... i=" + i + "...We ran enough of this test", re);
-            return;
-          }
+        waitForNumReplicas(3);
+        Thread.sleep(100);
+        if ((i % 30) == 0) {
+          LOG.info("Restart Data Node");
+          UTIL.getDFSCluster().restartDataNode(++dnCount % 3);
         }
-        throw re;
-      }
-      waitForNumReplicas(3);
-      Thread.sleep(100);
-      if ((i % 30) == 0) {
-        LOG.info("Restart Data Node");
-        UTIL.getDFSCluster().restartDataNode(++dnCount % 3);
       }
+      assertTrue(store.isRunning());
+    } finally {
+      tearDown();
     }
-    assertTrue(store.isRunning());
   }
 
   public void waitForNumReplicas(int numReplicas) throws Exception {