You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2015/09/15 07:21:36 UTC
hbase git commit: HBASE-14401 Stamp failed appends with sequenceid
too.... Cleans up latches
Repository: hbase
Updated Branches:
refs/heads/master bf46fc554 -> 72b4c906b
HBASE-14401 Stamp failed appends with sequenceid too.... Cleans up latches
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/72b4c906
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/72b4c906
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/72b4c906
Branch: refs/heads/master
Commit: 72b4c906b806236cd5fcf5a69f12628d00941df9
Parents: bf46fc5
Author: stack <st...@apache.org>
Authored: Mon Sep 14 22:20:40 2015 -0700
Committer: stack <st...@apache.org>
Committed: Mon Sep 14 22:20:40 2015 -0700
----------------------------------------------------------------------
.../hadoop/hbase/regionserver/wal/FSHLog.java | 69 +++++++++++---------
.../org/apache/hadoop/hbase/wal/WALKey.java | 8 ++-
.../regionserver/TestFailedAppendAndSync.java | 20 +++++-
.../hbase/regionserver/TestWALLockup.java | 11 ++--
4 files changed, 69 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/72b4c906/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
index c551a94..5f9e3cd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
@@ -279,8 +279,6 @@ public class FSHLog implements WAL {
private final int slowSyncNs;
- private final static Object [] NO_ARGS = new Object []{};
-
// If live datanode count is lower than the default replicas value,
// RollWriter will be triggered in each sync(So the RollWriter will be
// triggered one by one in a short time). Using it as a workaround to slow
@@ -821,8 +819,7 @@ public class FSHLog implements WAL {
} catch (FailedSyncBeforeLogCloseException e) {
// If unflushed/unsynced entries on close, it is reason to abort.
if (isUnflushedEntries()) throw e;
- // Else, let is pass through to the close.
- LOG.warn("Failed sync but no outstanding unsync'd edits so falling through to close; " +
+ LOG.warn("Failed sync-before-close but no outstanding appends; closing WAL: " +
e.getMessage());
}
@@ -1332,8 +1329,8 @@ public class FSHLog implements WAL {
}
}
} catch (Exception e) {
- LOG.warn("Unable to invoke DFSOutputStream.getNumCurrentReplicas" + e +
- " still proceeding ahead...");
+ LOG.warn("DFSOutputStream.getNumCurrentReplicas failed because of " + e +
+ ", continuing...");
}
return logRollNeeded;
}
@@ -1725,7 +1722,9 @@ public class FSHLog implements WAL {
// add appends to dfsclient as they come in. Batching appends doesn't give any significant
// benefit on measurement. Handler sync calls we will batch up. If we get an exception
// appending an edit, we fail all subsequent appends and syncs with the same exception until
- // the WAL is reset.
+ // the WAL is reset. It is important that we not short-circuit and exit this method early.
+ // It is important that we always go through the attainSafePoint at the end. Another thread,
+ // the log roller, may be waiting on a signal from us here and will just hang without it.
try {
if (truck.hasSyncFuturePayload()) {
@@ -1736,15 +1735,20 @@ public class FSHLog implements WAL {
TraceScope scope = Trace.continueSpan(truck.unloadSpanPayload());
try {
FSWALEntry entry = truck.unloadFSWALEntryPayload();
- // If already an exception, do not try to append. Throw.
- if (this.exception != null) throw this.exception;
+ if (this.exception != null) {
+ // We got an exception on an earlier attempt at append. Do not let this append
+ // go through. Fail it, but still stamp the sequenceid into it even though it failed.
+ // We need to do this to close the latch held down deep in WALKey...that is waiting
+ // on sequenceid assignment; otherwise it will just hang out (the #append method
+ // called below also does this internally).
+ entry.stampRegionSequenceId();
+ // Return to keep processing events coming off the ringbuffer
+ return;
+ }
append(entry);
} catch (Exception e) {
- // Failed append. Record the exception. Throw it from here on out til new WAL in place
- this.exception = new DamagedWALException(e);
- // If append fails, presume any pending syncs will fail too; let all waiting handlers
- // know of the exception.
- cleanupOutstandingSyncsOnException(sequence, this.exception);
+ // Failed append. Record the exception.
+ this.exception = e;
// Return to keep processing events coming off the ringbuffer
return;
} finally {
@@ -1752,7 +1756,7 @@ public class FSHLog implements WAL {
scope.close(); // append scope is complete
}
} else {
- // They can't both be null. Fail all up to this!!!
+ // What is this if not an append or sync. Fail all up to this!!!
cleanupOutstandingSyncsOnException(sequence,
new IllegalStateException("Neither append nor sync"));
// Return to keep processing.
@@ -1771,23 +1775,22 @@ public class FSHLog implements WAL {
LOG.trace("Sequence=" + sequence + ", syncCount=" + this.syncFuturesCount);
}
- // Below expects that the offer 'transfers' responsibility for the outstanding syncs to the
- // syncRunner. We should never get an exception in here.
- int index = Math.abs(this.syncRunnerIndex++) % this.syncRunners.length;
- try {
- if (this.exception != null) {
- // Do not try to sync. If a this.exception, then we failed an append. Do not try to
- // sync a failed append. Fall through to the attainSafePoint below. It is part of the
- // means by which we put in place a new WAL. A new WAL is how we clean up.
- // Don't throw because then we'll not get to attainSafePoint.
- cleanupOutstandingSyncsOnException(sequence, this.exception);
- } else {
+ if (this.exception == null) {
+ // Below expects that the offer 'transfers' responsibility for the outstanding syncs to
+ // the syncRunner. We should never get an exception in here.
+ int index = Math.abs(this.syncRunnerIndex++) % this.syncRunners.length;
+ try {
this.syncRunners[index].offer(sequence, this.syncFutures, this.syncFuturesCount);
+ } catch (Exception e) {
+ // Should NEVER get here.
+ requestLogRoll();
+ this.exception = new DamagedWALException("Failed offering sync", e);
}
- } catch (Exception e) {
- // Should NEVER get here.
- cleanupOutstandingSyncsOnException(sequence, e);
- throw e;
+ }
+ // We may have picked up an exception above trying to offer sync
+ if (this.exception != null) {
+ cleanupOutstandingSyncsOnException(sequence,
+ new DamagedWALException("On sync", this.exception));
}
attainSafePoint(sequence);
this.syncFuturesCount = 0;
@@ -1883,9 +1886,11 @@ public class FSHLog implements WAL {
// Update metrics.
postAppend(entry, EnvironmentEdgeManager.currentTime() - start);
} catch (Exception e) {
- LOG.warn("Could not append. Requesting close of WAL", e);
+ String msg = "Append sequenceId=" + regionSequenceId +
+ ", requesting roll of WAL";
+ LOG.warn(msg, e);
requestLogRoll();
- throw e;
+ throw new DamagedWALException(msg, e);
}
numEntries.incrementAndGet();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/72b4c906/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java
index 9b3dede..74284e0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java
@@ -315,8 +315,12 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
*/
public long getSequenceId(final long maxWaitForSeqId) throws IOException {
// TODO: This implementation waiting on a latch is problematic because if a higher level
- // determines we should stop or abort, there is not global list of all these blocked WALKeys
- // waiting on a sequence id; they can't be cancelled... interrupted. See getNextSequenceId
+ // determines we should stop or abort, there is no global list of all these blocked WALKeys
+ // waiting on a sequence id; they can't be cancelled... interrupted. See getNextSequenceId.
+ //
+ // UPDATE: I think we can remove the timeout now we are stamping all walkeys with sequenceid,
+ // even those that have failed (previously we were not... so they would just hang out...).
+ // St.Ack 20150910
try {
if (maxWaitForSeqId < 0) {
this.seqNumAssignedLatch.await();
http://git-wip-us.apache.org/repos/asf/hbase/blob/72b4c906/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java
index b15792e..e9ff8ec 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java
@@ -25,6 +25,7 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -33,11 +34,13 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.DroppedSnapshotException;
import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
+import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
@@ -78,6 +81,8 @@ public class TestFailedAppendAndSync {
public void setup() throws IOException {
TEST_UTIL = HBaseTestingUtility.createLocalHTU();
CONF = TEST_UTIL.getConfiguration();
+ // Disable block cache.
+ CONF.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0f);
dir = TEST_UTIL.getDataTestDir("TestHRegion").toString();
tableName = TableName.valueOf(name.getMethodName());
}
@@ -101,6 +106,7 @@ public class TestFailedAppendAndSync {
*/
@Test (timeout=300000)
public void testLockupAroundBadAssignSync() throws IOException {
+ final AtomicLong rolls = new AtomicLong(0);
// Dodgy WAL. Will throw exceptions when flags set.
class DodgyFSLog extends FSHLog {
volatile boolean throwSyncException = false;
@@ -112,6 +118,13 @@ public class TestFailedAppendAndSync {
}
@Override
+ public byte[][] rollWriter(boolean force) throws FailedLogCloseException, IOException {
+ byte [][] regions = super.rollWriter(force);
+ rolls.getAndIncrement();
+ return regions;
+ }
+
+ @Override
protected Writer createWriterInstance(Path path) throws IOException {
final Writer w = super.createWriterInstance(path);
return new Writer() {
@@ -175,7 +188,7 @@ public class TestFailedAppendAndSync {
} catch (IOException ioe) {
fail();
}
-
+ long rollsCount = rolls.get();
try {
dodgyWAL.throwAppendException = true;
dodgyWAL.throwSyncException = false;
@@ -185,6 +198,9 @@ public class TestFailedAppendAndSync {
} catch (IOException ioe) {
threwOnAppend = true;
}
+ while (rollsCount == rolls.get()) Threads.sleep(100);
+ rollsCount = rolls.get();
+
// When we get to here.. we should be ok. A new WAL has been put in place. There were no
// appends to sync. We should be able to continue.
@@ -197,6 +213,8 @@ public class TestFailedAppendAndSync {
} catch (IOException ioe) {
threwOnBoth = true;
}
+ while (rollsCount == rolls.get()) Threads.sleep(100);
+
// Again, all should be good. New WAL and no outstanding unsync'd edits so we should be able
// to just continue.
http://git-wip-us.apache.org/repos/asf/hbase/blob/72b4c906/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java
index a620951..ccf2b15 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableName;
@@ -81,6 +82,8 @@ public class TestWALLockup {
public void setup() throws IOException {
TEST_UTIL = HBaseTestingUtility.createLocalHTU();
CONF = TEST_UTIL.getConfiguration();
+ // Disable block cache.
+ CONF.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0f);
dir = TEST_UTIL.getDataTestDir("TestHRegion").toString();
tableName = TableName.valueOf(name.getMethodName());
}
@@ -139,10 +142,10 @@ public class TestWALLockup {
protected void beforeWaitOnSafePoint() {
if (throwException) {
LOG.info("COUNTDOWN");
- // Don't countdown latch until someone waiting on it.
- while (this.latch.getCount() <= 0) {
- Threads.sleep(10);
- }
+ // Don't count down the latch until someone is waiting on it; otherwise the above
+ // afterCreatingZigZagLatch will get to the latch, no one will ever free it, and we'll
+ // be stuck; the test won't go down.
+ while (this.latch.getCount() <= 0) Threads.sleep(1);
this.latch.countDown();
}
}