You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@hbase.apache.org by mb...@apache.org on 2015/07/09 19:57:45 UTC
[2/3] hbase git commit: HBASE-13832 Procedure v2: try to roll the WAL master on sync failure before aborting
HBASE-13832 Procedure v2: try to roll the WAL master on sync failure before aborting
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/246631d6
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/246631d6
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/246631d6
Branch: refs/heads/branch-1.2
Commit: 246631d60e7b3b5ece039e67e8d3e9bb66d5f9a5
Parents: bcadcef
Author: Matteo Bertozzi <ma...@cloudera.com>
Authored: Thu Jul 9 08:34:42 2015 -0700
Committer: Matteo Bertozzi <ma...@cloudera.com>
Committed: Thu Jul 9 08:56:09 2015 -0700
----------------------------------------------------------------------
.../hbase/procedure2/store/ProcedureStore.java | 5 +
.../procedure2/store/ProcedureStoreBase.java | 8 +
.../procedure2/store/wal/WALProcedureStore.java | 237 ++++++++++++++-----
.../procedure2/ProcedureTestingUtility.java | 32 +++
.../store/wal/TestWALProcedureStore.java | 31 +--
.../master/procedure/MasterProcedureEnv.java | 5 +
.../TestMasterFailoverWithProcedures.java | 69 +++++-
.../procedure/TestWALProcedureStoreOnHDFS.java | 198 ++++++++++++++++
8 files changed, 487 insertions(+), 98 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/246631d6/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java
index 6a31eef..39a3472 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java
@@ -38,6 +38,11 @@ public interface ProcedureStore {
*/
public interface ProcedureStoreListener {
/**
+ * triggered when the store sync is completed.
+ */
+ void postSync();
+
+ /**
* triggered when the store is not able to write out data.
* the main process should abort.
*/
http://git-wip-us.apache.org/repos/asf/hbase/blob/246631d6/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreBase.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreBase.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreBase.java
index e5653b6..0e0e46f 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreBase.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreBase.java
@@ -56,6 +56,14 @@ public abstract class ProcedureStoreBase implements ProcedureStore {
return listeners.remove(listener);
}
+ protected void sendPostSyncSignal() {
+ if (!this.listeners.isEmpty()) {
+ for (ProcedureStoreListener listener : this.listeners) {
+ listener.postSync();
+ }
+ }
+ }
+
protected void sendAbortProcessSignal() {
if (!this.listeners.isEmpty()) {
for (ProcedureStoreListener listener : this.listeners) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/246631d6/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
index 54b53dc..b0e2258 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.io.FileNotFoundException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.LinkedTransferQueue;
@@ -51,6 +52,9 @@ import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
import org.apache.hadoop.hbase.procedure2.util.ByteSlot;
import org.apache.hadoop.hbase.procedure2.util.StringUtils;
import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALHeader;
+import org.apache.hadoop.hbase.util.Threads;
+
+import com.google.common.annotations.VisibleForTesting;
/**
* WAL implementation of the ProcedureStore.
@@ -64,7 +68,25 @@ public class WALProcedureStore extends ProcedureStoreBase {
void recoverFileLease(FileSystem fs, Path path) throws IOException;
}
- private static final int MAX_RETRIES_BEFORE_ABORT = 3;
+ private static final String MAX_RETRIES_BEFORE_ROLL_CONF_KEY =
+ "hbase.procedure.store.wal.max.retries.before.roll";
+ private static final int DEFAULT_MAX_RETRIES_BEFORE_ROLL = 3;
+
+ private static final String WAIT_BEFORE_ROLL_CONF_KEY =
+ "hbase.procedure.store.wal.wait.before.roll";
+ private static final int DEFAULT_WAIT_BEFORE_ROLL = 500;
+
+ private static final String ROLL_RETRIES_CONF_KEY =
+ "hbase.procedure.store.wal.max.roll.retries";
+ private static final int DEFAULT_ROLL_RETRIES = 3;
+
+ private static final String MAX_SYNC_FAILURE_ROLL_CONF_KEY =
+ "hbase.procedure.store.wal.sync.failure.roll.max";
+ private static final int DEFAULT_MAX_SYNC_FAILURE_ROLL = 3;
+
+ private static final String PERIODIC_ROLL_CONF_KEY =
+ "hbase.procedure.store.wal.periodic.roll.msec";
+ private static final int DEFAULT_PERIODIC_ROLL = 60 * 60 * 1000; // 1h
private static final String SYNC_WAIT_MSEC_CONF_KEY = "hbase.procedure.store.wal.sync.wait.msec";
private static final int DEFAULT_SYNC_WAIT_MSEC = 100;
@@ -88,16 +110,22 @@ public class WALProcedureStore extends ProcedureStoreBase {
private final Path logDir;
private AtomicBoolean inSync = new AtomicBoolean(false);
+ private AtomicReference<Throwable> syncException = new AtomicReference<>();
private LinkedTransferQueue<ByteSlot> slotsCache = null;
private Set<ProcedureWALFile> corruptedLogs = null;
private AtomicLong totalSynced = new AtomicLong(0);
+ private AtomicLong lastRollTs = new AtomicLong(0);
private FSDataOutputStream stream = null;
- private long lastRollTs = 0;
private long flushLogId = 0;
private int slotIndex = 0;
private Thread syncThread;
private ByteSlot[] slots;
+ private int maxRetriesBeforeRoll;
+ private int maxSyncFailureRoll;
+ private int waitBeforeRoll;
+ private int rollRetries;
+ private int periodicRollMsec;
private long rollThreshold;
private boolean useHsync;
private int syncWaitMsec;
@@ -124,7 +152,13 @@ public class WALProcedureStore extends ProcedureStoreBase {
}
// Tunings
+ maxRetriesBeforeRoll =
+ conf.getInt(MAX_RETRIES_BEFORE_ROLL_CONF_KEY, DEFAULT_MAX_RETRIES_BEFORE_ROLL);
+ maxSyncFailureRoll = conf.getInt(MAX_SYNC_FAILURE_ROLL_CONF_KEY, DEFAULT_MAX_SYNC_FAILURE_ROLL);
+ waitBeforeRoll = conf.getInt(WAIT_BEFORE_ROLL_CONF_KEY, DEFAULT_WAIT_BEFORE_ROLL);
+ rollRetries = conf.getInt(ROLL_RETRIES_CONF_KEY, DEFAULT_ROLL_RETRIES);
rollThreshold = conf.getLong(ROLL_THRESHOLD_CONF_KEY, DEFAULT_ROLL_THRESHOLD);
+ periodicRollMsec = conf.getInt(PERIODIC_ROLL_CONF_KEY, DEFAULT_PERIODIC_ROLL);
syncWaitMsec = conf.getInt(SYNC_WAIT_MSEC_CONF_KEY, DEFAULT_SYNC_WAIT_MSEC);
useHsync = conf.getBoolean(USE_HSYNC_CONF_KEY, DEFAULT_USE_HSYNC);
@@ -132,11 +166,11 @@ public class WALProcedureStore extends ProcedureStoreBase {
syncThread = new Thread("WALProcedureStoreSyncThread") {
@Override
public void run() {
- while (isRunning()) {
- try {
- syncLoop();
- } catch (IOException e) {
- LOG.error("Got an exception from the sync-loop", e);
+ try {
+ syncLoop();
+ } catch (Throwable e) {
+ LOG.error("Got an exception from the sync-loop", e);
+ if (!isSyncAborted()) {
sendAbortProcessSignal();
}
}
@@ -155,6 +189,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
if (lock.tryLock()) {
try {
waitCond.signalAll();
+ syncCond.signalAll();
} finally {
lock.unlock();
}
@@ -310,6 +345,9 @@ public class WALProcedureStore extends ProcedureStoreBase {
// Update the store tracker
synchronized (storeTracker) {
storeTracker.insert(proc, subprocs);
+ if (logId == flushLogId) {
+ checkAndTryRoll();
+ }
}
}
@@ -342,6 +380,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
storeTracker.update(proc);
if (logId == flushLogId) {
removeOldLogs = storeTracker.isUpdated();
+ checkAndTryRoll();
}
}
@@ -377,8 +416,10 @@ public class WALProcedureStore extends ProcedureStoreBase {
synchronized (storeTracker) {
storeTracker.delete(procId);
if (logId == flushLogId) {
- if (storeTracker.isEmpty() && totalSynced.get() > rollThreshold) {
- removeOldLogs = rollWriterOrDie();
+ if (storeTracker.isEmpty() || storeTracker.isUpdated()) {
+ removeOldLogs = checkAndTryRoll();
+ } else {
+ checkAndTryRoll();
}
}
}
@@ -399,14 +440,23 @@ public class WALProcedureStore extends ProcedureStoreBase {
}
private long pushData(final ByteSlot slot) {
- assert isRunning() && !logs.isEmpty() : "recoverLease() must be called before inserting data";
- long logId = -1;
+ if (!isRunning()) {
+ throw new RuntimeException("the store must be running before inserting data");
+ }
+ if (logs.isEmpty()) {
+ throw new RuntimeException("recoverLease() must be called before inserting data");
+ }
+ long logId = -1;
lock.lock();
try {
// Wait for the sync to be completed
while (true) {
- if (inSync.get()) {
+ if (!isRunning()) {
+ throw new RuntimeException("store no longer running");
+ } else if (isSyncAborted()) {
+ throw new RuntimeException("sync aborted", syncException.get());
+ } else if (inSync.get()) {
syncCond.await();
} else if (slotIndex == slots.length) {
slotCond.signal();
@@ -434,72 +484,101 @@ public class WALProcedureStore extends ProcedureStoreBase {
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
sendAbortProcessSignal();
+ throw new RuntimeException(e);
} finally {
lock.unlock();
+ if (isSyncAborted()) {
+ throw new RuntimeException("sync aborted", syncException.get());
+ }
}
return logId;
}
- private void syncLoop() throws IOException {
+ private boolean isSyncAborted() {
+ return syncException.get() != null;
+ }
+
+ private void syncLoop() throws Throwable {
inSync.set(false);
- while (isRunning()) {
- lock.lock();
- try {
- // Wait until new data is available
- if (slotIndex == 0) {
- if (LOG.isTraceEnabled()) {
- float rollTsSec = (System.currentTimeMillis() - lastRollTs) / 1000.0f;
- LOG.trace(String.format("Waiting for data. flushed=%s (%s/sec)",
- StringUtils.humanSize(totalSynced.get()),
- StringUtils.humanSize(totalSynced.get() / rollTsSec)));
- }
- waitCond.await();
+ lock.lock();
+ try {
+ while (isRunning()) {
+ try {
+ // Wait until new data is available
if (slotIndex == 0) {
- // no data.. probably a stop()
- continue;
+ if (LOG.isTraceEnabled()) {
+ float rollTsSec = getMillisFromLastRoll() / 1000.0f;
+ LOG.trace(String.format("Waiting for data. flushed=%s (%s/sec)",
+ StringUtils.humanSize(totalSynced.get()),
+ StringUtils.humanSize(totalSynced.get() / rollTsSec)));
+ }
+
+ waitCond.await(getMillisToNextPeriodicRoll(), TimeUnit.MILLISECONDS);
+ if (slotIndex == 0) {
+ // no data.. probably a stop()
+ checkAndTryRoll();
+ continue;
+ }
}
- }
- // Wait SYNC_WAIT_MSEC or the signal of "slots full" before flushing
- long syncWaitSt = System.currentTimeMillis();
- if (slotIndex != slots.length) {
- slotCond.await(syncWaitMsec, TimeUnit.MILLISECONDS);
- }
- long syncWaitMs = System.currentTimeMillis() - syncWaitSt;
- if (LOG.isTraceEnabled() && (syncWaitMs > 10 || slotIndex < slots.length)) {
- float rollSec = (System.currentTimeMillis() - lastRollTs) / 1000.0f;
- LOG.trace("Sync wait " + StringUtils.humanTimeDiff(syncWaitMs) +
- ", slotIndex=" + slotIndex +
- ", totalSynced=" + StringUtils.humanSize(totalSynced.get()) +
- " " + StringUtils.humanSize(totalSynced.get() / rollSec) + "/sec");
- }
+ // Wait SYNC_WAIT_MSEC or the signal of "slots full" before flushing
+ long syncWaitSt = System.currentTimeMillis();
+ if (slotIndex != slots.length) {
+ slotCond.await(syncWaitMsec, TimeUnit.MILLISECONDS);
+ }
+ long syncWaitMs = System.currentTimeMillis() - syncWaitSt;
+ if (LOG.isTraceEnabled() && (syncWaitMs > 10 || slotIndex < slots.length)) {
+ float rollSec = getMillisFromLastRoll() / 1000.0f;
+ LOG.trace(String.format("Sync wait %s, slotIndex=%s , totalSynced=%s/sec",
+ StringUtils.humanTimeDiff(syncWaitMs), slotIndex,
+ StringUtils.humanSize(totalSynced.get()),
+ StringUtils.humanSize(totalSynced.get() / rollSec)));
+ }
- inSync.set(true);
- totalSynced.addAndGet(syncSlots());
- slotIndex = 0;
- inSync.set(false);
- syncCond.signalAll();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- sendAbortProcessSignal();
- } finally {
- lock.unlock();
+ inSync.set(true);
+ totalSynced.addAndGet(syncSlots());
+ slotIndex = 0;
+ inSync.set(false);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ sendAbortProcessSignal();
+ syncException.compareAndSet(null, e);
+ throw e;
+ } catch (Throwable t) {
+ syncException.compareAndSet(null, t);
+ throw t;
+ } finally {
+ syncCond.signalAll();
+ }
}
+ } finally {
+ lock.unlock();
}
}
- private long syncSlots() {
+ private long syncSlots() throws Throwable {
int retry = 0;
+ int logRolled = 0;
long totalSynced = 0;
do {
try {
totalSynced = syncSlots(stream, slots, 0, slotIndex);
break;
} catch (Throwable e) {
- if (++retry == MAX_RETRIES_BEFORE_ABORT) {
- LOG.error("Sync slot failed, abort.", e);
- sendAbortProcessSignal();
+ if (++retry >= maxRetriesBeforeRoll) {
+ if (logRolled >= maxSyncFailureRoll) {
+ LOG.error("Sync slots after log roll failed, abort.", e);
+ sendAbortProcessSignal();
+ throw e;
+ }
+
+ if (!rollWriterOrDie()) {
+ throw e;
+ }
+
+ logRolled++;
+ retry = 0;
}
}
} while (isRunning());
@@ -520,6 +599,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
} else {
stream.hflush();
}
+ sendPostSyncSignal();
if (LOG.isTraceEnabled()) {
LOG.trace("Sync slots=" + count + '/' + slots.length +
@@ -528,14 +608,45 @@ public class WALProcedureStore extends ProcedureStoreBase {
return totalSynced;
}
- private boolean rollWriterOrDie() {
- try {
- return rollWriter();
- } catch (IOException e) {
- LOG.warn("Unable to roll the log", e);
- sendAbortProcessSignal();
- return false;
+ @VisibleForTesting
+ public boolean rollWriterOrDie() {
+ for (int i = 1; i <= rollRetries; ++i) {
+ try {
+ if (rollWriter()) {
+ return true;
+ }
+ } catch (IOException e) {
+ LOG.warn("Unable to roll the log, attempt=" + i, e);
+ Threads.sleepWithoutInterrupt(waitBeforeRoll);
+ }
+ }
+ LOG.fatal("Unable to roll the log");
+ sendAbortProcessSignal();
+ throw new RuntimeException("unable to roll the log");
+ }
+
+ protected boolean checkAndTryRoll() {
+ if (!isRunning()) return false;
+
+ if (totalSynced.get() > rollThreshold || getMillisToNextPeriodicRoll() <= 0) {
+ try {
+ return rollWriter();
+ } catch (IOException e) {
+ LOG.warn("Unable to roll the log", e);
+ }
+ }
+ return false;
+ }
+
+ private long getMillisToNextPeriodicRoll() {
+ if (lastRollTs.get() > 0 && periodicRollMsec > 0) {
+ return periodicRollMsec - getMillisFromLastRoll();
}
+ return Long.MAX_VALUE;
+ }
+
+ private long getMillisFromLastRoll() {
+ return (System.currentTimeMillis() - lastRollTs.get());
}
protected boolean rollWriter() throws IOException {
@@ -573,7 +684,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
stream = newStream;
flushLogId = logId;
totalSynced.set(0);
- lastRollTs = System.currentTimeMillis();
+ lastRollTs.set(System.currentTimeMillis());
logs.add(new ProcedureWALFile(fs, newLogFile, header, startPos));
} finally {
lock.unlock();
http://git-wip-us.apache.org/repos/asf/hbase/blob/246631d6/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
index 89a6434..1534a5f 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
@@ -19,6 +19,8 @@
package org.apache.hadoop.hbase.procedure2;
import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -182,4 +184,34 @@ public class ProcedureTestingUtility {
assertTrue("expected abort exception, got "+ cause,
cause instanceof ProcedureAbortedException);
}
+
+ public static class TestProcedure extends Procedure<Void> {
+ public TestProcedure() {}
+
+ public TestProcedure(long procId, long parentId) {
+ setProcId(procId);
+ if (parentId > 0) {
+ setParentProcId(parentId);
+ }
+ }
+
+ public void addStackId(final int index) {
+ addStackIndex(index);
+ }
+
+ @Override
+ protected Procedure[] execute(Void env) { return null; }
+
+ @Override
+ protected void rollback(Void env) { }
+
+ @Override
+ protected boolean abort(Void env) { return false; }
+
+ @Override
+ protected void serializeStateData(final OutputStream stream) throws IOException { }
+
+ @Override
+ protected void deserializeStateData(final InputStream stream) throws IOException { }
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/246631d6/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java
index 19a9ea4..c2e6a77 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
+import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure;
import org.apache.hadoop.hbase.procedure2.SequentialProcedure;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
@@ -354,36 +355,6 @@ public class TestWALProcedureStore {
});
}
- public static class TestProcedure extends Procedure<Void> {
- public TestProcedure() {}
-
- public TestProcedure(long procId, long parentId) {
- setProcId(procId);
- if (parentId > 0) {
- setParentProcId(parentId);
- }
- }
-
- public void addStackId(final int index) {
- addStackIndex(index);
- }
-
- @Override
- protected Procedure[] execute(Void env) { return null; }
-
- @Override
- protected void rollback(Void env) { }
-
- @Override
- protected boolean abort(Void env) { return false; }
-
- @Override
- protected void serializeStateData(final OutputStream stream) throws IOException { }
-
- @Override
- protected void deserializeStateData(final InputStream stream) throws IOException { }
- }
-
private void corruptLog(final FileStatus logFile, final long dropBytes)
throws IOException {
assertTrue(logFile.getLen() > dropBytes);
http://git-wip-us.apache.org/repos/asf/hbase/blob/246631d6/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java
index f2f4bf3..6700b63 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java
@@ -75,6 +75,11 @@ public class MasterProcedureEnv {
}
@Override
+ public void postSync() {
+ // no-op
+ }
+
+ @Override
public void abortProcess() {
master.abort("The Procedure Store lost the lease");
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/246631d6/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterFailoverWithProcedures.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterFailoverWithProcedures.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterFailoverWithProcedures.java
index ddb0ab1..429c83b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterFailoverWithProcedures.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterFailoverWithProcedures.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.master.procedure;
+import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import org.apache.commons.logging.Log;
@@ -31,10 +32,12 @@ import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
+import org.apache.hadoop.hbase.procedure2.store.wal.TestWALProcedureStore.TestSequentialProcedure;
import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.CreateTableState;
import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.DeleteTableState;
import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.DisableTableState;
@@ -44,7 +47,6 @@ import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.ModifyRegionUtils;
-
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -52,7 +54,6 @@ import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -63,6 +64,11 @@ public class TestMasterFailoverWithProcedures {
protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
private static void setupConf(Configuration conf) {
+ // don't waste time retrying with the roll, the test is already slow enough.
+ conf.setInt("hbase.procedure.store.wal.max.retries.before.roll", 1);
+ conf.setInt("hbase.procedure.store.wal.wait.before.roll", 0);
+ conf.setInt("hbase.procedure.store.wal.max.roll.retries", 1);
+ conf.setInt("hbase.procedure.store.wal.sync.failure.roll.max", 1);
}
@Before
@@ -94,6 +100,9 @@ public class TestMasterFailoverWithProcedures {
final CountDownLatch masterStoreAbort = new CountDownLatch(1);
masterStore.registerListener(new ProcedureStore.ProcedureStoreListener() {
@Override
+ public void postSync() {}
+
+ @Override
public void abortProcess() {
LOG.debug("Abort store of Master");
masterStoreAbort.countDown();
@@ -113,6 +122,9 @@ public class TestMasterFailoverWithProcedures {
final CountDownLatch backupStore3Abort = new CountDownLatch(1);
backupStore3.registerListener(new ProcedureStore.ProcedureStoreListener() {
@Override
+ public void postSync() {}
+
+ @Override
public void abortProcess() {
LOG.debug("Abort store of backupMaster3");
backupStore3Abort.countDown();
@@ -126,8 +138,13 @@ public class TestMasterFailoverWithProcedures {
HTableDescriptor htd = MasterProcedureTestingUtility.createHTD(TableName.valueOf("mtb"), "f");
HRegionInfo[] regions = ModifyRegionUtils.createHRegionInfos(htd, null);
LOG.debug("submit proc");
- getMasterProcedureExecutor().submitProcedure(
- new CreateTableProcedure(getMasterProcedureExecutor().getEnvironment(), htd, regions));
+ try {
+ getMasterProcedureExecutor().submitProcedure(
+ new CreateTableProcedure(getMasterProcedureExecutor().getEnvironment(), htd, regions));
+ fail("expected RuntimeException 'sync aborted'");
+ } catch (RuntimeException e) {
+ LOG.info("got " + e.getMessage());
+ }
LOG.debug("wait master store abort");
masterStoreAbort.await();
@@ -139,10 +156,52 @@ public class TestMasterFailoverWithProcedures {
// wait the store in here to abort (the test will fail due to timeout if it doesn't)
LOG.debug("wait the store to abort");
backupStore3.getStoreTracker().setDeleted(1, false);
- backupStore3.delete(1);
+ try {
+ backupStore3.delete(1);
+ fail("expected RuntimeException 'sync aborted'");
+ } catch (RuntimeException e) {
+ LOG.info("got " + e.getMessage());
+ }
backupStore3Abort.await();
}
+ @Test(timeout=60000)
+ public void testWALfencingWithWALRolling() throws IOException {
+ final ProcedureStore procStore = getMasterProcedureExecutor().getStore();
+ assertTrue("expected WALStore for this test", procStore instanceof WALProcedureStore);
+
+ HMaster firstMaster = UTIL.getHBaseCluster().getMaster();
+
+ HMaster backupMaster3 = Mockito.mock(HMaster.class);
+ Mockito.doReturn(firstMaster.getConfiguration()).when(backupMaster3).getConfiguration();
+ Mockito.doReturn(true).when(backupMaster3).isActiveMaster();
+ final WALProcedureStore procStore2 = new WALProcedureStore(firstMaster.getConfiguration(),
+ firstMaster.getMasterFileSystem().getFileSystem(),
+ ((WALProcedureStore)procStore).getLogDir(),
+ new MasterProcedureEnv.WALStoreLeaseRecovery(backupMaster3));
+
+ // start a second store which should fence the first one out
+ LOG.info("Starting new WALProcedureStore");
+ procStore2.start(1);
+ procStore2.recoverLease();
+
+ LOG.info("Inserting into second WALProcedureStore");
+ // insert something to the second store then delete it, causing a WAL roll
+ Procedure proc2 = new TestSequentialProcedure();
+ procStore2.insert(proc2, null);
+ procStore2.rollWriterOrDie();
+
+ LOG.info("Inserting into first WALProcedureStore");
+ // insert something to the first store
+ proc2 = new TestSequentialProcedure();
+ try {
+ procStore.insert(proc2, null);
+ fail("expected RuntimeException 'sync aborted'");
+ } catch (RuntimeException e) {
+ LOG.info("got " + e.getMessage());
+ }
+ }
+
// ==========================================================================
// Test Create Table
// ==========================================================================
http://git-wip-us.apache.org/repos/asf/hbase/blob/246631d6/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestWALProcedureStoreOnHDFS.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestWALProcedureStoreOnHDFS.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestWALProcedureStoreOnHDFS.java
new file mode 100644
index 0000000..89ad5d6
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestWALProcedureStoreOnHDFS.java
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.master.procedure;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.procedure2.Procedure;
+import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
+import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
+import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure;
+import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
+import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.CreateTableState;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.DeleteTableState;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.DisableTableState;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.EnableTableState;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.TruncateTableState;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.ModifyRegionUtils;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+@Category(LargeTests.class)
+public class TestWALProcedureStoreOnHDFS {
+ private static final Log LOG = LogFactory.getLog(TestWALProcedureStoreOnHDFS.class);
+
+ protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+ private WALProcedureStore store;
+
+ private static void setupConf(Configuration conf) {
+ conf.setInt("dfs.replication", 3);
+ conf.setInt("dfs.namenode.replication.min", 3);
+
+ // increase the value for slow test-env
+ conf.setInt("hbase.procedure.store.wal.wait.before.roll", 1000);
+ conf.setInt("hbase.procedure.store.wal.max.roll.retries", 5);
+ conf.setInt("hbase.procedure.store.wal.sync.failure.roll.max", 5);
+ }
+
+ @Before
+ public void setup() throws Exception {
+ setupConf(UTIL.getConfiguration());
+ MiniDFSCluster dfs = UTIL.startMiniDFSCluster(3);
+
+ Path logDir = new Path(new Path(dfs.getFileSystem().getUri()), "/test-logs");
+ store = ProcedureTestingUtility.createWalStore(
+ UTIL.getConfiguration(), dfs.getFileSystem(), logDir);
+ store.registerListener(new ProcedureStore.ProcedureStoreListener() {
+ @Override
+ public void postSync() {}
+
+ @Override
+ public void abortProcess() {
+ LOG.fatal("Abort the Procedure Store");
+ store.stop(true);
+ }
+ });
+ store.start(8);
+ store.recoverLease();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ store.stop(false);
+ UTIL.getDFSCluster().getFileSystem().delete(store.getLogDir(), true);
+
+ try {
+ UTIL.shutdownMiniCluster();
+ } catch (Exception e) {
+ LOG.warn("failure shutting down cluster", e);
+ }
+ }
+
+ @Test(timeout=60000, expected=RuntimeException.class)
+ public void testWalAbortOnLowReplication() throws Exception {
+ assertEquals(3, UTIL.getDFSCluster().getDataNodes().size());
+
+ LOG.info("Stop DataNode");
+ UTIL.getDFSCluster().stopDataNode(0);
+ assertEquals(2, UTIL.getDFSCluster().getDataNodes().size());
+
+ store.insert(new TestProcedure(1, -1), null);
+ for (long i = 2; store.isRunning(); ++i) {
+ assertEquals(2, UTIL.getDFSCluster().getDataNodes().size());
+ store.insert(new TestProcedure(i, -1), null);
+ Thread.sleep(100);
+ }
+ assertFalse(store.isRunning());
+ fail("The store.insert() should throw an exeption");
+ }
+
+ @Test(timeout=60000)
+ public void testWalAbortOnLowReplicationWithQueuedWriters() throws Exception {
+ assertEquals(3, UTIL.getDFSCluster().getDataNodes().size());
+
+ store.registerListener(new ProcedureStore.ProcedureStoreListener() {
+ @Override
+ public void postSync() {
+ Threads.sleepWithoutInterrupt(2000);
+ }
+
+ @Override
+ public void abortProcess() {}
+ });
+
+ final AtomicInteger reCount = new AtomicInteger(0);
+ Thread[] thread = new Thread[store.getNumThreads() * 2 + 1];
+ for (int i = 0; i < thread.length; ++i) {
+ final long procId = i + 1;
+ thread[i] = new Thread() {
+ public void run() {
+ try {
+ LOG.debug("[S] INSERT " + procId);
+ store.insert(new TestProcedure(procId, -1), null);
+ LOG.debug("[E] INSERT " + procId);
+ } catch (RuntimeException e) {
+ reCount.incrementAndGet();
+ LOG.debug("[F] INSERT " + procId + ": " + e.getMessage());
+ }
+ }
+ };
+ thread[i].start();
+ }
+
+ Thread.sleep(1000);
+ LOG.info("Stop DataNode");
+ UTIL.getDFSCluster().stopDataNode(0);
+ assertEquals(2, UTIL.getDFSCluster().getDataNodes().size());
+
+ for (int i = 0; i < thread.length; ++i) {
+ thread[i].join();
+ }
+
+ assertFalse(store.isRunning());
+ assertTrue(reCount.toString(), reCount.get() >= store.getNumThreads() &&
+ reCount.get() < thread.length);
+ }
+
+ @Test(timeout=60000)
+ public void testWalRollOnLowReplication() throws Exception {
+ int dnCount = 0;
+ store.insert(new TestProcedure(1, -1), null);
+ UTIL.getDFSCluster().restartDataNode(dnCount);
+ for (long i = 2; i < 100; ++i) {
+ store.insert(new TestProcedure(i, -1), null);
+ Thread.sleep(100);
+ if ((i % 30) == 0) {
+ LOG.info("Restart Data Node");
+ UTIL.getDFSCluster().restartDataNode(++dnCount % 3);
+ }
+ }
+ assertTrue(store.isRunning());
+ }
+}