You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink is not preserved in this plain-text copy).
Posted to commits@hbase.apache.org by bu...@apache.org on 2016/09/22 16:34:07 UTC
[19/50] [abbrv] hbase git commit: HBASE-16554 Rebuild WAL tracker if trailer is corrupted.
HBASE-16554 Rebuild WAL tracker if trailer is corrupted.
Change-Id: Iecc3347de3de9fc57f57ab5f498aad404d02ec52
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/b2eac0da
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/b2eac0da
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/b2eac0da
Branch: refs/heads/hbase-14439
Commit: b2eac0da33c4161aa8188213171afb03b72048a4
Parents: c5b8aab
Author: Apekshit Sharma <ap...@apache.org>
Authored: Sat Sep 17 17:38:40 2016 -0700
Committer: Apekshit Sharma <ap...@apache.org>
Committed: Mon Sep 19 12:23:48 2016 -0700
----------------------------------------------------------------------
.../procedure2/store/ProcedureStoreTracker.java | 15 +++-
.../procedure2/store/wal/ProcedureWALFile.java | 2 +
.../store/wal/ProcedureWALFormat.java | 14 +++-
.../store/wal/ProcedureWALFormatReader.java | 59 +++++++++++---
.../procedure2/store/wal/WALProcedureStore.java | 50 ++++++------
.../store/wal/TestWALProcedureStore.java | 82 ++++++++++++++++++++
6 files changed, 178 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/b2eac0da/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java
index 78d6a44..a60ba3f 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java
@@ -93,6 +93,7 @@ public class ProcedureStoreTracker {
private long[] updated;
/**
* Keeps track of procedure ids which belong to this bitmap's range and have been deleted.
+ * This represents global state since it's not reset on WAL rolls.
*/
private long[] deleted;
/**
@@ -449,8 +450,7 @@ public class ProcedureStoreTracker {
}
}
- public void resetToProto(ProcedureProtos.ProcedureStoreTracker trackerProtoBuf)
- throws IOException {
+ public void resetToProto(final ProcedureProtos.ProcedureStoreTracker trackerProtoBuf) {
reset();
for (ProcedureProtos.ProcedureStoreTracker.TrackerNode protoNode: trackerProtoBuf.getNodeList()) {
final BitSetNode node = new BitSetNode(protoNode);
@@ -536,6 +536,7 @@ public class ProcedureStoreTracker {
BitSetNode node = getOrCreateNode(procId);
assert node.contains(procId) : "expected procId=" + procId + " in the node=" + node;
node.updateState(procId, isDeleted);
+ trackProcIds(procId);
}
public void reset() {
@@ -545,6 +546,11 @@ public class ProcedureStoreTracker {
resetUpdates();
}
+ public boolean isUpdated(long procId) { // true iff procId lies in an existing node whose bitmap marks it updated
+ final Map.Entry<Long, BitSetNode> entry = map.floorEntry(procId);
+ return entry != null && entry.getValue().contains(procId) && entry.getValue().isUpdated(procId);
+ }
+
/**
* If {@link #partial} is false, returns state from the bitmap. If no state is found for
* {@code procId}, returns YES.
@@ -583,6 +589,10 @@ public class ProcedureStoreTracker {
}
}
+ public boolean isPartial() { // current value of the partial flag (see setPartialFlag)
+ return partial;
+ }
+
public void setPartialFlag(boolean isPartial) {
if (this.partial && !isPartial) {
for (Map.Entry<Long, BitSetNode> entry : map.entrySet()) {
@@ -720,6 +730,7 @@ public class ProcedureStoreTracker {
entry.getValue().dump();
}
}
+
/**
* Iterates over
* {@link BitSetNode}s in this.map and subtracts with corresponding ones from {@code other}
http://git-wip-us.apache.org/repos/asf/hbase/blob/b2eac0da/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java
index 99e7a7e..b9726a8 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java
@@ -62,6 +62,7 @@ public class ProcedureWALFile implements Comparable<ProcedureWALFile> {
this.logFile = logStatus.getPath();
this.logSize = logStatus.getLen();
this.timestamp = logStatus.getModificationTime();
+ tracker.setPartialFlag(true);
}
public ProcedureWALFile(FileSystem fs, Path logFile, ProcedureWALHeader header,
@@ -72,6 +73,7 @@ public class ProcedureWALFile implements Comparable<ProcedureWALFile> {
this.startPos = startPos;
this.logSize = startPos;
this.timestamp = timestamp;
+ tracker.setPartialFlag(true);
}
public void open() throws IOException {
http://git-wip-us.apache.org/repos/asf/hbase/blob/b2eac0da/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java
index 775ec11..5f726d0 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java
@@ -25,6 +25,8 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.util.Iterator;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
@@ -44,6 +46,8 @@ import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALTr
@InterfaceAudience.Private
@InterfaceStability.Evolving
public final class ProcedureWALFormat {
+ private static final Log LOG = LogFactory.getLog(ProcedureWALFormat.class);
+
static final byte LOG_TYPE_STREAM = 0;
static final byte LOG_TYPE_COMPACTED = 1;
static final byte LOG_TYPE_MAX_VALID = 1;
@@ -72,19 +76,21 @@ public final class ProcedureWALFormat {
public static void load(final Iterator<ProcedureWALFile> logs,
final ProcedureStoreTracker tracker, final Loader loader) throws IOException {
- ProcedureWALFormatReader reader = new ProcedureWALFormatReader(tracker);
+ final ProcedureWALFormatReader reader = new ProcedureWALFormatReader(tracker, loader);
tracker.setKeepDeletes(true);
try {
+ // Ignore the last log which is current active log.
while (logs.hasNext()) {
ProcedureWALFile log = logs.next();
log.open();
try {
- reader.read(log, loader);
+ reader.read(log);
} finally {
log.close();
}
}
- reader.finalize(loader);
+ reader.finish();
+
// The tracker is now updated with all the procedures read from the logs
tracker.setPartialFlag(false);
tracker.resetUpdates();
@@ -246,4 +252,4 @@ public final class ProcedureWALFormat {
}
builder.build().writeDelimitedTo(slot);
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/b2eac0da/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java
index 8678c86..118ec19 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java
@@ -101,19 +101,40 @@ public class ProcedureWALFormatReader {
private final WalProcedureMap localProcedureMap = new WalProcedureMap(1024);
private final WalProcedureMap procedureMap = new WalProcedureMap(1024);
- //private long compactionLogId;
+ // private long compactionLogId;
private long maxProcId = 0;
-
+ /**
+ * If tracker for a log file is partial (see {@link ProcedureStoreTracker#partial}), we
+ * re-build the list of procedures updated in that WAL because we need it for log cleaning
+ * purpose. If all procedures updated in a WAL are found to be obsolete, it can be safely deleted.
+ * (see {@link WALProcedureStore#removeInactiveLogs()}).
+ * However, we don't need deleted part of a WAL's tracker for this purpose, so we don't bother
+ * re-building it. (To understand why, take a look at
+ * {@link ProcedureStoreTracker.BitSetNode#subtract(ProcedureStoreTracker.BitSetNode)}).
+ */
+ private ProcedureStoreTracker localTracker;
+ private final ProcedureWALFormat.Loader loader;
+ /**
+ * Global tracker. If set to partial, it will be updated as procedures are loaded from wals,
+ * otherwise not.
+ */
private final ProcedureStoreTracker tracker;
- private final boolean hasFastStartSupport;
+ // private final boolean hasFastStartSupport;
- public ProcedureWALFormatReader(final ProcedureStoreTracker tracker) {
+ public ProcedureWALFormatReader(final ProcedureStoreTracker tracker,
+ ProcedureWALFormat.Loader loader) {
this.tracker = tracker;
+ this.loader = loader;
// we support fast-start only if we have a clean shutdown.
- this.hasFastStartSupport = !tracker.isEmpty();
+ // this.hasFastStartSupport = !tracker.isEmpty();
}
- public void read(ProcedureWALFile log, ProcedureWALFormat.Loader loader) throws IOException {
+ public void read(final ProcedureWALFile log) throws IOException {
+ localTracker = log.getTracker().isPartial() ? log.getTracker() : null;
+ if (localTracker != null) {
+ LOG.info("Rebuilding tracker for log - " + log);
+ }
+
FSDataInputStream stream = log.getStream();
try {
boolean hasMore = true;
@@ -121,7 +142,6 @@ public class ProcedureWALFormatReader {
ProcedureWALEntry entry = ProcedureWALFormat.readEntry(stream);
if (entry == null) {
LOG.warn("nothing left to decode. exiting with missing EOF");
- hasMore = false;
break;
}
switch (entry.getType()) {
@@ -150,9 +170,13 @@ public class ProcedureWALFormatReader {
loader.markCorruptedWAL(log, e);
}
+ if (localTracker != null) {
+ localTracker.setPartialFlag(false);
+ }
if (!localProcedureMap.isEmpty()) {
log.setProcIds(localProcedureMap.getMinProcId(), localProcedureMap.getMaxProcId());
procedureMap.mergeTail(localProcedureMap);
+
//if (hasFastStartSupport) {
// TODO: Some procedure may be already runnables (see readInitEntry())
// (we can also check the "update map" in the log trackers)
@@ -164,7 +188,7 @@ public class ProcedureWALFormatReader {
}
}
- public void finalize(ProcedureWALFormat.Loader loader) throws IOException {
+ public void finish() throws IOException {
// notify the loader about the max proc ID
loader.setMaxProcId(maxProcId);
@@ -185,7 +209,12 @@ public class ProcedureWALFormatReader {
LOG.trace("read " + entry.getType() + " entry " + proc.getProcId());
}
localProcedureMap.add(proc);
- tracker.setDeleted(proc.getProcId(), false);
+ if (tracker.isPartial()) {
+ tracker.insert(proc.getProcId());
+ }
+ }
+ if (localTracker != null) {
+ localTracker.insert(proc.getProcId());
}
}
@@ -236,7 +265,13 @@ public class ProcedureWALFormatReader {
maxProcId = Math.max(maxProcId, procId);
localProcedureMap.remove(procId);
assert !procedureMap.contains(procId);
- tracker.setDeleted(procId, true);
+ if (tracker.isPartial()) {
+ tracker.setDeleted(procId, true);
+ }
+ if (localTracker != null) {
+ // In case there is only delete entry for this procedure in current log.
+ localTracker.setDeleted(procId, true);
+ }
}
private boolean isDeleted(final long procId) {
@@ -264,7 +299,7 @@ public class ProcedureWALFormatReader {
// unlinkFromLinkList = None
// ==========================================================================
private static class Entry {
- // hash-table next
+ // For bucketed linked lists in hash-table.
protected Entry hashNext;
// child head
protected Entry childHead;
@@ -511,6 +546,8 @@ public class ProcedureWALFormatReader {
childUnlinkedHead = other.childUnlinkedHead;
}
}
+ maxProcId = Math.max(maxProcId, other.maxProcId);
+ minProcId = Math.max(minProcId, other.minProcId);
other.clear();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/b2eac0da/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
index 0cfe4b0..bcd4e5f 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
@@ -30,7 +30,6 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
-import java.util.ListIterator;
import java.util.Set;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;
@@ -881,24 +880,22 @@ public class WALProcedureStore extends ProcedureStoreBase {
}
private void closeCurrentLogStream() {
+ if (stream == null) return;
try {
- if (stream != null) {
- try {
- ProcedureWALFile log = logs.getLast();
- log.setProcIds(storeTracker.getUpdatedMinProcId(), storeTracker.getUpdatedMaxProcId());
- log.updateLocalTracker(storeTracker);
- long trailerSize = ProcedureWALFormat.writeTrailer(stream, storeTracker);
- log.addToSize(trailerSize);
- } catch (IOException e) {
- LOG.warn("Unable to write the trailer: " + e.getMessage());
- }
- stream.close();
- }
+ ProcedureWALFile log = logs.getLast();
+ log.setProcIds(storeTracker.getUpdatedMinProcId(), storeTracker.getUpdatedMaxProcId());
+ log.updateLocalTracker(storeTracker);
+ long trailerSize = ProcedureWALFormat.writeTrailer(stream, storeTracker);
+ log.addToSize(trailerSize);
+ } catch (IOException e) {
+ LOG.warn("Unable to write the trailer: " + e.getMessage());
+ }
+ try {
+ stream.close();
} catch (IOException e) {
LOG.error("Unable to close the stream", e);
- } finally {
- stream = null;
}
+ stream = null;
}
// ==========================================================================
@@ -1058,11 +1055,15 @@ public class WALProcedureStore extends ProcedureStoreBase {
return maxLogId;
}
+ /**
+ * If last log's tracker is not null, use it as {@link #storeTracker}.
+ * Otherwise, set storeTracker as partial, and let {@link ProcedureWALFormatReader} rebuild
+ * it using entries in the log.
+ */
private void initTrackerFromOldLogs() {
- // TODO: Load the most recent tracker available
if (logs.isEmpty()) return;
ProcedureWALFile log = logs.getLast();
- if (log.getTracker() != null) {
+ if (!log.getTracker().isPartial()) {
storeTracker.resetTo(log.getTracker());
} else {
storeTracker.reset();
@@ -1074,7 +1075,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
* Loads given log file and it's tracker.
*/
private ProcedureWALFile initOldLog(final FileStatus logFile) throws IOException {
- ProcedureWALFile log = new ProcedureWALFile(fs, logFile);
+ final ProcedureWALFile log = new ProcedureWALFile(fs, logFile);
if (logFile.getLen() == 0) {
LOG.warn("Remove uninitialized log: " + logFile);
log.removeFile();
@@ -1095,20 +1096,15 @@ public class WALProcedureStore extends ProcedureStoreBase {
throw new IOException(msg, e);
}
- if (log.isCompacted()) {
- try {
- log.readTrailer();
- } catch (IOException e) {
- LOG.warn("Unfinished compacted log: " + logFile, e);
- log.removeFile();
- return null;
- }
- }
try {
log.readTracker();
} catch (IOException e) {
+ log.getTracker().reset();
+ log.getTracker().setPartialFlag(true);
LOG.warn("Unable to read tracker for " + log + " - " + e.getMessage());
}
+
+ log.close();
return log;
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/b2eac0da/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java
index 2e2a038..5353d62 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java
@@ -360,6 +360,88 @@ public class TestWALProcedureStore {
assertEquals(0, loader.getCorruptedCount());
}
+ void assertUpdated(final ProcedureStoreTracker tracker, Procedure[] procs,
+ int[] updatedProcs, int[] nonUpdatedProcs) { // indexes into procs: each must / must not be isUpdated() in tracker
+ for (int index : updatedProcs) {
+ long procId = procs[index].getProcId();
+ assertTrue("Procedure id : " + procId, tracker.isUpdated(procId));
+ }
+ for (int index : nonUpdatedProcs) {
+ long procId = procs[index].getProcId();
+ assertFalse("Procedure id : " + procId, tracker.isUpdated(procId));
+ }
+ }
+
+ void assertDeleted(final ProcedureStoreTracker tracker, Procedure[] procs,
+ int[] deletedProcs, int[] nonDeletedProcs) { // indexes into procs: expect DeleteState YES / NO in tracker
+ for (int index : deletedProcs) {
+ long procId = procs[index].getProcId();
+ assertEquals("Procedure id : " + procId,
+ ProcedureStoreTracker.DeleteState.YES, tracker.isDeleted(procId));
+ }
+ for (int index : nonDeletedProcs) {
+ long procId = procs[index].getProcId();
+ assertEquals("Procedure id : " + procId,
+ ProcedureStoreTracker.DeleteState.NO, tracker.isDeleted(procId));
+ }
+ }
+
+ @Test
+ public void testCorruptedTrailersRebuild() throws Exception { // corrupt every WAL trailer, restart, verify rebuilt trackers
+ final Procedure[] procs = new Procedure[6];
+ for (int i = 0; i < procs.length; ++i) {
+ procs[i] = new TestSequentialProcedure();
+ }
+ // Log State (I=insert, U=updated, D=delete)
+ // | log 1 | log 2 | log 3 |
+ // 0 | I, D | | |
+ // 1 | I | | |
+ // 2 | I | D | |
+ // 3 | I | U | |
+ // 4 | | I | D |
+ // 5 | | | I |
+ procStore.insert(procs[0], null);
+ procStore.insert(procs[1], null);
+ procStore.insert(procs[2], null);
+ procStore.insert(procs[3], null);
+ procStore.delete(procs[0], null);
+ procStore.rollWriterForTesting();
+ procStore.delete(procs[2], null);
+ procStore.update(procs[3]);
+ procStore.insert(procs[4], null);
+ procStore.rollWriterForTesting();
+ procStore.delete(procs[4], null);
+ procStore.insert(procs[5], null);
+
+ // Stop the store
+ procStore.stop(false);
+
+ // Remove 4 bytes from the trailers of all three logs
+ final FileStatus[] logs = fs.listStatus(logDir);
+ assertEquals(3, logs.length);
+ for (int i = 0; i < logs.length; ++i) {
+ corruptLog(logs[i], 4);
+ }
+
+ // Restart the store; each log's tracker should be rebuilt from its entries
+ final LoadCounter loader = new LoadCounter();
+ storeRestart(loader);
+ assertEquals(3, loader.getLoadedCount()); // procs 1, 3 and 5
+ assertEquals(0, loader.getCorruptedCount());
+
+ // Check the Trackers: per-WAL "updated" bits, then the global "deleted" state
+ final ArrayList<ProcedureWALFile> walFiles = procStore.getActiveLogs();
+ assertEquals(4, walFiles.size()); // 3 corrupted logs + presumably the new active log opened on restart (confirm)
+ LOG.info("Checking wal " + walFiles.get(0));
+ assertUpdated(walFiles.get(0).getTracker(), procs, new int[]{0, 1, 2, 3}, new int[] {4, 5});
+ LOG.info("Checking wal " + walFiles.get(1));
+ assertUpdated(walFiles.get(1).getTracker(), procs, new int[]{2, 3, 4}, new int[] {0, 1, 5});
+ LOG.info("Checking wal " + walFiles.get(2));
+ assertUpdated(walFiles.get(2).getTracker(), procs, new int[]{4, 5}, new int[] {0, 1, 2, 3});
+ LOG.info("Checking global tracker ");
+ assertDeleted(procStore.getStoreTracker(), procs, new int[]{0, 2, 4}, new int[] {1, 3, 5});
+ }
+
@Test
public void testCorruptedEntries() throws Exception {
// Insert something