You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2015/09/15 07:21:36 UTC

hbase git commit: HBASE-14401 Stamp failed appends with sequenceid too.... Cleans up latches

Repository: hbase
Updated Branches:
  refs/heads/master bf46fc554 -> 72b4c906b


HBASE-14401 Stamp failed appends with sequenceid too.... Cleans up latches


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/72b4c906
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/72b4c906
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/72b4c906

Branch: refs/heads/master
Commit: 72b4c906b806236cd5fcf5a69f12628d00941df9
Parents: bf46fc5
Author: stack <st...@apache.org>
Authored: Mon Sep 14 22:20:40 2015 -0700
Committer: stack <st...@apache.org>
Committed: Mon Sep 14 22:20:40 2015 -0700

----------------------------------------------------------------------
 .../hadoop/hbase/regionserver/wal/FSHLog.java   | 69 +++++++++++---------
 .../org/apache/hadoop/hbase/wal/WALKey.java     |  8 ++-
 .../regionserver/TestFailedAppendAndSync.java   | 20 +++++-
 .../hbase/regionserver/TestWALLockup.java       | 11 ++--
 4 files changed, 69 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/72b4c906/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
index c551a94..5f9e3cd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
@@ -279,8 +279,6 @@ public class FSHLog implements WAL {
 
   private final int slowSyncNs;
 
-  private final static Object [] NO_ARGS = new Object []{};
-
   // If live datanode count is lower than the default replicas value,
   // RollWriter will be triggered in each sync(So the RollWriter will be
   // triggered one by one in a short time). Using it as a workaround to slow
@@ -821,8 +819,7 @@ public class FSHLog implements WAL {
       } catch (FailedSyncBeforeLogCloseException e) {
         // If unflushed/unsynced entries on close, it is reason to abort.
         if (isUnflushedEntries()) throw e;
-        // Else, let is pass through to the close.
-        LOG.warn("Failed sync but no outstanding unsync'd edits so falling through to close; " +
+        LOG.warn("Failed sync-before-close but no outstanding appends; closing WAL: " +
           e.getMessage());
       }
 
@@ -1332,8 +1329,8 @@ public class FSHLog implements WAL {
         }
       }
     } catch (Exception e) {
-      LOG.warn("Unable to invoke DFSOutputStream.getNumCurrentReplicas" + e +
-        " still proceeding ahead...");
+      LOG.warn("DFSOutputStream.getNumCurrentReplicas failed because of " + e +
+        ", continuing...");
     }
     return logRollNeeded;
   }
