Posted to commits@hbase.apache.org by mb...@apache.org on 2015/07/09 19:57:45 UTC

[2/3] hbase git commit: HBASE-13832 Procedure v2: try to roll the master WAL on sync failure before aborting

HBASE-13832 Procedure v2: try to roll the master WAL on sync failure before aborting


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/246631d6
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/246631d6
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/246631d6

Branch: refs/heads/branch-1.2
Commit: 246631d60e7b3b5ece039e67e8d3e9bb66d5f9a5
Parents: bcadcef
Author: Matteo Bertozzi <ma...@cloudera.com>
Authored: Thu Jul 9 08:34:42 2015 -0700
Committer: Matteo Bertozzi <ma...@cloudera.com>
Committed: Thu Jul 9 08:56:09 2015 -0700

----------------------------------------------------------------------
 .../hbase/procedure2/store/ProcedureStore.java  |   5 +
 .../procedure2/store/ProcedureStoreBase.java    |   8 +
 .../procedure2/store/wal/WALProcedureStore.java | 237 ++++++++++++++-----
 .../procedure2/ProcedureTestingUtility.java     |  32 +++
 .../store/wal/TestWALProcedureStore.java        |  31 +--
 .../master/procedure/MasterProcedureEnv.java    |   5 +
 .../TestMasterFailoverWithProcedures.java       |  69 +++++-
 .../procedure/TestWALProcedureStoreOnHDFS.java  | 198 ++++++++++++++++
 8 files changed, 487 insertions(+), 98 deletions(-)
----------------------------------------------------------------------
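
For anyone trying this change out, the patch introduces five new WAL-store tunables (all visible in the WALProcedureStore diff below). A minimal sketch of setting them programmatically, using the defaults from the diff:

  Configuration conf = HBaseConfiguration.create();
  // plain sync retries before attempting a WAL roll (default 3)
  conf.setInt("hbase.procedure.store.wal.max.retries.before.roll", 3);
  // msec to wait between roll attempts (default 500)
  conf.setInt("hbase.procedure.store.wal.wait.before.roll", 500);
  // roll attempts before giving up and aborting (default 3)
  conf.setInt("hbase.procedure.store.wal.max.roll.retries", 3);
  // sync-failure-triggered rolls before aborting (default 3)
  conf.setInt("hbase.procedure.store.wal.sync.failure.roll.max", 3);
  // periodic roll interval in msec (default 1h)
  conf.setInt("hbase.procedure.store.wal.periodic.roll.msec", 60 * 60 * 1000);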


http://git-wip-us.apache.org/repos/asf/hbase/blob/246631d6/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java
index 6a31eef..39a3472 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java
@@ -38,6 +38,11 @@ public interface ProcedureStore {
    */
   public interface ProcedureStoreListener {
     /**
+     * triggered when the store sync is completed.
+     */
+    void postSync();
+
+    /**
      * triggered when the store is not able to write out data.
      * the main process should abort.
      */
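
Since postSync() is added to the ProcedureStoreListener interface, every existing implementor must be updated (see the no-op overrides added to MasterProcedureEnv and the tests further down). A minimal listener following that pattern; the store variable here is assumed to be any ProcedureStore:

  store.registerListener(new ProcedureStore.ProcedureStoreListener() {
    @Override
    public void postSync() {
      // invoked after each successful sync of the procedure WAL
    }

    @Override
    public void abortProcess() {
      // the store could not write (or roll); the hosting process should abort
    }
  });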

http://git-wip-us.apache.org/repos/asf/hbase/blob/246631d6/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreBase.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreBase.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreBase.java
index e5653b6..0e0e46f 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreBase.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreBase.java
@@ -56,6 +56,14 @@ public abstract class ProcedureStoreBase implements ProcedureStore {
     return listeners.remove(listener);
   }
 
+  protected void sendPostSyncSignal() {
+    if (!this.listeners.isEmpty()) {
+      for (ProcedureStoreListener listener : this.listeners) {
+        listener.postSync();
+      }
+    }
+  }
+
   protected void sendAbortProcessSignal() {
     if (!this.listeners.isEmpty()) {
       for (ProcedureStoreListener listener : this.listeners) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/246631d6/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
index 54b53dc..b0e2258 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.io.FileNotFoundException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.LinkedTransferQueue;
@@ -51,6 +52,9 @@ import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
 import org.apache.hadoop.hbase.procedure2.util.ByteSlot;
 import org.apache.hadoop.hbase.procedure2.util.StringUtils;
 import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALHeader;
+import org.apache.hadoop.hbase.util.Threads;
+
+import com.google.common.annotations.VisibleForTesting;
 
 /**
  * WAL implementation of the ProcedureStore.
@@ -64,7 +68,25 @@ public class WALProcedureStore extends ProcedureStoreBase {
     void recoverFileLease(FileSystem fs, Path path) throws IOException;
   }
 
-  private static final int MAX_RETRIES_BEFORE_ABORT = 3;
+  private static final String MAX_RETRIES_BEFORE_ROLL_CONF_KEY =
+    "hbase.procedure.store.wal.max.retries.before.roll";
+  private static final int DEFAULT_MAX_RETRIES_BEFORE_ROLL = 3;
+
+  private static final String WAIT_BEFORE_ROLL_CONF_KEY =
+    "hbase.procedure.store.wal.wait.before.roll";
+  private static final int DEFAULT_WAIT_BEFORE_ROLL = 500;
+
+  private static final String ROLL_RETRIES_CONF_KEY =
+    "hbase.procedure.store.wal.max.roll.retries";
+  private static final int DEFAULT_ROLL_RETRIES = 3;
+
+  private static final String MAX_SYNC_FAILURE_ROLL_CONF_KEY =
+    "hbase.procedure.store.wal.sync.failure.roll.max";
+  private static final int DEFAULT_MAX_SYNC_FAILURE_ROLL = 3;
+
+  private static final String PERIODIC_ROLL_CONF_KEY =
+    "hbase.procedure.store.wal.periodic.roll.msec";
+  private static final int DEFAULT_PERIODIC_ROLL = 60 * 60 * 1000; // 1h
 
   private static final String SYNC_WAIT_MSEC_CONF_KEY = "hbase.procedure.store.wal.sync.wait.msec";
   private static final int DEFAULT_SYNC_WAIT_MSEC = 100;
@@ -88,16 +110,22 @@ public class WALProcedureStore extends ProcedureStoreBase {
   private final Path logDir;
 
   private AtomicBoolean inSync = new AtomicBoolean(false);
+  private AtomicReference<Throwable> syncException = new AtomicReference<>();
   private LinkedTransferQueue<ByteSlot> slotsCache = null;
   private Set<ProcedureWALFile> corruptedLogs = null;
   private AtomicLong totalSynced = new AtomicLong(0);
+  private AtomicLong lastRollTs = new AtomicLong(0);
   private FSDataOutputStream stream = null;
-  private long lastRollTs = 0;
   private long flushLogId = 0;
   private int slotIndex = 0;
   private Thread syncThread;
   private ByteSlot[] slots;
 
+  private int maxRetriesBeforeRoll;
+  private int maxSyncFailureRoll;
+  private int waitBeforeRoll;
+  private int rollRetries;
+  private int periodicRollMsec;
   private long rollThreshold;
   private boolean useHsync;
   private int syncWaitMsec;
@@ -124,7 +152,13 @@ public class WALProcedureStore extends ProcedureStoreBase {
     }
 
     // Tunings
+    maxRetriesBeforeRoll =
+      conf.getInt(MAX_RETRIES_BEFORE_ROLL_CONF_KEY, DEFAULT_MAX_RETRIES_BEFORE_ROLL);
+    maxSyncFailureRoll = conf.getInt(MAX_SYNC_FAILURE_ROLL_CONF_KEY, DEFAULT_MAX_SYNC_FAILURE_ROLL);
+    waitBeforeRoll = conf.getInt(WAIT_BEFORE_ROLL_CONF_KEY, DEFAULT_WAIT_BEFORE_ROLL);
+    rollRetries = conf.getInt(ROLL_RETRIES_CONF_KEY, DEFAULT_ROLL_RETRIES);
     rollThreshold = conf.getLong(ROLL_THRESHOLD_CONF_KEY, DEFAULT_ROLL_THRESHOLD);
+    periodicRollMsec = conf.getInt(PERIODIC_ROLL_CONF_KEY, DEFAULT_PERIODIC_ROLL);
     syncWaitMsec = conf.getInt(SYNC_WAIT_MSEC_CONF_KEY, DEFAULT_SYNC_WAIT_MSEC);
     useHsync = conf.getBoolean(USE_HSYNC_CONF_KEY, DEFAULT_USE_HSYNC);
 
@@ -132,11 +166,11 @@ public class WALProcedureStore extends ProcedureStoreBase {
     syncThread = new Thread("WALProcedureStoreSyncThread") {
       @Override
       public void run() {
-        while (isRunning()) {
-          try {
-            syncLoop();
-          } catch (IOException e) {
-            LOG.error("Got an exception from the sync-loop", e);
+        try {
+          syncLoop();
+        } catch (Throwable e) {
+          LOG.error("Got an exception from the sync-loop", e);
+          if (!isSyncAborted()) {
             sendAbortProcessSignal();
           }
         }
@@ -155,6 +189,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
     if (lock.tryLock()) {
       try {
         waitCond.signalAll();
+        syncCond.signalAll();
       } finally {
         lock.unlock();
       }
@@ -310,6 +345,9 @@ public class WALProcedureStore extends ProcedureStoreBase {
     // Update the store tracker
     synchronized (storeTracker) {
       storeTracker.insert(proc, subprocs);
+      if (logId == flushLogId) {
+        checkAndTryRoll();
+      }
     }
   }
 
@@ -342,6 +380,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
       storeTracker.update(proc);
       if (logId == flushLogId) {
         removeOldLogs = storeTracker.isUpdated();
+        checkAndTryRoll();
       }
     }
 
@@ -377,8 +416,10 @@ public class WALProcedureStore extends ProcedureStoreBase {
     synchronized (storeTracker) {
       storeTracker.delete(procId);
       if (logId == flushLogId) {
-        if (storeTracker.isEmpty() && totalSynced.get() > rollThreshold) {
-          removeOldLogs = rollWriterOrDie();
+        if (storeTracker.isEmpty() || storeTracker.isUpdated()) {
+          removeOldLogs = checkAndTryRoll();
+        } else {
+          checkAndTryRoll();
         }
       }
     }
@@ -399,14 +440,23 @@ public class WALProcedureStore extends ProcedureStoreBase {
   }
 
   private long pushData(final ByteSlot slot) {
-    assert isRunning() && !logs.isEmpty() : "recoverLease() must be called before inserting data";
-    long logId = -1;
+    if (!isRunning()) {
+      throw new RuntimeException("the store must be running before inserting data");
+    }
+    if (logs.isEmpty()) {
+      throw new RuntimeException("recoverLease() must be called before inserting data");
+    }
 
+    long logId = -1;
     lock.lock();
     try {
       // Wait for the sync to be completed
       while (true) {
-        if (inSync.get()) {
+        if (!isRunning()) {
+          throw new RuntimeException("store no longer running");
+        } else if (isSyncAborted()) {
+          throw new RuntimeException("sync aborted", syncException.get());
+        } else if (inSync.get()) {
           syncCond.await();
         } else if (slotIndex == slots.length) {
           slotCond.signal();
@@ -434,72 +484,101 @@ public class WALProcedureStore extends ProcedureStoreBase {
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
       sendAbortProcessSignal();
+      throw new RuntimeException(e);
     } finally {
       lock.unlock();
+      if (isSyncAborted()) {
+        throw new RuntimeException("sync aborted", syncException.get());
+      }
     }
     return logId;
   }
 
-  private void syncLoop() throws IOException {
+  private boolean isSyncAborted() {
+    return syncException.get() != null;
+  }
+
+  private void syncLoop() throws Throwable {
     inSync.set(false);
-    while (isRunning()) {
-      lock.lock();
-      try {
-        // Wait until new data is available
-        if (slotIndex == 0) {
-          if (LOG.isTraceEnabled()) {
-            float rollTsSec = (System.currentTimeMillis() - lastRollTs) / 1000.0f;
-            LOG.trace(String.format("Waiting for data. flushed=%s (%s/sec)",
-                      StringUtils.humanSize(totalSynced.get()),
-                      StringUtils.humanSize(totalSynced.get() / rollTsSec)));
-          }
-          waitCond.await();
+    lock.lock();
+    try {
+      while (isRunning()) {
+        try {
+          // Wait until new data is available
           if (slotIndex == 0) {
-            // no data.. probably a stop()
-            continue;
+            if (LOG.isTraceEnabled()) {
+              float rollTsSec = getMillisFromLastRoll() / 1000.0f;
+              LOG.trace(String.format("Waiting for data. flushed=%s (%s/sec)",
+                        StringUtils.humanSize(totalSynced.get()),
+                        StringUtils.humanSize(totalSynced.get() / rollTsSec)));
+            }
+
+            waitCond.await(getMillisToNextPeriodicRoll(), TimeUnit.MILLISECONDS);
+            if (slotIndex == 0) {
+              // no data.. probably a stop()
+              checkAndTryRoll();
+              continue;
+            }
           }
-        }
 
-        // Wait SYNC_WAIT_MSEC or the signal of "slots full" before flushing
-        long syncWaitSt = System.currentTimeMillis();
-        if (slotIndex != slots.length) {
-          slotCond.await(syncWaitMsec, TimeUnit.MILLISECONDS);
-        }
-        long syncWaitMs = System.currentTimeMillis() - syncWaitSt;
-        if (LOG.isTraceEnabled() && (syncWaitMs > 10 || slotIndex < slots.length)) {
-          float rollSec = (System.currentTimeMillis() - lastRollTs) / 1000.0f;
-          LOG.trace("Sync wait " + StringUtils.humanTimeDiff(syncWaitMs) +
-                    ", slotIndex=" + slotIndex +
-                    ", totalSynced=" + StringUtils.humanSize(totalSynced.get()) +
-                    " " + StringUtils.humanSize(totalSynced.get() / rollSec) + "/sec");
-        }
+          // Wait SYNC_WAIT_MSEC or the signal of "slots full" before flushing
+          long syncWaitSt = System.currentTimeMillis();
+          if (slotIndex != slots.length) {
+            slotCond.await(syncWaitMsec, TimeUnit.MILLISECONDS);
+          }
+          long syncWaitMs = System.currentTimeMillis() - syncWaitSt;
+          if (LOG.isTraceEnabled() && (syncWaitMs > 10 || slotIndex < slots.length)) {
+            float rollSec = getMillisFromLastRoll() / 1000.0f;
+            LOG.trace(String.format("Sync wait %s, slotIndex=%s, totalSynced=%s (%s/sec)",
+                      StringUtils.humanTimeDiff(syncWaitMs), slotIndex,
+                      StringUtils.humanSize(totalSynced.get()),
+                      StringUtils.humanSize(totalSynced.get() / rollSec)));
+          }
 
 
-        inSync.set(true);
-        totalSynced.addAndGet(syncSlots());
-        slotIndex = 0;
-        inSync.set(false);
-        syncCond.signalAll();
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        sendAbortProcessSignal();
-      } finally {
-        lock.unlock();
+          inSync.set(true);
+          totalSynced.addAndGet(syncSlots());
+          slotIndex = 0;
+          inSync.set(false);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          sendAbortProcessSignal();
+          syncException.compareAndSet(null, e);
+          throw e;
+        } catch (Throwable t) {
+          syncException.compareAndSet(null, t);
+          throw t;
+        } finally {
+          syncCond.signalAll();
+        }
       }
+    } finally {
+      lock.unlock();
     }
   }
 
-  private long syncSlots() {
+  private long syncSlots() throws Throwable {
     int retry = 0;
+    int logRolled = 0;
     long totalSynced = 0;
     do {
       try {
         totalSynced = syncSlots(stream, slots, 0, slotIndex);
         break;
       } catch (Throwable e) {
-        if (++retry == MAX_RETRIES_BEFORE_ABORT) {
-          LOG.error("Sync slot failed, abort.", e);
-          sendAbortProcessSignal();
+        if (++retry >= maxRetriesBeforeRoll) {
+          if (logRolled >= maxSyncFailureRoll) {
+            LOG.error("Sync slots after log roll failed, abort.", e);
+            sendAbortProcessSignal();
+            throw e;
+          }
+
+          if (!rollWriterOrDie()) {
+            throw e;
+          }
+
+          logRolled++;
+          retry = 0;
         }
       }
     } while (isRunning());
@@ -520,6 +599,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
     } else {
       stream.hflush();
     }
+    sendPostSyncSignal();
 
     if (LOG.isTraceEnabled()) {
       LOG.trace("Sync slots=" + count + '/' + slots.length +
@@ -528,14 +608,45 @@ public class WALProcedureStore extends ProcedureStoreBase {
     return totalSynced;
   }
 
-  private boolean rollWriterOrDie() {
-    try {
-      return rollWriter();
-    } catch (IOException e) {
-      LOG.warn("Unable to roll the log", e);
-      sendAbortProcessSignal();
-      return false;
+  @VisibleForTesting
+  public boolean rollWriterOrDie() {
+    for (int i = 1; i <= rollRetries; ++i) {
+      try {
+        if (rollWriter()) {
+          return true;
+        }
+      } catch (IOException e) {
+        LOG.warn("Unable to roll the log, attempt=" + i, e);
+        Threads.sleepWithoutInterrupt(waitBeforeRoll);
+      }
+    }
+    LOG.fatal("Unable to roll the log");
+    sendAbortProcessSignal();
+    throw new RuntimeException("unable to roll the log");
+  }
+
+  protected boolean checkAndTryRoll() {
+    if (!isRunning()) return false;
+
+    if (totalSynced.get() > rollThreshold || getMillisToNextPeriodicRoll() <= 0) {
+      try {
+        return rollWriter();
+      } catch (IOException e) {
+        LOG.warn("Unable to roll the log", e);
+      }
+    }
+    return false;
+  }
+
+  private long getMillisToNextPeriodicRoll() {
+    if (lastRollTs.get() > 0 && periodicRollMsec > 0) {
+      return periodicRollMsec - getMillisFromLastRoll();
     }
+    return Long.MAX_VALUE;
+  }
+
+  private long getMillisFromLastRoll() {
+    return (System.currentTimeMillis() - lastRollTs.get());
   }
 
   protected boolean rollWriter() throws IOException {
@@ -573,7 +684,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
       stream = newStream;
       flushLogId = logId;
       totalSynced.set(0);
-      lastRollTs = System.currentTimeMillis();
+      lastRollTs.set(System.currentTimeMillis());
       logs.add(new ProcedureWALFile(fs, newLogFile, header, startPos));
     } finally {
       lock.unlock();
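
Condensed, the new syncSlots() failure handling above works like this: retry the sync up to max.retries.before.roll times; then roll to a fresh WAL file (rollWriterOrDie() itself retries up to max.roll.retries times, sleeping wait.before.roll msec between attempts); after sync.failure.roll.max unsuccessful rolls, send the abort signal and rethrow. An illustrative sketch of that loop, with names mirroring the diff (not the committed code verbatim):

  private long syncWithRollOnFailure() throws Throwable {
    int retry = 0;
    int logRolled = 0;
    while (isRunning()) {
      try {
        return syncSlots(stream, slots, 0, slotIndex); // may throw on an HDFS error
      } catch (Throwable e) {
        if (++retry < maxRetriesBeforeRoll) continue;  // plain retry first
        if (logRolled >= maxSyncFailureRoll) {
          sendAbortProcessSignal();                    // out of options: abort the master
          throw e;
        }
        if (!rollWriterOrDie()) throw e;               // move to a fresh WAL file
        logRolled++;
        retry = 0;                                     // fresh file, fresh retry budget
      }
    }
    return 0;
  }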

http://git-wip-us.apache.org/repos/asf/hbase/blob/246631d6/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
index 89a6434..1534a5f 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
@@ -19,6 +19,8 @@
 package org.apache.hadoop.hbase.procedure2;
 
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -182,4 +184,34 @@ public class ProcedureTestingUtility {
     assertTrue("expected abort exception, got "+ cause,
         cause instanceof ProcedureAbortedException);
   }
+
+  public static class TestProcedure extends Procedure<Void> {
+    public TestProcedure() {}
+
+    public TestProcedure(long procId, long parentId) {
+      setProcId(procId);
+      if (parentId > 0) {
+        setParentProcId(parentId);
+      }
+    }
+
+    public void addStackId(final int index) {
+      addStackIndex(index);
+    }
+
+    @Override
+    protected Procedure[] execute(Void env) { return null; }
+
+    @Override
+    protected void rollback(Void env) { }
+
+    @Override
+    protected boolean abort(Void env) { return false; }
+
+    @Override
+    protected void serializeStateData(final OutputStream stream) throws IOException { }
+
+    @Override
+    protected void deserializeStateData(final InputStream stream) throws IOException { }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/246631d6/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java
index 19a9ea4..c2e6a77 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
 import org.apache.hadoop.hbase.procedure2.Procedure;
 import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
+import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure;
 import org.apache.hadoop.hbase.procedure2.SequentialProcedure;
 import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
 import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
@@ -354,36 +355,6 @@ public class TestWALProcedureStore {
     });
   }
 
-  public static class TestProcedure extends Procedure<Void> {
-    public TestProcedure() {}
-
-    public TestProcedure(long procId, long parentId) {
-      setProcId(procId);
-      if (parentId > 0) {
-        setParentProcId(parentId);
-      }
-    }
-
-    public void addStackId(final int index) {
-      addStackIndex(index);
-    }
-
-    @Override
-    protected Procedure[] execute(Void env) { return null; }
-
-    @Override
-    protected void rollback(Void env) { }
-
-    @Override
-    protected boolean abort(Void env) { return false; }
-
-    @Override
-    protected void serializeStateData(final OutputStream stream) throws IOException { }
-
-    @Override
-    protected void deserializeStateData(final InputStream stream) throws IOException { }
-  }
-
   private void corruptLog(final FileStatus logFile, final long dropBytes)
       throws IOException {
     assertTrue(logFile.getLen() > dropBytes);

http://git-wip-us.apache.org/repos/asf/hbase/blob/246631d6/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java
index f2f4bf3..6700b63 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java
@@ -75,6 +75,11 @@ public class MasterProcedureEnv {
     }
 
     @Override
+    public void postSync() {
+      // no-op
+    }
+
+    @Override
     public void abortProcess() {
       master.abort("The Procedure Store lost the lease");
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/246631d6/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterFailoverWithProcedures.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterFailoverWithProcedures.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterFailoverWithProcedures.java
index ddb0ab1..429c83b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterFailoverWithProcedures.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterFailoverWithProcedures.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hbase.master.procedure;
 
+import java.io.IOException;
 import java.util.concurrent.CountDownLatch;
 
 import org.apache.commons.logging.Log;
@@ -31,10 +32,12 @@ import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.MiniHBaseCluster;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.procedure2.Procedure;
 import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
 import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
 import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
 import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
+import org.apache.hadoop.hbase.procedure2.store.wal.TestWALProcedureStore.TestSequentialProcedure;
 import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.CreateTableState;
 import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.DeleteTableState;
 import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.DisableTableState;
@@ -44,7 +47,6 @@ import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.ModifyRegionUtils;
-
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -52,7 +54,6 @@ import org.junit.experimental.categories.Category;
 import org.mockito.Mockito;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -63,6 +64,11 @@ public class TestMasterFailoverWithProcedures {
   protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
 
   private static void setupConf(Configuration conf) {
+    // don't waste time retrying the roll; the test is already slow enough.
+    conf.setInt("hbase.procedure.store.wal.max.retries.before.roll", 1);
+    conf.setInt("hbase.procedure.store.wal.wait.before.roll", 0);
+    conf.setInt("hbase.procedure.store.wal.max.roll.retries", 1);
+    conf.setInt("hbase.procedure.store.wal.sync.failure.roll.max", 1);
   }
 
   @Before
@@ -94,6 +100,9 @@ public class TestMasterFailoverWithProcedures {
     final CountDownLatch masterStoreAbort = new CountDownLatch(1);
     masterStore.registerListener(new ProcedureStore.ProcedureStoreListener() {
       @Override
+      public void postSync() {}
+
+      @Override
       public void abortProcess() {
         LOG.debug("Abort store of Master");
         masterStoreAbort.countDown();
@@ -113,6 +122,9 @@ public class TestMasterFailoverWithProcedures {
     final CountDownLatch backupStore3Abort = new CountDownLatch(1);
     backupStore3.registerListener(new ProcedureStore.ProcedureStoreListener() {
       @Override
+      public void postSync() {}
+
+      @Override
       public void abortProcess() {
         LOG.debug("Abort store of backupMaster3");
         backupStore3Abort.countDown();
@@ -126,8 +138,13 @@ public class TestMasterFailoverWithProcedures {
     HTableDescriptor htd = MasterProcedureTestingUtility.createHTD(TableName.valueOf("mtb"), "f");
     HRegionInfo[] regions = ModifyRegionUtils.createHRegionInfos(htd, null);
     LOG.debug("submit proc");
-    getMasterProcedureExecutor().submitProcedure(
-      new CreateTableProcedure(getMasterProcedureExecutor().getEnvironment(), htd, regions));
+    try {
+      getMasterProcedureExecutor().submitProcedure(
+        new CreateTableProcedure(getMasterProcedureExecutor().getEnvironment(), htd, regions));
+      fail("expected RuntimeException 'sync aborted'");
+    } catch (RuntimeException e) {
+      LOG.info("got " + e.getMessage());
+    }
     LOG.debug("wait master store abort");
     masterStoreAbort.await();
 
@@ -139,10 +156,52 @@ public class TestMasterFailoverWithProcedures {
     // wait the store in here to abort (the test will fail due to timeout if it doesn't)
     LOG.debug("wait the store to abort");
     backupStore3.getStoreTracker().setDeleted(1, false);
-    backupStore3.delete(1);
+    try {
+      backupStore3.delete(1);
+      fail("expected RuntimeException 'sync aborted'");
+    } catch (RuntimeException e) {
+      LOG.info("got " + e.getMessage());
+    }
     backupStore3Abort.await();
   }
 
+  @Test(timeout=60000)
+  public void testWALfencingWithWALRolling() throws IOException {
+    final ProcedureStore procStore = getMasterProcedureExecutor().getStore();
+    assertTrue("expected WALStore for this test", procStore instanceof WALProcedureStore);
+
+    HMaster firstMaster = UTIL.getHBaseCluster().getMaster();
+
+    HMaster backupMaster3 = Mockito.mock(HMaster.class);
+    Mockito.doReturn(firstMaster.getConfiguration()).when(backupMaster3).getConfiguration();
+    Mockito.doReturn(true).when(backupMaster3).isActiveMaster();
+    final WALProcedureStore procStore2 = new WALProcedureStore(firstMaster.getConfiguration(),
+        firstMaster.getMasterFileSystem().getFileSystem(),
+        ((WALProcedureStore)procStore).getLogDir(),
+        new MasterProcedureEnv.WALStoreLeaseRecovery(backupMaster3));
+
+    // start a second store which should fence the first one out
+    LOG.info("Starting new WALProcedureStore");
+    procStore2.start(1);
+    procStore2.recoverLease();
+
+    LOG.info("Inserting into second WALProcedureStore");
+    // insert something into the second store, then force a WAL roll
+    Procedure proc2 = new TestSequentialProcedure();
+    procStore2.insert(proc2, null);
+    procStore2.rollWriterOrDie();
+
+    LOG.info("Inserting into first WALProcedureStore");
+    // insert something to the first store
+    proc2 = new TestSequentialProcedure();
+    try {
+      procStore.insert(proc2, null);
+      fail("expected RuntimeException 'sync aborted'");
+    } catch (RuntimeException e) {
+      LOG.info("got " + e.getMessage());
+    }
+  }
+
   // ==========================================================================
   //  Test Create Table
   // ==========================================================================

http://git-wip-us.apache.org/repos/asf/hbase/blob/246631d6/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestWALProcedureStoreOnHDFS.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestWALProcedureStoreOnHDFS.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestWALProcedureStoreOnHDFS.java
new file mode 100644
index 0000000..89ad5d6
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestWALProcedureStoreOnHDFS.java
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.master.procedure;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.procedure2.Procedure;
+import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
+import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
+import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure;
+import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
+import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.CreateTableState;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.DeleteTableState;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.DisableTableState;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.EnableTableState;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.TruncateTableState;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.ModifyRegionUtils;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+@Category(LargeTests.class)
+public class TestWALProcedureStoreOnHDFS {
+  private static final Log LOG = LogFactory.getLog(TestWALProcedureStoreOnHDFS.class);
+
+  protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+  private WALProcedureStore store;
+
+  private static void setupConf(Configuration conf) {
+    conf.setInt("dfs.replication", 3);
+    conf.setInt("dfs.namenode.replication.min", 3);
+
+    // increase the values for a slow test-env
+    conf.setInt("hbase.procedure.store.wal.wait.before.roll", 1000);
+    conf.setInt("hbase.procedure.store.wal.max.roll.retries", 5);
+    conf.setInt("hbase.procedure.store.wal.sync.failure.roll.max", 5);
+  }
+
+  @Before
+  public void setup() throws Exception {
+    setupConf(UTIL.getConfiguration());
+    MiniDFSCluster dfs = UTIL.startMiniDFSCluster(3);
+
+    Path logDir = new Path(new Path(dfs.getFileSystem().getUri()), "/test-logs");
+    store = ProcedureTestingUtility.createWalStore(
+      UTIL.getConfiguration(), dfs.getFileSystem(), logDir);
+    store.registerListener(new ProcedureStore.ProcedureStoreListener() {
+      @Override
+      public void postSync() {}
+
+      @Override
+      public void abortProcess() {
+        LOG.fatal("Abort the Procedure Store");
+        store.stop(true);
+      }
+    });
+    store.start(8);
+    store.recoverLease();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    store.stop(false);
+    UTIL.getDFSCluster().getFileSystem().delete(store.getLogDir(), true);
+
+    try {
+      UTIL.shutdownMiniCluster();
+    } catch (Exception e) {
+      LOG.warn("failure shutting down cluster", e);
+    }
+  }
+
+  @Test(timeout=60000, expected=RuntimeException.class)
+  public void testWalAbortOnLowReplication() throws Exception {
+    assertEquals(3, UTIL.getDFSCluster().getDataNodes().size());
+
+    LOG.info("Stop DataNode");
+    UTIL.getDFSCluster().stopDataNode(0);
+    assertEquals(2, UTIL.getDFSCluster().getDataNodes().size());
+
+    store.insert(new TestProcedure(1, -1), null);
+    for (long i = 2; store.isRunning(); ++i) {
+      assertEquals(2, UTIL.getDFSCluster().getDataNodes().size());
+      store.insert(new TestProcedure(i, -1), null);
+      Thread.sleep(100);
+    }
+    assertFalse(store.isRunning());
+    fail("The store.insert() should throw an exeption");
+  }
+
+  @Test(timeout=60000)
+  public void testWalAbortOnLowReplicationWithQueuedWriters() throws Exception {
+    assertEquals(3, UTIL.getDFSCluster().getDataNodes().size());
+
+    store.registerListener(new ProcedureStore.ProcedureStoreListener() {
+      @Override
+      public void postSync() {
+        Threads.sleepWithoutInterrupt(2000);
+      }
+
+      @Override
+      public void abortProcess() {}
+    });
+
+    final AtomicInteger reCount = new AtomicInteger(0);
+    Thread[] thread = new Thread[store.getNumThreads() * 2 + 1];
+    for (int i = 0; i < thread.length; ++i) {
+      final long procId = i + 1;
+      thread[i] = new Thread() {
+        public void run() {
+          try {
+            LOG.debug("[S] INSERT " + procId);
+            store.insert(new TestProcedure(procId, -1), null);
+            LOG.debug("[E] INSERT " + procId);
+          } catch (RuntimeException e) {
+            reCount.incrementAndGet();
+            LOG.debug("[F] INSERT " + procId + ": " + e.getMessage());
+          }
+        }
+      };
+      thread[i].start();
+    }
+
+    Thread.sleep(1000);
+    LOG.info("Stop DataNode");
+    UTIL.getDFSCluster().stopDataNode(0);
+    assertEquals(2, UTIL.getDFSCluster().getDataNodes().size());
+
+    for (int i = 0; i < thread.length; ++i) {
+      thread[i].join();
+    }
+
+    assertFalse(store.isRunning());
+    assertTrue(reCount.toString(), reCount.get() >= store.getNumThreads() &&
+                                   reCount.get() < thread.length);
+  }
+
+  @Test(timeout=60000)
+  public void testWalRollOnLowReplication() throws Exception {
+    int dnCount = 0;
+    store.insert(new TestProcedure(1, -1), null);
+    UTIL.getDFSCluster().restartDataNode(dnCount);
+    for (long i = 2; i < 100; ++i) {
+      store.insert(new TestProcedure(i, -1), null);
+      Thread.sleep(100);
+      if ((i % 30) == 0) {
+        LOG.info("Restart Data Node");
+        UTIL.getDFSCluster().restartDataNode(++dnCount % 3);
+      }
+    }
+    assertTrue(store.isRunning());
+  }
+}