You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by vj...@apache.org on 2020/06/22 18:13:27 UTC
[hbase] branch branch-1 updated: HBASE-24380 : Provide WAL splitting journal logging (#1860) (#1939)
This is an automated email from the ASF dual-hosted git repository.
vjasani pushed a commit to branch branch-1
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-1 by this push:
new d144923 HBASE-24380 : Provide WAL splitting journal logging (#1860) (#1939)
d144923 is described below
commit d1449231f06f670f90a230ec36807a3a76850a75
Author: Viraj Jasani <vj...@apache.org>
AuthorDate: Mon Jun 22 23:43:12 2020 +0530
HBASE-24380 : Provide WAL splitting journal logging (#1860) (#1939)
Signed-off-by: Andrew Purtell <ap...@apache.org>
---
.../org/apache/hadoop/hbase/wal/WALSplitter.java | 138 +++++++++++++++------
1 file changed, 101 insertions(+), 37 deletions(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
index f769e6d..9273b6a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
@@ -312,13 +312,14 @@ public class WALSplitter {
status =
TaskMonitor.get().createStatus(
"Splitting log file " + logfile.getPath() + "into a temporary staging area.");
+ status.enableStatusJournal(true);
Reader in = null;
this.fileBeingSplit = logfile;
try {
long logLength = logfile.getLen();
LOG.info("Splitting wal: " + logPath + ", length=" + logLength);
LOG.info("DistributedLogReplay = " + this.distributedLogReplay);
- status.setStatus("Opening log file");
+ status.setStatus("Opening log file " + logPath);
if (reporter != null && !reporter.progress()) {
progress_failed = true;
return false;
@@ -346,6 +347,7 @@ public class WALSplitter {
int numOpenedFilesBeforeReporting = conf.getInt("hbase.splitlog.report.openedfiles", 3);
int numOpenedFilesLastCheck = 0;
outputSink.setReporter(reporter);
+ outputSink.setStatus(status);
outputSink.startWriterThreads();
outputSinkStarted = true;
Entry entry;
@@ -436,7 +438,9 @@ public class WALSplitter {
e = RemoteExceptionHandler.checkIOException(e);
throw e;
} finally {
- LOG.debug("Finishing writing output logs and closing down.");
+ final String log = "Finishing writing output logs and closing down";
+ LOG.debug(log);
+ status.setStatus(log);
try {
if (null != in) {
in.close();
@@ -460,6 +464,10 @@ public class WALSplitter {
", corrupted=" + isCorrupted + ", progress failed=" + progress_failed;
LOG.info(msg);
status.markComplete(msg);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("WAL split completed for " + logPath + " , Journal Log: "
+ + status.prettyPrintJournal());
+ }
}
}
return !progress_failed;
@@ -1210,6 +1218,8 @@ public class WALSplitter {
protected List<Path> splits = null;
+ protected MonitoredTask status = null;
+
public OutputSink(PipelineController controller, EntryBuffers entryBuffers, int numWriters) {
numThreads = numWriters;
this.controller = controller;
@@ -1262,6 +1272,10 @@ public class WALSplitter {
return this.skippedEdits.get();
}
+ void setStatus(MonitoredTask status) {
+ this.status = status;
+ }
+
/**
* Wait for writer threads to dump all info to the sink
* @return true when there is no error
@@ -1292,7 +1306,9 @@ public class WALSplitter {
}
}
controller.checkForErrors();
- LOG.info(this.writerThreads.size() + " split writers finished; closing...");
+ final String msg = this.writerThreads.size() + " split writer threads finished";
+ LOG.info(msg);
+ updateStatusWithMsg(msg);
return (!progress_failed);
}
@@ -1329,6 +1345,17 @@ public class WALSplitter {
* @return Return true if this sink wants to accept this region-level WALEdit.
*/
public abstract boolean keepRegionEvent(Entry entry);
+
+ /**
+ * Set status message in {@link MonitoredTask} instance that is set in this OutputSink
+ *
+ * @param msg message to update the status with
+ */
+ protected final void updateStatusWithMsg(String msg) {
+ if (status != null) {
+ status.setStatus(msg);
+ }
+ }
}
/**
@@ -1386,19 +1413,29 @@ public class WALSplitter {
}
}
if (wap.minLogSeqNum < dstMinLogSeqNum) {
- LOG.warn("Found existing old edits file. It could be the result of a previous failed"
+ final String errorMsg =
+ "Found existing old edits file. It could be the result of a previous failed"
+ " split attempt or we have duplicated wal entries. Deleting " + dst + ", length="
- + walFS.getFileStatus(dst).getLen());
+ + walFS.getFileStatus(dst).getLen();
+ LOG.warn(errorMsg);
+ updateStatusWithMsg(errorMsg);
if (!walFS.delete(dst, false)) {
- LOG.warn("Failed deleting of old " + dst);
+ final String msg = "Failed deleting of old " + dst;
+ LOG.warn(msg);
+ updateStatusWithMsg(msg);
throw new IOException("Failed deleting of old " + dst);
}
} else {
- LOG.warn("Found existing old edits file and we have less entries. Deleting " + wap.p
- + ", length=" + walFS.getFileStatus(wap.p).getLen());
+ final String errorMsg =
+ "Found existing old edits file and we have less entries. Deleting " + wap.p + ", length="
+ + walFS.getFileStatus(wap.p).getLen();
+ LOG.warn(errorMsg);
+ updateStatusWithMsg(errorMsg);
if (!walFS.delete(wap.p, false)) {
- LOG.warn("Failed deleting of " + wap.p);
- throw new IOException("Failed deleting of " + wap.p);
+ final String failureMsg = "Failed deleting of " + wap.p;
+ LOG.warn(failureMsg);
+ updateStatusWithMsg(failureMsg);
+ throw new IOException(failureMsg);
}
}
}
@@ -1484,19 +1521,24 @@ public class WALSplitter {
try {
wap.w.close();
} catch (IOException ioe) {
- LOG.error("Couldn't close log at " + wap.p, ioe);
+ final String errorMsg = "Couldn't close log at " + wap.p;
+ LOG.error(errorMsg, ioe);
+ updateStatusWithMsg(errorMsg);
thrown.add(ioe);
return null;
}
+ final String msg =
+ "Closed wap " + wap.p + " (wrote " + wap.editsWritten + " edits, skipped "
+ + wap.editsSkipped + " edits in " + (wap.nanosSpent / 1000 / 1000) + "ms";
if (LOG.isDebugEnabled()) {
- LOG.debug("Closed wap " + wap.p + " (wrote " + wap.editsWritten
- + " edits, skipped " + wap.editsSkipped + " edits in "
- + (wap.nanosSpent / 1000 / 1000) + "ms");
+ LOG.debug(msg);
}
+ updateStatusWithMsg(msg);
if (wap.editsWritten == 0) {
// just remove the empty recovered.edits file
if (walFS.exists(wap.p) && !walFS.delete(wap.p, false)) {
- LOG.warn("Failed deleting empty " + wap.p);
+ final String errorMsg = "Failed deleting empty " + wap.p;
+ LOG.warn(errorMsg);
throw new IOException("Failed deleting empty " + wap.p);
}
return null;
@@ -1513,12 +1555,18 @@ public class WALSplitter {
// TestHLogSplit#testThreading is an example.
if (walFS.exists(wap.p)) {
if (!walFS.rename(wap.p, dst)) {
- throw new IOException("Failed renaming " + wap.p + " to " + dst);
+ final String errorMsg = "Failed renaming " + wap.p + " to " + dst;
+ updateStatusWithMsg(errorMsg);
+ throw new IOException(errorMsg);
}
- LOG.info("Rename " + wap.p + " to " + dst);
+ final String renameLog = "Rename " + wap.p + " to " + dst;
+ LOG.info(renameLog);
+ updateStatusWithMsg(renameLog);
}
} catch (IOException ioe) {
- LOG.error("Couldn't rename " + wap.p + " to " + dst, ioe);
+ final String errorMsg = "Couldn't rename " + wap.p + " to " + dst;
+ LOG.error(errorMsg, ioe);
+ updateStatusWithMsg(errorMsg);
thrown.add(ioe);
return null;
}
@@ -1555,13 +1603,17 @@ public class WALSplitter {
wap = (WriterAndPath) tmpWAP;
wap.w.close();
} catch (IOException ioe) {
- LOG.error("Couldn't close log at " + wap.p, ioe);
+ final String errorMsg = "Couldn't close log at " + wap.p;
+ LOG.error(errorMsg, ioe);
+ updateStatusWithMsg(errorMsg);
thrown.add(ioe);
continue;
}
- LOG.info(
- "Closed log " + wap.p + " (wrote " + wap.editsWritten + " edits in " + (wap.nanosSpent
- / 1000 / 1000) + "ms)");
+ final String msg =
+ "Closed log " + wap.p + " (wrote " + wap.editsWritten + " edits in " + (wap.nanosSpent
+ / 1000 / 1000) + "ms)";
+ LOG.info(msg);
+ updateStatusWithMsg(msg);
}
writersClosed = true;
}
@@ -1610,15 +1662,21 @@ public class WALSplitter {
return null;
}
if (walFS.exists(regionedits)) {
- LOG.warn("Found old edits file. It could be the "
- + "result of a previous failed split attempt. Deleting " + regionedits + ", length="
- + walFS.getFileStatus(regionedits).getLen());
+ final String warnMsg = "Found old edits file. It could be the "
+ + "result of a previous failed split attempt. Deleting " + regionedits + ", length="
+ + walFS.getFileStatus(regionedits).getLen();
+ LOG.warn(warnMsg);
+ updateStatusWithMsg(warnMsg);
if (!walFS.delete(regionedits, false)) {
- LOG.warn("Failed delete of old " + regionedits);
+ final String errorMsg = "Failed delete of old " + regionedits;
+ LOG.warn(errorMsg);
+ updateStatusWithMsg(errorMsg);
}
}
Writer w = createWriter(regionedits);
- LOG.debug("Creating writer path=" + regionedits);
+ final String msg = "Creating writer path=" + regionedits;
+ LOG.debug(msg);
+ updateStatusWithMsg(msg);
return new WriterAndPath(regionedits, w, entry.getKey().getLogSeqNum());
}
@@ -1666,10 +1724,10 @@ public class WALSplitter {
WriterAndPath wap = null;
long startTime = System.nanoTime();
- try {
- int editsCount = 0;
+ int editsCount = 0;
- for (Entry logEntry : entries) {
+ for (Entry logEntry : entries) {
+ try {
if (wap == null) {
wap = getWriterAndPath(logEntry, reusable);
if (wap == null) {
@@ -1687,18 +1745,24 @@ public class WALSplitter {
} else {
wap.incrementSkippedEdits(1);
}
+ } catch (IOException e) {
+ logAndThrowWriterAppendFailure(logEntry, e);
}
- // Pass along summary statistics
- wap.incrementEdits(editsCount);
- wap.incrementNanoTime(System.nanoTime() - startTime);
- } catch (IOException e) {
- e = RemoteExceptionHandler.checkIOException(e);
- LOG.fatal(" Got while writing log entry to log", e);
- throw e;
}
+ // Pass along summary statistics
+ wap.incrementEdits(editsCount);
+ wap.incrementNanoTime(System.nanoTime() - startTime);
return wap;
}
+ private void logAndThrowWriterAppendFailure(Entry logEntry, IOException e) throws IOException {
+ e = RemoteExceptionHandler.checkIOException(e);
+ final String errorMsg = "Failed to write log entry " + logEntry.toString() + " to log";
+ LOG.fatal(errorMsg, e);
+ updateStatusWithMsg(errorMsg);
+ throw e;
+ }
+
@Override
public boolean keepRegionEvent(Entry entry) {
ArrayList<Cell> cells = entry.getEdit().getCells();