You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by en...@apache.org on 2016/10/19 02:41:42 UTC

[1/5] hbase git commit: HBASE-16824 Writer.flush() can be called on already closed streams in WAL roll

Repository: hbase
Updated Branches:
  refs/heads/branch-1 66941910b -> 019c7f930
  refs/heads/branch-1.1 382f88ae8 -> 4e304b3f9
  refs/heads/branch-1.2 bcc74e5ee -> 571814425
  refs/heads/branch-1.3 d38310aa4 -> c51722629
  refs/heads/master 6c89c6251 -> ef8c65e54


HBASE-16824 Writer.flush() can be called on already closed streams in WAL roll


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/ef8c65e5
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/ef8c65e5
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/ef8c65e5

Branch: refs/heads/master
Commit: ef8c65e54201b37edfb9a8f4f4d24137544b8ec1
Parents: 6c89c62
Author: Enis Soztutar <en...@apache.org>
Authored: Tue Oct 18 18:46:02 2016 -0700
Committer: Enis Soztutar <en...@apache.org>
Committed: Tue Oct 18 18:46:02 2016 -0700

----------------------------------------------------------------------
 .../hadoop/hbase/regionserver/wal/FSHLog.java   | 48 +++++++++++++++-----
 .../wal/TestLogRollingNoCluster.java            | 42 +++++++++++------
 2 files changed, 65 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/ef8c65e5/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
index 3e0e829..142ab63 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
@@ -30,15 +30,6 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.lmax.disruptor.BlockingWaitStrategy;
-import com.lmax.disruptor.EventHandler;
-import com.lmax.disruptor.ExceptionHandler;
-import com.lmax.disruptor.LifecycleAware;
-import com.lmax.disruptor.TimeoutException;
-import com.lmax.disruptor.dsl.Disruptor;
-import com.lmax.disruptor.dsl.ProducerType;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -68,6 +59,15 @@ import org.apache.htrace.Span;
 import org.apache.htrace.Trace;
 import org.apache.htrace.TraceScope;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.lmax.disruptor.BlockingWaitStrategy;
+import com.lmax.disruptor.EventHandler;
+import com.lmax.disruptor.ExceptionHandler;
+import com.lmax.disruptor.LifecycleAware;
+import com.lmax.disruptor.TimeoutException;
+import com.lmax.disruptor.dsl.Disruptor;
+import com.lmax.disruptor.dsl.ProducerType;
+
 /**
  * The default implementation of FSWAL.
  */
