You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this rendering.
Posted to commits@hbase.apache.org by mb...@apache.org on 2016/01/23 02:16:31 UTC
hbase git commit: HBASE-15100 Master WALProcs are deleted out of order ending up with older wals not removed
Repository: hbase
Updated Branches:
refs/heads/branch-1.1 aa0e492f8 -> 20d5577d2
HBASE-15100 Master WALProcs are deleted out of order ending up with older wals not removed
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/20d5577d
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/20d5577d
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/20d5577d
Branch: refs/heads/branch-1.1
Commit: 20d5577d2e1b8e395e66be88a66a70bec8665f4f
Parents: aa0e492
Author: Matteo Bertozzi <ma...@cloudera.com>
Authored: Fri Jan 22 17:12:18 2016 -0800
Committer: Matteo Bertozzi <ma...@cloudera.com>
Committed: Fri Jan 22 17:12:18 2016 -0800
----------------------------------------------------------------------
.../org/apache/hadoop/hbase/ProcedureInfo.java | 20 +++++----
.../hadoop/hbase/procedure2/Procedure.java | 17 ++------
.../hbase/procedure2/ProcedureExecutor.java | 8 ++--
.../hbase/procedure2/store/ProcedureStore.java | 1 +
.../procedure2/store/ProcedureStoreTracker.java | 10 +++--
.../store/wal/ProcedureWALFormat.java | 1 -
.../store/wal/ProcedureWALFormatReader.java | 9 ++--
.../procedure2/store/wal/WALProcedureStore.java | 32 +++++++-------
.../store/TestProcedureStoreTracker.java | 4 +-
.../store/wal/TestWALProcedureStore.java | 44 ++++++++++++++++++++
10 files changed, 91 insertions(+), 55 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/20d5577d/hbase-common/src/main/java/org/apache/hadoop/hbase/ProcedureInfo.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/ProcedureInfo.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/ProcedureInfo.java
index 4a15857..c79ea98 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/ProcedureInfo.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/ProcedureInfo.java
@@ -41,12 +41,12 @@ public class ProcedureInfo {
private final String procOwner;
private final ProcedureState procState;
private final long parentId;
+ private final NonceKey nonceKey;
private final ForeignExceptionMessage exception;
private final long lastUpdate;
private final long startTime;
private final byte[] result;
- private NonceKey nonceKey = null;
private long clientAckTime = -1;
public ProcedureInfo(
@@ -55,6 +55,7 @@ public class ProcedureInfo {
final String procOwner,
final ProcedureState procState,
final long parentId,
+ final NonceKey nonceKey,
final ForeignExceptionMessage exception,
final long lastUpdate,
final long startTime,
@@ -64,6 +65,7 @@ public class ProcedureInfo {
this.procOwner = procOwner;
this.procState = procState;
this.parentId = parentId;
+ this.nonceKey = nonceKey;
this.lastUpdate = lastUpdate;
this.startTime = startTime;
@@ -73,8 +75,8 @@ public class ProcedureInfo {
}
public ProcedureInfo clone() {
- return new ProcedureInfo(
- procId, procName, procOwner, procState, parentId, exception, lastUpdate, startTime, result);
+ return new ProcedureInfo(procId, procName, procOwner, procState, parentId, nonceKey,
+ exception, lastUpdate, startTime, result);
}
public long getProcId() {
@@ -105,10 +107,6 @@ public class ProcedureInfo {
return nonceKey;
}
- public void setNonceKey(NonceKey nonceKey) {
- this.nonceKey = nonceKey;
- }
-
public boolean isFailed() {
return exception != null;
}
@@ -216,13 +214,19 @@ public class ProcedureInfo {
*/
@InterfaceAudience.Private
public static ProcedureInfo convert(final ProcedureProtos.Procedure procProto) {
+ NonceKey nonceKey = null;
+ if (procProto.getNonce() != HConstants.NO_NONCE) {
+ nonceKey = new NonceKey(procProto.getNonceGroup(), procProto.getNonce());
+ }
+
return new ProcedureInfo(
procProto.getProcId(),
procProto.getClassName(),
procProto.getOwner(),
procProto.getState(),
procProto.hasParentId() ? procProto.getParentId() : -1,
- procProto.getState() == ProcedureState.ROLLEDBACK ? procProto.getException() : null,
+ nonceKey,
+ procProto.hasException() ? procProto.getException() : null,
procProto.getLastUpdate(),
procProto.getStartTime(),
procProto.getState() == ProcedureState.FINISHED ? procProto.getResult().toByteArray() : null);
http://git-wip-us.apache.org/repos/asf/hbase/blob/20d5577d/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
index b9306c4..304c225 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
@@ -600,30 +600,19 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
*/
@InterfaceAudience.Private
public static ProcedureInfo createProcedureInfo(final Procedure proc, final NonceKey nonceKey) {
- RemoteProcedureException exception;
-
- if (proc.hasException()) {
- exception = proc.getException();
- } else {
- exception = null;
- }
- ProcedureInfo procInfo = new ProcedureInfo(
+ RemoteProcedureException exception = proc.hasException() ? proc.getException() : null;
+ return new ProcedureInfo(
proc.getProcId(),
proc.toStringClass(),
proc.getOwner(),
proc.getState(),
proc.hasParent() ? proc.getParentProcId() : -1,
+ nonceKey,
exception != null ?
RemoteProcedureException.toProto(exception.getSource(), exception.getCause()) : null,
proc.getLastUpdate(),
proc.getStartTime(),
proc.getResult());
-
- if (nonceKey != null) {
- procInfo.setNonceKey(nonceKey);
- }
-
- return procInfo;
}
/**
http://git-wip-us.apache.org/repos/asf/hbase/blob/20d5577d/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
index dd439fd..5277fa2 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
@@ -813,13 +813,15 @@ public class ProcedureExecutor<TEnvironment> {
break;
}
- if (proc.getProcId() == rootProcId && proc.isSuccess()) {
- // Finalize the procedure state
+ if (proc.isSuccess()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Procedure completed in " +
StringUtils.humanTimeDiff(proc.elapsedTime()) + ": " + proc);
}
- procedureFinished(proc);
+ // Finalize the procedure state
+ if (proc.getProcId() == rootProcId) {
+ procedureFinished(proc);
+ }
break;
}
} while (procStack.isFailed());
http://git-wip-us.apache.org/repos/asf/hbase/blob/20d5577d/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java
index 83933ff..e49475d 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java
@@ -23,6 +23,7 @@ import java.util.Iterator;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.ProcedureInfo;
import org.apache.hadoop.hbase.procedure2.Procedure;
/**
http://git-wip-us.apache.org/repos/asf/hbase/blob/20d5577d/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java
index 6823288..fe2904b 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java
@@ -414,7 +414,9 @@ public class ProcedureStoreTracker {
node.updateState(procId, isDeleted);
}
- public void clear() {
+ public void reset() {
+ this.keepDeletes = false;
+ this.partial = false;
this.map.clear();
resetUpdates();
}
@@ -579,11 +581,11 @@ public class ProcedureStoreTracker {
}
public void readFrom(final InputStream stream) throws IOException {
- ProcedureProtos.ProcedureStoreTracker data =
+ reset();
+ final ProcedureProtos.ProcedureStoreTracker data =
ProcedureProtos.ProcedureStoreTracker.parseDelimitedFrom(stream);
- map.clear();
for (ProcedureProtos.ProcedureStoreTracker.TrackerNode protoNode: data.getNodeList()) {
- BitSetNode node = BitSetNode.convert(protoNode);
+ final BitSetNode node = BitSetNode.convert(protoNode);
map.put(node.getStart(), node);
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/20d5577d/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java
index 17432ac..0df4046 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java
@@ -64,7 +64,6 @@ public final class ProcedureWALFormat {
}
interface Loader {
- void removeLog(ProcedureWALFile log);
void markCorruptedWAL(ProcedureWALFile log, IOException e);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/20d5577d/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java
index 4dbd929..7d9a57b 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java
@@ -25,9 +25,10 @@ import java.util.HashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
-import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.hbase.ProcedureInfo;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos;
@@ -91,10 +92,7 @@ public class ProcedureWALFormatReader {
loader.markCorruptedWAL(log, e);
}
- if (localProcedures.isEmpty()) {
- LOG.info("No active entry found in state log " + log + ". removing it");
- loader.removeLog(log);
- } else {
+ if (!localProcedures.isEmpty()) {
Iterator<Map.Entry<Long, ProcedureProtos.Procedure>> itd =
localProcedures.entrySet().iterator();
long minProcId = Long.MAX_VALUE;
@@ -160,6 +158,7 @@ public class ProcedureWALFormatReader {
}
maxProcId = Math.max(maxProcId, entry.getProcId());
localProcedures.remove(entry.getProcId());
+ assert !procedures.containsKey(entry.getProcId());
tracker.setDeleted(entry.getProcId(), true);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/20d5577d/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
index e5d6869..0089760 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
@@ -247,8 +247,13 @@ public class WALProcedureStore implements ProcedureStore {
return storeTracker;
}
- public LinkedList<ProcedureWALFile> getActiveLogs() {
- return logs;
+ public ArrayList<ProcedureWALFile> getActiveLogs() {
+ lock.lock();
+ try {
+ return new ArrayList<ProcedureWALFile>(logs);
+ } finally {
+ lock.unlock();
+ }
}
public Set<ProcedureWALFile> getCorruptedLogs() {
@@ -316,17 +321,11 @@ public class WALProcedureStore implements ProcedureStore {
}
// Load the old logs
- final ArrayList<ProcedureWALFile> toRemove = new ArrayList<ProcedureWALFile>();
Iterator<ProcedureWALFile> it = logs.descendingIterator();
it.next(); // Skip the current log
try {
return ProcedureWALFormat.load(it, storeTracker, new ProcedureWALFormat.Loader() {
@Override
- public void removeLog(ProcedureWALFile log) {
- toRemove.add(log);
- }
-
- @Override
public void markCorruptedWAL(ProcedureWALFile log, IOException e) {
if (corruptedLogs == null) {
corruptedLogs = new HashSet<ProcedureWALFile>();
@@ -336,11 +335,6 @@ public class WALProcedureStore implements ProcedureStore {
}
});
} finally {
- if (!toRemove.isEmpty()) {
- for (ProcedureWALFile log: toRemove) {
- removeLogFile(log);
- }
- }
loading.set(false);
}
}
@@ -589,6 +583,7 @@ public class WALProcedureStore implements ProcedureStore {
totalSynced = syncSlots(stream, slots, 0, slotIndex);
break;
} catch (Throwable e) {
+ LOG.warn("unable to sync slots, retry=" + retry);
if (++retry >= maxRetriesBeforeRoll) {
if (logRolled >= maxSyncFailureRoll) {
LOG.error("Sync slots after log roll failed, abort.", e);
@@ -648,14 +643,15 @@ public class WALProcedureStore implements ProcedureStore {
}
private boolean rollWriterOrDie() {
- for (int i = 1; i <= rollRetries; ++i) {
+ for (int i = 0; i < rollRetries; ++i) {
+ if (i > 0) Threads.sleepWithoutInterrupt(waitBeforeRoll * i);
+
try {
if (rollWriter()) {
return true;
}
} catch (IOException e) {
- LOG.warn("Unable to roll the log, attempt=" + i, e);
- Threads.sleepWithoutInterrupt(waitBeforeRoll);
+ LOG.warn("Unable to roll the log, attempt=" + (i + 1), e);
}
}
LOG.fatal("Unable to roll the log");
@@ -902,7 +898,7 @@ public class WALProcedureStore implements ProcedureStore {
}
}
- private long getMaxLogId(final FileStatus[] logFiles) {
+ private static long getMaxLogId(final FileStatus[] logFiles) {
long maxLogId = 0;
if (logFiles != null && logFiles.length > 0) {
for (int i = 0; i < logFiles.length; ++i) {
@@ -945,7 +941,7 @@ public class WALProcedureStore implements ProcedureStore {
} catch (IOException e) {
LOG.warn("Unable to read tracker for " + log + " - " + e.getMessage());
// try the next one...
- storeTracker.clear();
+ storeTracker.reset();
storeTracker.setPartialFlag(true);
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/20d5577d/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java
index 26a94d4..6bc5d36 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java
@@ -192,7 +192,7 @@ public class TestProcedureStoreTracker {
count++;
}
- tracker.clear();
+ tracker.reset();
}
}
@@ -212,7 +212,7 @@ public class TestProcedureStoreTracker {
tracker.setDeleted(i, false);
}
- tracker.clear();
+ tracker.reset();
}
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/20d5577d/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java
index 4fea6de..a33f334 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java
@@ -151,6 +151,50 @@ public class TestWALProcedureStore {
}
@Test
+ public void testNoTrailerDoubleRestart() throws Exception {
+ // log-0001: proc 0, 1 and 2 are inserted
+ Procedure proc0 = new TestSequentialProcedure();
+ procStore.insert(proc0, null);
+ Procedure proc1 = new TestSequentialProcedure();
+ procStore.insert(proc1, null);
+ Procedure proc2 = new TestSequentialProcedure();
+ procStore.insert(proc2, null);
+ procStore.rollWriterForTesting();
+
+ // log-0002: proc 1 deleted
+ procStore.delete(proc1.getProcId());
+ procStore.rollWriterForTesting();
+
+ // log-0003: proc 2 is update
+ procStore.update(proc2);
+ procStore.rollWriterForTesting();
+
+ // log-0004: proc 2 deleted
+ procStore.delete(proc2.getProcId());
+
+ // stop the store and remove the trailer
+ procStore.stop(false);
+ FileStatus[] logs = fs.listStatus(logDir);
+ assertEquals(4, logs.length);
+ for (int i = 0; i < logs.length; ++i) {
+ corruptLog(logs[i], 4);
+ }
+
+ // Test Load 1
+ assertEquals(1, countProcedures(storeRestart()));
+
+ // Test Load 2
+ assertEquals(5, fs.listStatus(logDir).length);
+ assertEquals(1, countProcedures(storeRestart()));
+
+ // remove proc-0
+ procStore.delete(proc0.getProcId());
+ procStore.periodicRollForTesting();
+ assertEquals(1, fs.listStatus(logDir).length);
+ assertEquals(0, countProcedures(storeRestart()));
+ }
+
+ @Test
public void testCorruptedTrailer() throws Exception {
// Insert something
for (int i = 0; i < 100; ++i) {