You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ch...@apache.org on 2016/02/22 22:30:37 UTC
hbase git commit: HBASE-15169 Backport HBASE-14362
'TestWALProcedureStoreOnHDFS is super duper flaky' to branch-1.1
Repository: hbase
Updated Branches:
refs/heads/branch-1.1 ff65cba33 -> 10d607717
HBASE-15169 Backport HBASE-14362 'TestWALProcedureStoreOnHDFS is super duper flaky' to branch-1.1
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/10d60771
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/10d60771
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/10d60771
Branch: refs/heads/branch-1.1
Commit: 10d607717257af8e83368e34bc32f98f98389dc8
Parents: ff65cba
Author: chenheng <ch...@apache.org>
Authored: Tue Feb 23 05:04:34 2016 +0800
Committer: chenheng <ch...@apache.org>
Committed: Tue Feb 23 05:04:34 2016 +0800
----------------------------------------------------------------------
.../procedure/TestWALProcedureStoreOnHDFS.java | 189 +++++++++----------
1 file changed, 92 insertions(+), 97 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/10d60771/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestWALProcedureStoreOnHDFS.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestWALProcedureStoreOnHDFS.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestWALProcedureStoreOnHDFS.java
index e26418d..a5a3620 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestWALProcedureStoreOnHDFS.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestWALProcedureStoreOnHDFS.java
@@ -18,45 +18,24 @@
package org.apache.hadoop.hbase.master.procedure;
-import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.CountDownLatch;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.MiniHBaseCluster;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.master.HMaster;
-import org.apache.hadoop.hbase.procedure2.Procedure;
-import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
-import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.CreateTableState;
-import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.DeleteTableState;
-import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.DisableTableState;
-import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.EnableTableState;
-import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.TruncateTableState;
import org.apache.hadoop.hbase.testclassification.LargeTests;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.hbase.util.ModifyRegionUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
-import org.junit.After;
-import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
-import org.mockito.Mockito;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -71,7 +50,18 @@ public class TestWALProcedureStoreOnHDFS {
private WALProcedureStore store;
- private static void setupConf(Configuration conf) {
+ private ProcedureStore.ProcedureStoreListener stopProcedureListener = new ProcedureStore.ProcedureStoreListener() {
+ @Override
+ public void postSync() {}
+
+ @Override
+ public void abortProcess() {
+ LOG.fatal("Abort the Procedure Store");
+ store.stop(true);
+ }
+ };
+
+ private static void initConfig(Configuration conf) {
conf.setInt("dfs.replication", 3);
conf.setInt("dfs.namenode.replication.min", 3);
@@ -81,29 +71,16 @@ public class TestWALProcedureStoreOnHDFS {
conf.setInt("hbase.procedure.store.wal.sync.failure.roll.max", 10);
}
- @Before
public void setup() throws Exception {
- setupConf(UTIL.getConfiguration());
MiniDFSCluster dfs = UTIL.startMiniDFSCluster(3);
-
Path logDir = new Path(new Path(dfs.getFileSystem().getUri()), "/test-logs");
- store = ProcedureTestingUtility.createWalStore(
- UTIL.getConfiguration(), dfs.getFileSystem(), logDir);
- store.registerListener(new ProcedureStore.ProcedureStoreListener() {
- @Override
- public void postSync() {}
-
- @Override
- public void abortProcess() {
- LOG.fatal("Abort the Procedure Store");
- store.stop(true);
- }
- });
+ store = ProcedureTestingUtility.createWalStore(UTIL.getConfiguration(), dfs.getFileSystem(),
+ logDir);
+ store.registerListener(stopProcedureListener);
store.start(8);
store.recoverLease();
}
- @After
public void tearDown() throws Exception {
store.stop(false);
UTIL.getDFSCluster().getFileSystem().delete(store.getLogDir(), true);
@@ -117,84 +94,102 @@ public class TestWALProcedureStoreOnHDFS {
@Test(timeout=60000, expected=RuntimeException.class)
public void testWalAbortOnLowReplication() throws Exception {
- assertEquals(3, UTIL.getDFSCluster().getDataNodes().size());
-
- LOG.info("Stop DataNode");
- UTIL.getDFSCluster().stopDataNode(0);
- assertEquals(2, UTIL.getDFSCluster().getDataNodes().size());
+ initConfig(UTIL.getConfiguration());
+ setup();
+ try {
+ assertEquals(3, UTIL.getDFSCluster().getDataNodes().size());
- store.insert(new TestProcedure(1, -1), null);
- for (long i = 2; store.isRunning(); ++i) {
+ LOG.info("Stop DataNode");
+ UTIL.getDFSCluster().stopDataNode(0);
assertEquals(2, UTIL.getDFSCluster().getDataNodes().size());
- store.insert(new TestProcedure(i, -1), null);
- Thread.sleep(100);
+
+ store.insert(new TestProcedure(1, -1), null);
+ for (long i = 2; store.isRunning(); ++i) {
+ assertEquals(2, UTIL.getDFSCluster().getDataNodes().size());
+ store.insert(new TestProcedure(i, -1), null);
+ Thread.sleep(100);
+ }
+ assertFalse(store.isRunning());
+ fail("The store.insert() should throw an exeption");
+ } finally {
+ tearDown();
}
- assertFalse(store.isRunning());
- fail("The store.insert() should throw an exeption");
}
@Test(timeout=60000)
public void testWalAbortOnLowReplicationWithQueuedWriters() throws Exception {
- assertEquals(3, UTIL.getDFSCluster().getDataNodes().size());
+ initConfig(UTIL.getConfiguration());
+ setup();
+ try {
+ assertEquals(3, UTIL.getDFSCluster().getDataNodes().size());
+ store.registerListener(new ProcedureStore.ProcedureStoreListener() {
+ @Override
+ public void postSync() {
+ Threads.sleepWithoutInterrupt(2000);
+ }
- store.registerListener(new ProcedureStore.ProcedureStoreListener() {
- @Override
- public void postSync() {
- Threads.sleepWithoutInterrupt(2000);
+ @Override
+ public void abortProcess() {}
+ });
+
+ final AtomicInteger reCount = new AtomicInteger(0);
+ Thread[] thread = new Thread[store.getNumThreads() * 2 + 1];
+ for (int i = 0; i < thread.length; ++i) {
+ final long procId = i + 1;
+ thread[i] = new Thread() {
+ public void run() {
+ try {
+ LOG.debug("[S] INSERT " + procId);
+ store.insert(new TestProcedure(procId, -1), null);
+ LOG.debug("[E] INSERT " + procId);
+ } catch (RuntimeException e) {
+ reCount.incrementAndGet();
+ LOG.debug("[F] INSERT " + procId + ": " + e.getMessage());
+ }
+ }
+ };
+ thread[i].start();
}
- @Override
- public void abortProcess() {}
- });
-
- final AtomicInteger reCount = new AtomicInteger(0);
- Thread[] thread = new Thread[store.getNumThreads() * 2 + 1];
- for (int i = 0; i < thread.length; ++i) {
- final long procId = i + 1;
- thread[i] = new Thread() {
- public void run() {
- try {
- LOG.debug("[S] INSERT " + procId);
- store.insert(new TestProcedure(procId, -1), null);
- LOG.debug("[E] INSERT " + procId);
- } catch (RuntimeException e) {
- reCount.incrementAndGet();
- LOG.debug("[F] INSERT " + procId + ": " + e.getMessage());
- }
- }
- };
- thread[i].start();
- }
+ Thread.sleep(1000);
+ LOG.info("Stop DataNode");
+ UTIL.getDFSCluster().stopDataNode(0);
+ assertEquals(2, UTIL.getDFSCluster().getDataNodes().size());
- Thread.sleep(1000);
- LOG.info("Stop DataNode");
- UTIL.getDFSCluster().stopDataNode(0);
- assertEquals(2, UTIL.getDFSCluster().getDataNodes().size());
+ for (int i = 0; i < thread.length; ++i) {
+ thread[i].join();
+ }
- for (int i = 0; i < thread.length; ++i) {
- thread[i].join();
+ assertFalse(store.isRunning());
+ assertTrue(reCount.toString(), reCount.get() >= store.getNumThreads() &&
+ reCount.get() < thread.length);
+ } finally {
+ tearDown();
}
-
- assertFalse(store.isRunning());
- assertTrue(reCount.toString(), reCount.get() >= store.getNumThreads() &&
- reCount.get() < thread.length);
}
@Test(timeout=60000)
public void testWalRollOnLowReplication() throws Exception {
- int dnCount = 0;
- store.insert(new TestProcedure(1, -1), null);
- UTIL.getDFSCluster().restartDataNode(dnCount);
- for (long i = 2; i < 100; ++i) {
- store.insert(new TestProcedure(i, -1), null);
- waitForNumReplicas(3);
- Thread.sleep(100);
- if ((i % 30) == 0) {
- LOG.info("Restart Data Node");
- UTIL.getDFSCluster().restartDataNode(++dnCount % 3);
+ initConfig(UTIL.getConfiguration());
+ UTIL.getConfiguration().setInt("dfs.namenode.replication.min", 1);
+ setup();
+ try {
+ int dnCount = 0;
+ store.insert(new TestProcedure(1, -1), null);
+ UTIL.getDFSCluster().restartDataNode(dnCount);
+ for (long i = 2; i < 100; ++i) {
+ store.insert(new TestProcedure(i, -1), null);
+ waitForNumReplicas(3);
+ Thread.sleep(100);
+ if ((i % 30) == 0) {
+ LOG.info("Restart Data Node");
+ UTIL.getDFSCluster().restartDataNode(++dnCount % 3);
+ }
}
+ assertTrue(store.isRunning());
+ } finally {
+ tearDown();
}
- assertTrue(store.isRunning());
}
public void waitForNumReplicas(int numReplicas) throws Exception {