You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by sy...@apache.org on 2016/10/10 19:00:45 UTC

[3/9] hbase git commit: HBASE-16781 Fix flaky TestMasterProcedureWalLease

HBASE-16781 Fix flaky TestMasterProcedureWalLease


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/29d701a3
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/29d701a3
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/29d701a3

Branch: refs/heads/hbase-12439
Commit: 29d701a314b6bf56771a217b42c4c10832b15753
Parents: c7cae6b
Author: Matteo Bertozzi <ma...@cloudera.com>
Authored: Fri Oct 7 17:32:19 2016 -0700
Committer: Matteo Bertozzi <ma...@cloudera.com>
Committed: Fri Oct 7 18:01:53 2016 -0700

----------------------------------------------------------------------
 .../procedure2/store/wal/WALProcedureStore.java | 41 +++++++++++++-------
 .../hadoop/hbase/master/MasterServices.java     |  5 +++
 .../master/procedure/MasterProcedureEnv.java    |  7 +++-
 .../hbase/master/MockNoopMasterServices.java    |  5 +++
 .../MasterProcedureTestingUtility.java          |  1 +
 5 files changed, 43 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/29d701a3/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
index 36cf7af..1e60402 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
@@ -122,6 +122,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
   private final AtomicBoolean inSync = new AtomicBoolean(false);
   private final AtomicLong totalSynced = new AtomicLong(0);
   private final AtomicLong lastRollTs = new AtomicLong(0);
+  private final AtomicLong syncId = new AtomicLong(0);
 
   private LinkedTransferQueue<ByteSlot> slotsCache = null;
   private Set<ProcedureWALFile> corruptedLogs = null;
@@ -226,15 +227,15 @@ public class WALProcedureStore extends ProcedureStoreBase {
   }
 
   @Override