@@ -1725,7 +1722,9 @@ public class FSHLog implements WAL {
       // add appends to dfsclient as they come in.  Batching appends doesn't give any significant
       // benefit on measurement.  Handler sync calls we will batch up. If we get an exception
       // appending an edit, we fail all subsequent appends and syncs with the same exception until
-      // the WAL is reset.
+      // the WAL is reset. It is important that we not short-circuit and exit early this method.
+      // It is important that we always go through the attainSafePoint on the end. Another thread,
+      // the log roller may be waiting on a signal from us here and will just hang without it.
 
       try {
         if (truck.hasSyncFuturePayload()) {
@@ -1736,15 +1735,20 @@ public class FSHLog implements WAL {
           TraceScope scope = Trace.continueSpan(truck.unloadSpanPayload());
           try {
             FSWALEntry entry = truck.unloadFSWALEntryPayload();
-            // If already an exception, do not try to append. Throw.
-            if (this.exception != null) throw this.exception;
+            if (this.exception != null) {
+              // We got an exception on an earlier attempt at append. Do not let this append
+              // go through. Fail it but stamp the sequenceid into this append though failed.
+              // We need to do this to close the latch held down deep in WALKey...that is waiting
+              // on sequenceid assignment otherwise it will just hang out (The #append method
+              // called below does this also internally).
+              entry.stampRegionSequenceId();
+              // Return to keep processing events coming off the ringbuffer
+              return;
+            }
             append(entry);
           } catch (Exception e) {
-            // Failed append. Record the exception. Throw it from here on out til new WAL in place
-            this.exception = new DamagedWALException(e);
-            // If append fails, presume any pending syncs will fail too; let all waiting handlers
-            // know of the exception.
-            cleanupOutstandingSyncsOnException(sequence, this.exception);
+            // Failed append. Record the exception.
+            this.exception = e;
             // Return to keep processing events coming off the ringbuffer
             return;
           } finally {
@@ -1752,7 +1756,7 @@ public class FSHLog implements WAL {
             scope.close(); // append scope is complete
           }
         } else {
-          // They can't both be null.  Fail all up to this!!!
+          // What is this if not an append or sync. Fail all up to this!!!
           cleanupOutstandingSyncsOnException(sequence,
             new IllegalStateException("Neither append nor sync"));
           // Return to keep processing.
@@ -1771,23 +1775,22 @@ public class FSHLog implements WAL {
           LOG.trace("Sequence=" + sequence + ", syncCount=" + this.syncFuturesCount);
         }
 
-        // Below expects that the offer 'transfers' responsibility for the outstanding syncs to the
-        // syncRunner. We should never get an exception in here.
-        int index = Math.abs(this.syncRunnerIndex++) % this.syncRunners.length;
-        try {
-          if (this.exception != null) {
-            // Do not try to sync. If a this.exception, then we failed an append. Do not try to
-            // sync a failed append. Fall through to the attainSafePoint below. It is part of the
-            // means by which we put in place a new WAL. A new WAL is how we clean up.
-            // Don't throw because then we'll not get to attainSafePoint.
-            cleanupOutstandingSyncsOnException(sequence, this.exception);
-          } else {
+        if (this.exception == null) {
+          // Below expects that the offer 'transfers' responsibility for the outstanding syncs to
+          // the syncRunner. We should never get an exception in here.
+          int index = Math.abs(this.syncRunnerIndex++) % this.syncRunners.length;
+          try {
             this.syncRunners[index].offer(sequence, this.syncFutures, this.syncFuturesCount);
+          } catch (Exception e) {
+            // Should NEVER get here.
+            requestLogRoll();
+            this.exception = new DamagedWALException("Failed offering sync", e);
           }
-        } catch (Exception e) {
-          // Should NEVER get here.
-          cleanupOutstandingSyncsOnException(sequence, e);
-          throw e;
+        }
+        // We may have picked up an exception above trying to offer sync
+        if (this.exception != null) {
+          cleanupOutstandingSyncsOnException(sequence,
+            new DamagedWALException("On sync", this.exception));
         }
         attainSafePoint(sequence);
         this.syncFuturesCount = 0;
@@ -1883,9 +1886,11 @@ public class FSHLog implements WAL {
         // Update metrics.
         postAppend(entry, EnvironmentEdgeManager.currentTime() - start);
       } catch (Exception e) {
-        LOG.warn("Could not append. Requesting close of WAL", e);
+        String msg = "Append sequenceId=" + regionSequenceId +
+          ", requesting roll of WAL";
+        LOG.warn(msg, e);
         requestLogRoll();
-        throw e;
+        throw new DamagedWALException(msg, e);
       }
       numEntries.incrementAndGet();
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/72b4c906/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java
index 9b3dede..74284e0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java
@@ -315,8 +315,12 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
    */
   public long getSequenceId(final long maxWaitForSeqId) throws IOException {
     // TODO: This implementation waiting on a latch is problematic because if a higher level
-    // determines we should stop or abort, there is not global list of all these blocked WALKeys
-    // waiting on a sequence id; they can't be cancelled... interrupted. See getNextSequenceId
+    // determines we should stop or abort, there is no global list of all these blocked WALKeys
+    // waiting on a sequence id; they can't be cancelled... interrupted. See getNextSequenceId.
+    //
+    // UPDATE: I think we can remove the timeout now we are stamping all walkeys with sequenceid,
+    // even those that have failed (previously we were not... so they would just hang out...).
+    // St.Ack 20150910
     try {
       if (maxWaitForSeqId < 0) {
         this.seqNumAssignedLatch.await();

http://git-wip-us.apache.org/repos/asf/hbase/blob/72b4c906/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java
index b15792e..e9ff8ec 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java
@@ -25,6 +25,7 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -33,11 +34,13 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.DroppedSnapshotException;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
+import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
@@ -78,6 +81,8 @@ public class TestFailedAppendAndSync {
   public void setup() throws IOException {
     TEST_UTIL = HBaseTestingUtility.createLocalHTU();
     CONF = TEST_UTIL.getConfiguration();
+    // Disable block cache.
+    CONF.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0f);
     dir = TEST_UTIL.getDataTestDir("TestHRegion").toString();
     tableName = TableName.valueOf(name.getMethodName());
   }
@@ -101,6 +106,7 @@ public class TestFailedAppendAndSync {
    */
   @Test (timeout=300000)
   public void testLockupAroundBadAssignSync() throws IOException {
+    final AtomicLong rolls = new AtomicLong(0);
     // Dodgy WAL. Will throw exceptions when flags set.
     class DodgyFSLog extends FSHLog {
       volatile boolean throwSyncException = false;
@@ -112,6 +118,13 @@ public class TestFailedAppendAndSync {
       }
 
       @Override
+      public byte[][] rollWriter(boolean force) throws FailedLogCloseException, IOException {
+        byte [][] regions = super.rollWriter(force);
+        rolls.getAndIncrement();
+        return regions;
+      }
+
+      @Override
       protected Writer createWriterInstance(Path path) throws IOException {
         final Writer w = super.createWriterInstance(path);
           return new Writer() {
@@ -175,7 +188,7 @@ public class TestFailedAppendAndSync {
       } catch (IOException ioe) {
         fail();
       }
-
+      long rollsCount = rolls.get();
       try {
         dodgyWAL.throwAppendException = true;
         dodgyWAL.throwSyncException = false;
@@ -185,6 +198,9 @@ public class TestFailedAppendAndSync {
       } catch (IOException ioe) {
         threwOnAppend = true;
       }
+      while (rollsCount == rolls.get()) Threads.sleep(100);
+      rollsCount = rolls.get();
+
       // When we get to here.. we should be ok. A new WAL has been put in place. There were no
       // appends to sync. We should be able to continue.
 
@@ -197,6 +213,8 @@ public class TestFailedAppendAndSync {
       } catch (IOException ioe) {
         threwOnBoth = true;
       }
+      while (rollsCount == rolls.get()) Threads.sleep(100);
+
       // Again, all should be good. New WAL and no outstanding unsync'd edits so we should be able
       // to just continue.
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/72b4c906/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java
index a620951..ccf2b15 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.TableName;
@@ -81,6 +82,8 @@ public class TestWALLockup {
   public void setup() throws IOException {
     TEST_UTIL = HBaseTestingUtility.createLocalHTU();
     CONF = TEST_UTIL.getConfiguration();
+    // Disable block cache.
+    CONF.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0f);
     dir = TEST_UTIL.getDataTestDir("TestHRegion").toString();
     tableName = TableName.valueOf(name.getMethodName());
   }
@@ -139,10 +142,10 @@ public class TestWALLockup {
       protected void beforeWaitOnSafePoint() {
         if (throwException) {
           LOG.info("COUNTDOWN");
-          // Don't countdown latch until someone waiting on it.
-          while (this.latch.getCount() <= 0) {
-            Threads.sleep(10);
-          }
+          // Don't countdown latch until someone waiting on it otherwise, the above
+          // afterCreatingZigZagLatch will get to the latch and no one will ever free it and we'll
+          // be stuck; test won't go down
+          while (this.latch.getCount() <= 0) Threads.sleep(1);
           this.latch.countDown();
         }
       }