@@ -499,6 +499,7 @@ public class FSHLog extends AbstractFSWAL<Writer> {
     private volatile long sequence;
     // Keep around last exception thrown. Clear on successful sync.
     private final BlockingQueue<SyncFuture> syncFutures;
+    private volatile SyncFuture takeSyncFuture = null;
 
     /**
      * UPDATE!
@@ -546,6 +547,7 @@ public class FSHLog extends AbstractFSWAL<Writer> {
       if (!syncFuture.done(currentSequence, t)) {
         throw new IllegalStateException();
       }
+
       // This function releases one sync future only.
       return 1;
     }
@@ -589,13 +591,21 @@ public class FSHLog extends AbstractFSWAL<Writer> {
       return sequence;
     }
 
+    boolean areSyncFuturesReleased() {
+      // Check that no sync futures are queued and that no in-flight sync future is still being
+      // processed.
+      return syncFutures.size() <= 0
+          && takeSyncFuture == null;
+    }
+
     public void run() {
       long currentSequence;
       while (!isInterrupted()) {
         int syncCount = 0;
-        SyncFuture takeSyncFuture;
+
         try {
           while (true) {
+            takeSyncFuture = null;
             // We have to process what we 'take' from the queue
             takeSyncFuture = this.syncFutures.take();
             currentSequence = this.sequence;
@@ -975,11 +985,23 @@ public class FSHLog extends AbstractFSWAL<Writer> {
      * @return True if outstanding sync futures still
      */
     private boolean isOutstandingSyncs() {
+      // Look at SyncFutures in the EventHandler
       for (int i = 0; i < this.syncFuturesCount; i++) {
         if (!this.syncFutures[i].isDone()) {
           return true;
         }
       }
+
+      return false;
+    }
+
+    private boolean isOutstandingSyncsFromRunners() {
+      // Look at SyncFutures in the SyncRunners
+      for (SyncRunner syncRunner: syncRunners) {
+        if(syncRunner.isAlive() && !syncRunner.areSyncFuturesReleased()) {
+          return true;
+        }
+      }
       return false;
     }
 
@@ -1095,11 +1117,13 @@ public class FSHLog extends AbstractFSWAL<Writer> {
         // Wait on outstanding syncers; wait for them to finish syncing (unless we've been
         // shutdown or unless our latch has been thrown because we have been aborted or unless
         // this WAL is broken and we can't get a sync/append to complete).
-        while (!this.shutdown && this.zigzagLatch.isCocked()
+        while ((!this.shutdown && this.zigzagLatch.isCocked()
             && highestSyncedTxid.get() < currentSequence &&
             // We could be in here and all syncs are failing or failed. Check for this. Otherwise
             // we'll just be stuck here for ever. In other words, ensure there syncs running.
-        isOutstandingSyncs()) {
+            isOutstandingSyncs())
+            // Wait for all SyncRunners to finish their work so that we can replace the writer
+            || isOutstandingSyncsFromRunners()) {
           synchronized (this.safePointWaiter) {
             this.safePointWaiter.wait(0, 1);
           }

http://git-wip-us.apache.org/repos/asf/hbase/blob/ef8c65e5/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java
index eda7df7..7412128 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java
@@ -18,11 +18,10 @@
 package org.apache.hadoop.hbase.regionserver.wal;
 
 import static org.junit.Assert.assertFalse;
-
 import java.io.IOException;
 import java.util.NavigableMap;
 import java.util.TreeMap;
-
+import java.util.concurrent.ThreadLocalRandom;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -39,6 +38,7 @@ import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.hbase.wal.WALKey;
@@ -56,7 +56,18 @@ public class TestLogRollingNoCluster {
       withLookingForStuckThread(true).build();
   private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
   private final static byte [] EMPTY_1K_ARRAY = new byte[1024];
-  private static final int THREAD_COUNT = 100; // Spin up this many threads
+  private static final int NUM_THREADS = 100; // Spin up this many threads
+  private static final int NUM_ENTRIES = 100; // How many entries to write
+
+  /** ProtobufLogWriter that simulates higher latencies in sync() call */
+  public static class HighLatencySyncWriter extends  ProtobufLogWriter {
+    @Override
+    public void sync() throws IOException {
+      Threads.sleep(ThreadLocalRandom.current().nextInt(10));
+      super.sync();
+      Threads.sleep(ThreadLocalRandom.current().nextInt(10));
+    }
+  }
 
   /**
    * Spin up a bunch of threads and have them all append to a WAL.  Roll the
@@ -65,38 +76,42 @@ public class TestLogRollingNoCluster {
    * @throws InterruptedException
    */
   @Test
-  public void testContendedLogRolling() throws IOException, InterruptedException {
-    Path dir = TEST_UTIL.getDataTestDir();
+  public void testContendedLogRolling() throws Exception {
+    TEST_UTIL.startMiniDFSCluster(3);
+    Path dir = TEST_UTIL.getDataTestDirOnTestFS();
+
     // The implementation needs to know the 'handler' count.
-    TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, THREAD_COUNT);
+    TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, NUM_THREADS);
     final Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
     conf.set(WALFactory.WAL_PROVIDER, "filesystem");
     FSUtils.setRootDir(conf, dir);
+    conf.set("hbase.regionserver.hlog.writer.impl", HighLatencySyncWriter.class.getName());
     final WALFactory wals = new WALFactory(conf, null, TestLogRollingNoCluster.class.getName());
     final WAL wal = wals.getWAL(new byte[]{}, null);
     
     Appender [] appenders = null;
 
-    final int count = THREAD_COUNT;
-    appenders = new Appender[count];
+    final int numThreads = NUM_THREADS;
+    appenders = new Appender[numThreads];
     try {
-      for (int i = 0; i < count; i++) {
+      for (int i = 0; i < numThreads; i++) {
         // Have each appending thread write 'count' entries
-        appenders[i] = new Appender(wal, i, count);
+        appenders[i] = new Appender(wal, i, NUM_ENTRIES);
       }
-      for (int i = 0; i < count; i++) {
+      for (int i = 0; i < numThreads; i++) {
         appenders[i].start();
       }
-      for (int i = 0; i < count; i++) {
+      for (int i = 0; i < numThreads; i++) {
         //ensure that all threads are joined before closing the wal
         appenders[i].join();
       }
     } finally {
       wals.close();
     }
-    for (int i = 0; i < count; i++) {
+    for (int i = 0; i < numThreads; i++) {
       assertFalse(appenders[i].isException());
     }
+    TEST_UTIL.shutdownMiniDFSCluster();
   }
 
   /**
@@ -149,6 +164,7 @@ public class TestLogRollingNoCluster {
           }
           final long txid = wal.append(hri, new WALKey(hri.getEncodedNameAsBytes(),
               TableName.META_TABLE_NAME, now, mvcc, scopes), edit, true);
+          Threads.sleep(ThreadLocalRandom.current().nextInt(5));
           wal.sync(txid);
         }
         String msg = getName() + " finished";


[3/5] hbase git commit: HBASE-16824 Writer.flush() can be called on already closed streams in WAL roll

Posted by en...@apache.org.
HBASE-16824 Writer.flush() can be called on already closed streams in WAL roll


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/c5172262
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/c5172262
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/c5172262

Branch: refs/heads/branch-1.3
Commit: c51722629418b8b5e3a6e688219ee7d806f251c7
Parents: d38310a
Author: Enis Soztutar <en...@apache.org>
Authored: Tue Oct 18 18:46:02 2016 -0700
Committer: Enis Soztutar <en...@apache.org>
Committed: Tue Oct 18 19:16:31 2016 -0700

----------------------------------------------------------------------
 .../hadoop/hbase/regionserver/wal/FSHLog.java   | 31 ++++++++++++--
 .../wal/TestLogRollingNoCluster.java            | 43 ++++++++++++++------
 2 files changed, 57 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/c5172262/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
index 097101b..9993d62 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
@@ -1131,6 +1131,7 @@ public class FSHLog implements WAL {
     private volatile long sequence;
     // Keep around last exception thrown. Clear on successful sync.
     private final BlockingQueue<SyncFuture> syncFutures;
+    private volatile SyncFuture takeSyncFuture = null;
 
     /**
      * UPDATE!
@@ -1220,13 +1221,21 @@ public class FSHLog implements WAL {
       return sequence;
     }
 
+    boolean areSyncFuturesReleased() {
+      // Check that no sync futures are queued and that no in-flight sync future is still being
+      // processed.
+      return syncFutures.size() <= 0
+          && takeSyncFuture == null;
+    }
+
     public void run() {
       long currentSequence;
       while (!isInterrupted()) {
         int syncCount = 0;
-        SyncFuture takeSyncFuture;
+
         try {
           while (true) {
+            takeSyncFuture = null;
             // We have to process what we 'take' from the queue
             takeSyncFuture = this.syncFutures.take();
             currentSequence = this.sequence;
@@ -1737,9 +1746,21 @@ public class FSHLog implements WAL {
      * @return True if outstanding sync futures still
      */
     private boolean isOutstandingSyncs() {
+      // Look at SyncFutures in the EventHandler
       for (int i = 0; i < this.syncFuturesCount; i++) {
         if (!this.syncFutures[i].isDone()) return true;
       }
+
+      return false;
+    }
+
+    private boolean isOutstandingSyncsFromRunners() {
+      // Look at SyncFutures in the SyncRunners
+      for (SyncRunner syncRunner: syncRunners) {
+        if(syncRunner.isAlive() && !syncRunner.areSyncFuturesReleased()) {
+          return true;
+        }
+      }
       return false;
     }
 
@@ -1850,11 +1871,13 @@ public class FSHLog implements WAL {
         // Wait on outstanding syncers; wait for them to finish syncing (unless we've been
         // shutdown or unless our latch has been thrown because we have been aborted or unless
         // this WAL is broken and we can't get a sync/append to complete).
-        while (!this.shutdown && this.zigzagLatch.isCocked() &&
-            highestSyncedSequence.get() < currentSequence &&
+        while ((!this.shutdown && this.zigzagLatch.isCocked()
+            && highestSyncedSequence.get() < currentSequence &&
             // We could be in here and all syncs are failing or failed. Check for this. Otherwise
             // we'll just be stuck here for ever. In other words, ensure there syncs running.
-            isOutstandingSyncs()) {
+            isOutstandingSyncs())
+            // Wait for all SyncRunners to finish their work so that we can replace the writer
+            || isOutstandingSyncsFromRunners()) {
           synchronized (this.safePointWaiter) {
             this.safePointWaiter.wait(0, 1);
           }

http://git-wip-us.apache.org/repos/asf/hbase/blob/c5172262/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java
index 7ce3615..bca4a7e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java
@@ -18,9 +18,9 @@
 package org.apache.hadoop.hbase.regionserver.wal;
 
 import static org.junit.Assert.assertFalse;
-
 import java.io.IOException;
-
+import java.util.TreeMap;
+import java.util.concurrent.ThreadLocalRandom;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSTableDescriptors;
 import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.hbase.wal.WALKey;
@@ -49,7 +50,18 @@ import org.junit.experimental.categories.Category;
 public class TestLogRollingNoCluster {
   private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
   private final static byte [] EMPTY_1K_ARRAY = new byte[1024];
-  private static final int THREAD_COUNT = 100; // Spin up this many threads
+  private static final int NUM_THREADS = 100; // Spin up this many threads
+  private static final int NUM_ENTRIES = 100; // How many entries to write
+
+  /** ProtobufLogWriter that simulates higher latencies in sync() call */
+  public static class HighLatencySyncWriter extends  ProtobufLogWriter {
+    @Override
+    public void sync() throws IOException {
+      Threads.sleep(ThreadLocalRandom.current().nextInt(10));
+      super.sync();
+      Threads.sleep(ThreadLocalRandom.current().nextInt(10));
+    }
+  }
 
   /**
    * Spin up a bunch of threads and have them all append to a WAL.  Roll the
@@ -58,37 +70,41 @@ public class TestLogRollingNoCluster {
    * @throws InterruptedException
    */
   @Test
-  public void testContendedLogRolling() throws IOException, InterruptedException {
-    Path dir = TEST_UTIL.getDataTestDir();
+  public void testContendedLogRolling() throws Exception {
+    TEST_UTIL.startMiniDFSCluster(3);
+    Path dir = TEST_UTIL.getDataTestDirOnTestFS();
+
     // The implementation needs to know the 'handler' count.
-    TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, THREAD_COUNT);
+    TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, NUM_THREADS);
     final Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
     FSUtils.setRootDir(conf, dir);
+    conf.set("hbase.regionserver.hlog.writer.impl", HighLatencySyncWriter.class.getName());
     final WALFactory wals = new WALFactory(conf, null, TestLogRollingNoCluster.class.getName());
     final WAL wal = wals.getWAL(new byte[]{}, null);
     
     Appender [] appenders = null;
 
-    final int count = THREAD_COUNT;
-    appenders = new Appender[count];
+    final int numThreads = NUM_THREADS;
+    appenders = new Appender[numThreads];
     try {
-      for (int i = 0; i < count; i++) {
+      for (int i = 0; i < numThreads; i++) {
         // Have each appending thread write 'count' entries
-        appenders[i] = new Appender(wal, i, count);
+        appenders[i] = new Appender(wal, i, NUM_ENTRIES);
       }
-      for (int i = 0; i < count; i++) {
+      for (int i = 0; i < numThreads; i++) {
         appenders[i].start();
       }
-      for (int i = 0; i < count; i++) {
+      for (int i = 0; i < numThreads; i++) {
         //ensure that all threads are joined before closing the wal
         appenders[i].join();
       }
     } finally {
       wals.close();
     }
-    for (int i = 0; i < count; i++) {
+    for (int i = 0; i < numThreads; i++) {
       assertFalse(appenders[i].isException());
     }
+    TEST_UTIL.shutdownMiniDFSCluster();
   }
 
   /**
@@ -137,6 +153,7 @@ public class TestLogRollingNoCluster {
           final HTableDescriptor htd = fts.get(TableName.META_TABLE_NAME);
           final long txid = wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(),
               TableName.META_TABLE_NAME, now, mvcc), edit, true);
+          Threads.sleep(ThreadLocalRandom.current().nextInt(5));
           wal.sync(txid);
         }
         String msg = getName() + " finished";


[4/5] hbase git commit: HBASE-16824 Writer.flush() can be called on already closed streams in WAL roll

Posted by en...@apache.org.
HBASE-16824 Writer.flush() can be called on already closed streams in WAL roll


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/57181442
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/57181442
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/57181442

Branch: refs/heads/branch-1.2
Commit: 57181442577c36689114334b011a6e72de4ae785
Parents: bcc74e5
Author: Enis Soztutar <en...@apache.org>
Authored: Tue Oct 18 18:46:02 2016 -0700
Committer: Enis Soztutar <en...@apache.org>
Committed: Tue Oct 18 19:19:12 2016 -0700

----------------------------------------------------------------------
 .../hadoop/hbase/regionserver/wal/FSHLog.java   | 31 ++++++++++++--
 .../wal/TestLogRollingNoCluster.java            | 43 ++++++++++++++------
 2 files changed, 57 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/57181442/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
index 79ff1bc..7e3d82b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
@@ -1127,6 +1127,7 @@ public class FSHLog implements WAL {
     private volatile long sequence;
     // Keep around last exception thrown. Clear on successful sync.
     private final BlockingQueue<SyncFuture> syncFutures;
+    private volatile SyncFuture takeSyncFuture = null;
 
     /**
      * UPDATE!
@@ -1216,13 +1217,21 @@ public class FSHLog implements WAL {
       return sequence;
     }
 
+    boolean areSyncFuturesReleased() {
+      // Check that no sync futures are queued and that no in-flight sync future is still being
+      // processed.
+      return syncFutures.size() <= 0
+          && takeSyncFuture == null;
+    }
+
     public void run() {
       long currentSequence;
       while (!isInterrupted()) {
         int syncCount = 0;
-        SyncFuture takeSyncFuture;
+
         try {
           while (true) {
+            takeSyncFuture = null;
             // We have to process what we 'take' from the queue
             takeSyncFuture = this.syncFutures.take();
             currentSequence = this.sequence;
@@ -1733,9 +1742,21 @@ public class FSHLog implements WAL {
      * @return True if outstanding sync futures still
      */
     private boolean isOutstandingSyncs() {
+      // Look at SyncFutures in the EventHandler
       for (int i = 0; i < this.syncFuturesCount; i++) {
         if (!this.syncFutures[i].isDone()) return true;
       }
+
+      return false;
+    }
+
+    private boolean isOutstandingSyncsFromRunners() {
+      // Look at SyncFutures in the SyncRunners
+      for (SyncRunner syncRunner: syncRunners) {
+        if(syncRunner.isAlive() && !syncRunner.areSyncFuturesReleased()) {
+          return true;
+        }
+      }
       return false;
     }
 
@@ -1846,11 +1867,13 @@ public class FSHLog implements WAL {
         // Wait on outstanding syncers; wait for them to finish syncing (unless we've been
         // shutdown or unless our latch has been thrown because we have been aborted or unless
         // this WAL is broken and we can't get a sync/append to complete).
-        while (!this.shutdown && this.zigzagLatch.isCocked() &&
-            highestSyncedSequence.get() < currentSequence &&
+        while ((!this.shutdown && this.zigzagLatch.isCocked()
+            && highestSyncedSequence.get() < currentSequence &&
             // We could be in here and all syncs are failing or failed. Check for this. Otherwise
             // we'll just be stuck here for ever. In other words, ensure there syncs running.
-            isOutstandingSyncs()) {
+            isOutstandingSyncs())
+            // Wait for all SyncRunners to finish their work so that we can replace the writer
+            || isOutstandingSyncsFromRunners()) {
           synchronized (this.safePointWaiter) {
             this.safePointWaiter.wait(0, 1);
           }

http://git-wip-us.apache.org/repos/asf/hbase/blob/57181442/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java
index 1c36552..034ddcd 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java
@@ -18,9 +18,9 @@
 package org.apache.hadoop.hbase.regionserver.wal;
 
 import static org.junit.Assert.assertFalse;
-
 import java.io.IOException;
-
+import java.util.TreeMap;
+import java.util.concurrent.ThreadLocalRandom;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSTableDescriptors;
 import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.hbase.wal.WALKey;
@@ -49,7 +50,18 @@ import org.junit.experimental.categories.Category;
 public class TestLogRollingNoCluster {
   private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
   private final static byte [] EMPTY_1K_ARRAY = new byte[1024];
-  private static final int THREAD_COUNT = 100; // Spin up this many threads
+  private static final int NUM_THREADS = 100; // Spin up this many threads
+  private static final int NUM_ENTRIES = 100; // How many entries to write
+
+  /** ProtobufLogWriter that simulates higher latencies in sync() call */
+  public static class HighLatencySyncWriter extends  ProtobufLogWriter {
+    @Override
+    public void sync() throws IOException {
+      Threads.sleep(ThreadLocalRandom.current().nextInt(10));
+      super.sync();
+      Threads.sleep(ThreadLocalRandom.current().nextInt(10));
+    }
+  }
 
   /**
    * Spin up a bunch of threads and have them all append to a WAL.  Roll the
@@ -58,37 +70,41 @@ public class TestLogRollingNoCluster {
    * @throws InterruptedException
    */
   @Test
-  public void testContendedLogRolling() throws IOException, InterruptedException {
-    Path dir = TEST_UTIL.getDataTestDir();
+  public void testContendedLogRolling() throws Exception {
+    TEST_UTIL.startMiniDFSCluster(3);
+    Path dir = TEST_UTIL.getDataTestDirOnTestFS();
+
     // The implementation needs to know the 'handler' count.
-    TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, THREAD_COUNT);
+    TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, NUM_THREADS);
     final Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
     FSUtils.setRootDir(conf, dir);
+    conf.set("hbase.regionserver.hlog.writer.impl", HighLatencySyncWriter.class.getName());
     final WALFactory wals = new WALFactory(conf, null, TestLogRollingNoCluster.class.getName());
     final WAL wal = wals.getWAL(new byte[]{});
     
     Appender [] appenders = null;
 
-    final int count = THREAD_COUNT;
-    appenders = new Appender[count];
+    final int numThreads = NUM_THREADS;
+    appenders = new Appender[numThreads];
     try {
-      for (int i = 0; i < count; i++) {
+      for (int i = 0; i < numThreads; i++) {
         // Have each appending thread write 'count' entries
-        appenders[i] = new Appender(wal, i, count);
+        appenders[i] = new Appender(wal, i, NUM_ENTRIES);
       }
-      for (int i = 0; i < count; i++) {
+      for (int i = 0; i < numThreads; i++) {
         appenders[i].start();
       }
-      for (int i = 0; i < count; i++) {
+      for (int i = 0; i < numThreads; i++) {
         //ensure that all threads are joined before closing the wal
         appenders[i].join();
       }
     } finally {
       wals.close();
     }
-    for (int i = 0; i < count; i++) {
+    for (int i = 0; i < numThreads; i++) {
       assertFalse(appenders[i].isException());
     }
+    TEST_UTIL.shutdownMiniDFSCluster();
   }
 
   /**
@@ -137,6 +153,7 @@ public class TestLogRollingNoCluster {
           final HTableDescriptor htd = fts.get(TableName.META_TABLE_NAME);
           final long txid = wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(),
               TableName.META_TABLE_NAME, now, mvcc), edit, true);
+          Threads.sleep(ThreadLocalRandom.current().nextInt(5));
           wal.sync(txid);
         }
         String msg = getName() + " finished";


[5/5] hbase git commit: HBASE-16824 Writer.flush() can be called on already closed streams in WAL roll

Posted by en...@apache.org.
HBASE-16824 Writer.flush() can be called on already closed streams in WAL roll


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/4e304b3f
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/4e304b3f
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/4e304b3f

Branch: refs/heads/branch-1.1
Commit: 4e304b3f919a9000e15fd66df190ab97e63bc07d
Parents: 382f88a
Author: Enis Soztutar <en...@apache.org>
Authored: Tue Oct 18 18:46:02 2016 -0700
Committer: Enis Soztutar <en...@apache.org>
Committed: Tue Oct 18 19:41:04 2016 -0700

----------------------------------------------------------------------
 .../hadoop/hbase/regionserver/wal/FSHLog.java   | 31 ++++++++++++--
 .../wal/TestLogRollingNoCluster.java            | 44 +++++++++++++-------
 2 files changed, 57 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/4e304b3f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
index 53545ed..76d09c1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
@@ -1278,6 +1278,7 @@ public class FSHLog implements WAL {
     private volatile long sequence;
     // Keep around last exception thrown. Clear on successful sync.
     private final BlockingQueue<SyncFuture> syncFutures;
+    private volatile SyncFuture takeSyncFuture = null;
 
     /**
      * UPDATE!
@@ -1367,13 +1368,21 @@ public class FSHLog implements WAL {
       return sequence;
     }
 
+    boolean areSyncFuturesReleased() {
+      // Check that no sync futures are queued and that no in-flight sync future is being
+      // processed.
+      return syncFutures.size() <= 0
+          && takeSyncFuture == null;
+    }
+
     public void run() {
       long currentSequence;
       while (!isInterrupted()) {
         int syncCount = 0;
-        SyncFuture takeSyncFuture;
+
         try {
           while (true) {
+            takeSyncFuture = null;
             // We have to process what we 'take' from the queue
             takeSyncFuture = this.syncFutures.take();
             currentSequence = this.sequence;
@@ -2010,9 +2019,21 @@ public class FSHLog implements WAL {
      * @return True if outstanding sync futures still
      */
     private boolean isOutstandingSyncs() {
+      // Look at SyncFutures in the EventHandler
       for (int i = 0; i < this.syncFuturesCount; i++) {
         if (!this.syncFutures[i].isDone()) return true;
       }
+
+      return false;
+    }
+
+    private boolean isOutstandingSyncsFromRunners() {
+      // Look at SyncFutures in the SyncRunners
+      for (SyncRunner syncRunner: syncRunners) {
+        if(syncRunner.isAlive() && !syncRunner.areSyncFuturesReleased()) {
+          return true;
+        }
+      }
       return false;
     }
 
@@ -2123,11 +2144,13 @@ public class FSHLog implements WAL {
         // Wait on outstanding syncers; wait for them to finish syncing (unless we've been
         // shutdown or unless our latch has been thrown because we have been aborted or unless
         // this WAL is broken and we can't get a sync/append to complete).
-        while (!this.shutdown && this.zigzagLatch.isCocked() &&
-            highestSyncedSequence.get() < currentSequence &&
+        while ((!this.shutdown && this.zigzagLatch.isCocked()
+            && highestSyncedSequence.get() < currentSequence &&
             // We could be in here and all syncs are failing or failed. Check for this. Otherwise
             // we'll just be stuck here for ever. In other words, ensure there syncs running.
-            isOutstandingSyncs()) {
+            isOutstandingSyncs())
+            // Wait for all SyncRunners to finish their work so that we can replace the writer
+            || isOutstandingSyncsFromRunners()) {
           synchronized (this.safePointWaiter) {
             this.safePointWaiter.wait(0, 1);
           }

http://git-wip-us.apache.org/repos/asf/hbase/blob/4e304b3f/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java
index 8727e23..722c218 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java
@@ -18,10 +18,10 @@
 package org.apache.hadoop.hbase.regionserver.wal;
 
 import static org.junit.Assert.assertFalse;
-
 import java.io.IOException;
 import java.util.concurrent.atomic.AtomicLong;
-
+import java.util.TreeMap;
+import java.util.concurrent.ThreadLocalRandom;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSTableDescriptors;
 import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.hbase.wal.WALKey;
@@ -50,7 +51,18 @@ import org.junit.experimental.categories.Category;
 public class TestLogRollingNoCluster {
   private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
   private final static byte [] EMPTY_1K_ARRAY = new byte[1024];
-  private static final int THREAD_COUNT = 100; // Spin up this many threads
+  private static final int NUM_THREADS = 100; // Spin up this many threads
+  private static final int NUM_ENTRIES = 100; // How many entries to write
+
+  /** ProtobufLogWriter that simulates higher latencies in sync() call */
+  public static class HighLatencySyncWriter extends  ProtobufLogWriter {
+    @Override
+    public void sync() throws IOException {
+      Threads.sleep(ThreadLocalRandom.current().nextInt(10));
+      super.sync();
+      Threads.sleep(ThreadLocalRandom.current().nextInt(10));
+    }
+  }
 
   /**
    * Spin up a bunch of threads and have them all append to a WAL.  Roll the
@@ -59,38 +71,41 @@ public class TestLogRollingNoCluster {
    * @throws InterruptedException
    */
   @Test
-  public void testContendedLogRolling() throws IOException, InterruptedException {
-    FileSystem fs = FileSystem.get(TEST_UTIL.getConfiguration());
-    Path dir = TEST_UTIL.getDataTestDir();
+  public void testContendedLogRolling() throws Exception {
+    TEST_UTIL.startMiniDFSCluster(3);
+    Path dir = TEST_UTIL.getDataTestDirOnTestFS();
+
     // The implementation needs to know the 'handler' count.
-    TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, THREAD_COUNT);
+    TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, NUM_THREADS);
     final Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
     FSUtils.setRootDir(conf, dir);
+    conf.set("hbase.regionserver.hlog.writer.impl", HighLatencySyncWriter.class.getName());
     final WALFactory wals = new WALFactory(conf, null, TestLogRollingNoCluster.class.getName());
     final WAL wal = wals.getWAL(new byte[]{});
     
     Appender [] appenders = null;
 
-    final int count = THREAD_COUNT;
-    appenders = new Appender[count];
+    final int numThreads = NUM_THREADS;
+    appenders = new Appender[numThreads];
     try {
-      for (int i = 0; i < count; i++) {
+      for (int i = 0; i < numThreads; i++) {
         // Have each appending thread write 'count' entries
-        appenders[i] = new Appender(wal, i, count);
+        appenders[i] = new Appender(wal, i, NUM_ENTRIES);
       }
-      for (int i = 0; i < count; i++) {
+      for (int i = 0; i < numThreads; i++) {
         appenders[i].start();
       }
-      for (int i = 0; i < count; i++) {
+      for (int i = 0; i < numThreads; i++) {
         //ensure that all threads are joined before closing the wal
         appenders[i].join();
       }
     } finally {
       wals.close();
     }
-    for (int i = 0; i < count; i++) {
+    for (int i = 0; i < numThreads; i++) {
       assertFalse(appenders[i].isException());
     }
+    TEST_UTIL.shutdownMiniDFSCluster();
   }
 
   /**
@@ -139,6 +154,7 @@ public class TestLogRollingNoCluster {
           final HTableDescriptor htd = fts.get(TableName.META_TABLE_NAME);
           final long txid = wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(),
               TableName.META_TABLE_NAME, now), edit, sequenceId, true, null);
+          Threads.sleep(ThreadLocalRandom.current().nextInt(5));
           wal.sync(txid);
         }
         String msg = getName() + " finished";


[2/5] hbase git commit: HBASE-16824 Writer.flush() can be called on already closed streams in WAL roll

Posted by en...@apache.org.
HBASE-16824 Writer.flush() can be called on already closed streams in WAL roll


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/019c7f93
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/019c7f93
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/019c7f93

Branch: refs/heads/branch-1
Commit: 019c7f9303a7242b7c5d6713bed414b180b5c84a
Parents: 6694191
Author: Enis Soztutar <en...@apache.org>
Authored: Tue Oct 18 18:46:02 2016 -0700
Committer: Enis Soztutar <en...@apache.org>
Committed: Tue Oct 18 19:14:20 2016 -0700

----------------------------------------------------------------------
 .../hadoop/hbase/regionserver/wal/FSHLog.java   | 31 ++++++++++++--
 .../wal/TestLogRollingNoCluster.java            | 43 ++++++++++++++------
 2 files changed, 57 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/019c7f93/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
index 11ebfef..a8b0372 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
@@ -1138,6 +1138,7 @@ public class FSHLog implements WAL {
     private volatile long sequence;
     // Keep around last exception thrown. Clear on successful sync.
     private final BlockingQueue<SyncFuture> syncFutures;
+    private volatile SyncFuture takeSyncFuture = null;
 
     /**
      * UPDATE!
@@ -1227,13 +1228,21 @@ public class FSHLog implements WAL {
       return sequence;
     }
 
+    boolean areSyncFuturesReleased() {
+      // Check that no sync futures are queued and that no in-flight sync future is being
+      // processed.
+      return syncFutures.size() <= 0
+          && takeSyncFuture == null;
+    }
+
     public void run() {
       long currentSequence;
       while (!isInterrupted()) {
         int syncCount = 0;
-        SyncFuture takeSyncFuture;
+
         try {
           while (true) {
+            takeSyncFuture = null;
             // We have to process what we 'take' from the queue
             takeSyncFuture = this.syncFutures.take();
             currentSequence = this.sequence;
@@ -1744,9 +1753,21 @@ public class FSHLog implements WAL {
      * @return True if outstanding sync futures still
      */
     private boolean isOutstandingSyncs() {
+      // Look at SyncFutures in the EventHandler
       for (int i = 0; i < this.syncFuturesCount; i++) {
         if (!this.syncFutures[i].isDone()) return true;
       }
+
+      return false;
+    }
+
+    private boolean isOutstandingSyncsFromRunners() {
+      // Look at SyncFutures in the SyncRunners
+      for (SyncRunner syncRunner: syncRunners) {
+        if(syncRunner.isAlive() && !syncRunner.areSyncFuturesReleased()) {
+          return true;
+        }
+      }
       return false;
     }
 
@@ -1857,11 +1878,13 @@ public class FSHLog implements WAL {
         // Wait on outstanding syncers; wait for them to finish syncing (unless we've been
         // shutdown or unless our latch has been thrown because we have been aborted or unless
         // this WAL is broken and we can't get a sync/append to complete).
-        while (!this.shutdown && this.zigzagLatch.isCocked() &&
-            highestSyncedSequence.get() < currentSequence &&
+        while ((!this.shutdown && this.zigzagLatch.isCocked()
+            && highestSyncedSequence.get() < currentSequence &&
             // We could be in here and all syncs are failing or failed. Check for this. Otherwise
             // we'll just be stuck here for ever. In other words, ensure there syncs running.
-            isOutstandingSyncs()) {
+            isOutstandingSyncs())
+            // Wait for all SyncRunners to finish their work so that we can replace the writer
+            || isOutstandingSyncsFromRunners()) {
           synchronized (this.safePointWaiter) {
             this.safePointWaiter.wait(0, 1);
           }

http://git-wip-us.apache.org/repos/asf/hbase/blob/019c7f93/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java
index 7ce3615..bca4a7e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java
@@ -18,9 +18,9 @@
 package org.apache.hadoop.hbase.regionserver.wal;
 
 import static org.junit.Assert.assertFalse;
-
 import java.io.IOException;
-
+import java.util.TreeMap;
+import java.util.concurrent.ThreadLocalRandom;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSTableDescriptors;
 import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.hbase.wal.WALKey;
@@ -49,7 +50,18 @@ import org.junit.experimental.categories.Category;
 public class TestLogRollingNoCluster {
   private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
   private final static byte [] EMPTY_1K_ARRAY = new byte[1024];
-  private static final int THREAD_COUNT = 100; // Spin up this many threads
+  private static final int NUM_THREADS = 100; // Spin up this many threads
+  private static final int NUM_ENTRIES = 100; // How many entries to write
+
+  /** ProtobufLogWriter that simulates higher latencies in sync() call */
+  public static class HighLatencySyncWriter extends  ProtobufLogWriter {
+    @Override
+    public void sync() throws IOException {
+      Threads.sleep(ThreadLocalRandom.current().nextInt(10));
+      super.sync();
+      Threads.sleep(ThreadLocalRandom.current().nextInt(10));
+    }
+  }
 
   /**
    * Spin up a bunch of threads and have them all append to a WAL.  Roll the
@@ -58,37 +70,41 @@ public class TestLogRollingNoCluster {
    * @throws InterruptedException
    */
   @Test
-  public void testContendedLogRolling() throws IOException, InterruptedException {
-    Path dir = TEST_UTIL.getDataTestDir();
+  public void testContendedLogRolling() throws Exception {
+    TEST_UTIL.startMiniDFSCluster(3);
+    Path dir = TEST_UTIL.getDataTestDirOnTestFS();
+
     // The implementation needs to know the 'handler' count.
-    TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, THREAD_COUNT);
+    TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, NUM_THREADS);
     final Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
     FSUtils.setRootDir(conf, dir);
+    conf.set("hbase.regionserver.hlog.writer.impl", HighLatencySyncWriter.class.getName());
     final WALFactory wals = new WALFactory(conf, null, TestLogRollingNoCluster.class.getName());
     final WAL wal = wals.getWAL(new byte[]{}, null);
     
     Appender [] appenders = null;
 
-    final int count = THREAD_COUNT;
-    appenders = new Appender[count];
+    final int numThreads = NUM_THREADS;
+    appenders = new Appender[numThreads];
     try {
-      for (int i = 0; i < count; i++) {
+      for (int i = 0; i < numThreads; i++) {
         // Have each appending thread write 'count' entries
-        appenders[i] = new Appender(wal, i, count);
+        appenders[i] = new Appender(wal, i, NUM_ENTRIES);
       }
-      for (int i = 0; i < count; i++) {
+      for (int i = 0; i < numThreads; i++) {
         appenders[i].start();
       }
-      for (int i = 0; i < count; i++) {
+      for (int i = 0; i < numThreads; i++) {
         //ensure that all threads are joined before closing the wal
         appenders[i].join();
       }
     } finally {
       wals.close();
     }
-    for (int i = 0; i < count; i++) {
+    for (int i = 0; i < numThreads; i++) {
       assertFalse(appenders[i].isException());
     }
+    TEST_UTIL.shutdownMiniDFSCluster();
   }
 
   /**
@@ -137,6 +153,7 @@ public class TestLogRollingNoCluster {
           final HTableDescriptor htd = fts.get(TableName.META_TABLE_NAME);
           final long txid = wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(),
               TableName.META_TABLE_NAME, now, mvcc), edit, true);
+          Threads.sleep(ThreadLocalRandom.current().nextInt(5));
           wal.sync(txid);
         }
         String msg = getName() + " finished";