-  public void stop(boolean abort) {
+  public void stop(final boolean abort) {
     if (!setRunning(false)) {
       return;
     }
 
-    LOG.info("Stopping the WAL Procedure Store");
+    LOG.info("Stopping the WAL Procedure Store, isAbort=" + abort +
+      (isSyncAborted() ? " (self aborting)" : ""));
     sendStopSignal();
-
-    if (!abort) {
+    if (!isSyncAborted()) {
       try {
         while (syncThread.isAlive()) {
           sendStopSignal();
@@ -525,6 +526,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
         }
       }
 
+      final long pushSyncId = syncId.get();
       updateStoreTracker(type, procId, subProcIds);
       slots[slotIndex++] = slot;
       logId = flushLogId;
@@ -540,7 +542,9 @@ public class WALProcedureStore extends ProcedureStoreBase {
         slotCond.signal();
       }
 
-      syncCond.await();
+      while (pushSyncId == syncId.get() && isRunning()) {
+        syncCond.await();
+      }
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
       sendAbortProcessSignal();
@@ -642,13 +646,15 @@ public class WALProcedureStore extends ProcedureStoreBase {
           totalSyncedToStore = totalSynced.addAndGet(slotSize);
           slotIndex = 0;
           inSync.set(false);
+          syncId.incrementAndGet();
         } catch (InterruptedException e) {
           Thread.currentThread().interrupt();
-          sendAbortProcessSignal();
           syncException.compareAndSet(null, e);
+          sendAbortProcessSignal();
           throw e;
         } catch (Throwable t) {
           syncException.compareAndSet(null, t);
+          sendAbortProcessSignal();
           throw t;
         } finally {
           syncCond.signalAll();
@@ -679,13 +685,12 @@ public class WALProcedureStore extends ProcedureStoreBase {
       } catch (Throwable e) {
         LOG.warn("unable to sync slots, retry=" + retry);
         if (++retry >= maxRetriesBeforeRoll) {
-          if (logRolled >= maxSyncFailureRoll) {
+          if (logRolled >= maxSyncFailureRoll && isRunning()) {
             LOG.error("Sync slots after log roll failed, abort.", e);
-            sendAbortProcessSignal();
             throw e;
           }
 
-          if (!rollWriterOrDie()) {
+          if (!rollWriterWithRetries()) {
             throw e;
           }
 
@@ -720,8 +725,8 @@ public class WALProcedureStore extends ProcedureStoreBase {
     return totalSynced;
   }
 
-  private boolean rollWriterOrDie() {
-    for (int i = 0; i < rollRetries; ++i) {
+  private boolean rollWriterWithRetries() {
+    for (int i = 0; i < rollRetries && isRunning(); ++i) {
       if (i > 0) Threads.sleepWithoutInterrupt(waitBeforeRoll * i);
 
       try {
@@ -733,8 +738,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
       }
     }
     LOG.fatal("Unable to roll the log");
-    sendAbortProcessSignal();
-    throw new RuntimeException("unable to roll the log");
+    return false;
   }
 
   private boolean tryRollWriter() {
@@ -777,7 +781,8 @@ public class WALProcedureStore extends ProcedureStoreBase {
     }
   }
 
-  @VisibleForTesting void removeInactiveLogsForTesting() throws Exception {
+  @VisibleForTesting
+  protected void removeInactiveLogsForTesting() throws Exception {
     lock.lock();
     try {
       removeInactiveLogs();
@@ -812,6 +817,8 @@ public class WALProcedureStore extends ProcedureStoreBase {
   }
 
   private boolean rollWriter() throws IOException {
+    if (!isRunning()) return false;
+
     // Create new state-log
     if (!rollWriter(flushLogId + 1)) {
       LOG.warn("someone else has already created log " + flushLogId);
@@ -1043,6 +1050,10 @@ public class WALProcedureStore extends ProcedureStoreBase {
       for (int i = 0; i < logFiles.length; ++i) {
         final Path logPath = logFiles[i].getPath();
         leaseRecovery.recoverFileLease(fs, logPath);
+        if (!isRunning()) {
+          throw new IOException("wal aborting");
+        }
+
         maxLogId = Math.max(maxLogId, getLogIdFromName(logPath.getName()));
         ProcedureWALFile log = initOldLog(logFiles[i]);
         if (log != null) {
@@ -1061,7 +1072,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
    * it using entries in the log.
    */
   private void initTrackerFromOldLogs() {
-    if (logs.isEmpty()) return;
+    if (logs.isEmpty() || !isRunning()) return;
     ProcedureWALFile log = logs.getLast();
     if (!log.getTracker().isPartial()) {
       storeTracker.resetTo(log.getTracker());

http://git-wip-us.apache.org/repos/asf/hbase/blob/29d701a3/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
index 670642f..9bdcf76 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
@@ -374,4 +374,9 @@ public interface MasterServices extends Server {
    * @return load balancer
    */
   public LoadBalancer getLoadBalancer();
+
+  /**
+   * @return True if this master is stopping.
+   */
+  boolean isStopping();
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/29d701a3/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java
index 213f80c..e90813c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java
@@ -61,10 +61,15 @@ public class MasterProcedureEnv {
         @Override
         public boolean progress() {
           LOG.debug("Recover Procedure Store log lease: " + path);
-          return master.isActiveMaster();
+          return isRunning();
         }
       });
     }
+
+    private boolean isRunning() {
+      return master.isActiveMaster() && !master.isStopped() &&
+        !master.isStopping() && !master.isAborted();
+    }
   }
 
   @InterfaceAudience.Private

http://git-wip-us.apache.org/repos/asf/hbase/blob/29d701a3/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
index 56a8522..87fb169 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
@@ -180,6 +180,11 @@ public class MockNoopMasterServices implements MasterServices, Server {
   }
 
   @Override
+  public boolean isStopping() {
+    return stopped;
+  }
+
+  @Override
   public boolean isStopped() {
     return stopped;
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/29d701a3/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureTestingUtility.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureTestingUtility.java
index 9b69830..c3cb2da 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureTestingUtility.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureTestingUtility.java
@@ -398,6 +398,7 @@ public class MasterProcedureTestingUtility {
     //   restart executor/store
     //   rollback step N - save on store
     InjectAbortOnLoadListener abortListener = new InjectAbortOnLoadListener(procExec);
+    abortListener.addProcId(procId);
     procExec.registerListener(abortListener);
     try {
       for (int i = 0; !procExec.isFinished(procId); ++i) {