You are viewing a plain-text version of this content. (The link to the canonical version was lost when this text was extracted.)
Posted to commits@hbase.apache.org by sy...@apache.org on 2016/10/10 19:00:45 UTC
[3/9] hbase git commit: HBASE-16781 Fix flaky TestMasterProcedureWalLease
HBASE-16781 Fix flaky TestMasterProcedureWalLease
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/29d701a3
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/29d701a3
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/29d701a3
Branch: refs/heads/hbase-12439
Commit: 29d701a314b6bf56771a217b42c4c10832b15753
Parents: c7cae6b
Author: Matteo Bertozzi <ma...@cloudera.com>
Authored: Fri Oct 7 17:32:19 2016 -0700
Committer: Matteo Bertozzi <ma...@cloudera.com>
Committed: Fri Oct 7 18:01:53 2016 -0700
----------------------------------------------------------------------
.../procedure2/store/wal/WALProcedureStore.java | 41 +++++++++++++-------
.../hadoop/hbase/master/MasterServices.java | 5 +++
.../master/procedure/MasterProcedureEnv.java | 7 +++-
.../hbase/master/MockNoopMasterServices.java | 5 +++
.../MasterProcedureTestingUtility.java | 1 +
5 files changed, 43 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/29d701a3/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
index 36cf7af..1e60402 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
@@ -122,6 +122,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
private final AtomicBoolean inSync = new AtomicBoolean(false);
private final AtomicLong totalSynced = new AtomicLong(0);
private final AtomicLong lastRollTs = new AtomicLong(0);
+ private final AtomicLong syncId = new AtomicLong(0);
private LinkedTransferQueue<ByteSlot> slotsCache = null;
private Set<ProcedureWALFile> corruptedLogs = null;
@@ -226,15 +227,15 @@ public class WALProcedureStore extends ProcedureStoreBase {
}
@Override
- public void stop(boolean abort) {
+ public void stop(final boolean abort) {
if (!setRunning(false)) {
return;
}
- LOG.info("Stopping the WAL Procedure Store");
+ LOG.info("Stopping the WAL Procedure Store, isAbort=" + abort +
+ (isSyncAborted() ? " (self aborting)" : ""));
sendStopSignal();
-
- if (!abort) {
+ if (!isSyncAborted()) {
try {
while (syncThread.isAlive()) {
sendStopSignal();
@@ -525,6 +526,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
}
}
+ final long pushSyncId = syncId.get();
updateStoreTracker(type, procId, subProcIds);
slots[slotIndex++] = slot;
logId = flushLogId;
@@ -540,7 +542,9 @@ public class WALProcedureStore extends ProcedureStoreBase {
slotCond.signal();
}
- syncCond.await();
+ while (pushSyncId == syncId.get() && isRunning()) {
+ syncCond.await();
+ }
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
sendAbortProcessSignal();
@@ -642,13 +646,15 @@ public class WALProcedureStore extends ProcedureStoreBase {
totalSyncedToStore = totalSynced.addAndGet(slotSize);
slotIndex = 0;
inSync.set(false);
+ syncId.incrementAndGet();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
- sendAbortProcessSignal();
syncException.compareAndSet(null, e);
+ sendAbortProcessSignal();
throw e;
} catch (Throwable t) {
syncException.compareAndSet(null, t);
+ sendAbortProcessSignal();
throw t;
} finally {
syncCond.signalAll();
@@ -679,13 +685,12 @@ public class WALProcedureStore extends ProcedureStoreBase {
} catch (Throwable e) {
LOG.warn("unable to sync slots, retry=" + retry);
if (++retry >= maxRetriesBeforeRoll) {
- if (logRolled >= maxSyncFailureRoll) {
+ if (logRolled >= maxSyncFailureRoll && isRunning()) {
LOG.error("Sync slots after log roll failed, abort.", e);
- sendAbortProcessSignal();
throw e;
}
- if (!rollWriterOrDie()) {
+ if (!rollWriterWithRetries()) {
throw e;
}
@@ -720,8 +725,8 @@ public class WALProcedureStore extends ProcedureStoreBase {
return totalSynced;
}
- private boolean rollWriterOrDie() {
- for (int i = 0; i < rollRetries; ++i) {
+ private boolean rollWriterWithRetries() {
+ for (int i = 0; i < rollRetries && isRunning(); ++i) {
if (i > 0) Threads.sleepWithoutInterrupt(waitBeforeRoll * i);
try {
@@ -733,8 +738,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
}
}
LOG.fatal("Unable to roll the log");
- sendAbortProcessSignal();
- throw new RuntimeException("unable to roll the log");
+ return false;
}
private boolean tryRollWriter() {
@@ -777,7 +781,8 @@ public class WALProcedureStore extends ProcedureStoreBase {
}
}
- @VisibleForTesting void removeInactiveLogsForTesting() throws Exception {
+ @VisibleForTesting
+ protected void removeInactiveLogsForTesting() throws Exception {
lock.lock();
try {
removeInactiveLogs();
@@ -812,6 +817,8 @@ public class WALProcedureStore extends ProcedureStoreBase {
}
private boolean rollWriter() throws IOException {
+ if (!isRunning()) return false;
+
// Create new state-log
if (!rollWriter(flushLogId + 1)) {
LOG.warn("someone else has already created log " + flushLogId);
@@ -1043,6 +1050,10 @@ public class WALProcedureStore extends ProcedureStoreBase {
for (int i = 0; i < logFiles.length; ++i) {
final Path logPath = logFiles[i].getPath();
leaseRecovery.recoverFileLease(fs, logPath);
+ if (!isRunning()) {
+ throw new IOException("wal aborting");
+ }
+
maxLogId = Math.max(maxLogId, getLogIdFromName(logPath.getName()));
ProcedureWALFile log = initOldLog(logFiles[i]);
if (log != null) {
@@ -1061,7 +1072,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
* it using entries in the log.
*/
private void initTrackerFromOldLogs() {
- if (logs.isEmpty()) return;
+ if (logs.isEmpty() || !isRunning()) return;
ProcedureWALFile log = logs.getLast();
if (!log.getTracker().isPartial()) {
storeTracker.resetTo(log.getTracker());
http://git-wip-us.apache.org/repos/asf/hbase/blob/29d701a3/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
index 670642f..9bdcf76 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
@@ -374,4 +374,9 @@ public interface MasterServices extends Server {
* @return load balancer
*/
public LoadBalancer getLoadBalancer();
+
+ /**
+ * @return True if this master is stopping.
+ */
+ boolean isStopping();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/29d701a3/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java
index 213f80c..e90813c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java
@@ -61,10 +61,15 @@ public class MasterProcedureEnv {
@Override
public boolean progress() {
LOG.debug("Recover Procedure Store log lease: " + path);
- return master.isActiveMaster();
+ return isRunning();
}
});
}
+
+ private boolean isRunning() {
+ return master.isActiveMaster() && !master.isStopped() &&
+ !master.isStopping() && !master.isAborted();
+ }
}
@InterfaceAudience.Private
http://git-wip-us.apache.org/repos/asf/hbase/blob/29d701a3/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
index 56a8522..87fb169 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
@@ -180,6 +180,11 @@ public class MockNoopMasterServices implements MasterServices, Server {
}
@Override
+ public boolean isStopping() {
+ return stopped;
+ }
+
+ @Override
public boolean isStopped() {
return stopped;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/29d701a3/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureTestingUtility.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureTestingUtility.java
index 9b69830..c3cb2da 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureTestingUtility.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureTestingUtility.java
@@ -398,6 +398,7 @@ public class MasterProcedureTestingUtility {
// restart executor/store
// rollback step N - save on store
InjectAbortOnLoadListener abortListener = new InjectAbortOnLoadListener(procExec);
+ abortListener.addProcId(procId);
procExec.registerListener(abortListener);
try {
for (int i = 0; !procExec.isFinished(procId); ++i) {