Posted to commits@hbase.apache.org by ap...@apache.org on 2020/06/19 23:33:53 UTC
[hbase] branch branch-2 updated: HBASE-24380 : Provide WAL splitting journal logging (#1860)
This is an automated email from the ASF dual-hosted git repository.
apurtell pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2 by this push:
new 48fda91 HBASE-24380 : Provide WAL splitting journal logging (#1860)
48fda91 is described below
commit 48fda91c5b45c394455c8fd1d12f4e740fb50404
Author: Viraj Jasani <vj...@apache.org>
AuthorDate: Sat Jun 20 04:55:03 2020 +0530
HBASE-24380 : Provide WAL splitting journal logging (#1860)
Signed-off-by: Andrew Purtell <ap...@apache.org>
Conflicts:
hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRecoveredEditsOutputSink.java
hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRecoveredHFilesOutputSink.java
hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RecoveredEditsOutputSink.java
---
.../wal/AbstractRecoveredEditsOutputSink.java | 82 ++++++++++++++--------
.../hbase/wal/BoundedRecoveredEditsOutputSink.java | 2 +-
.../wal/BoundedRecoveredHFilesOutputSink.java | 2 +-
.../org/apache/hadoop/hbase/wal/OutputSink.java | 22 +++++-
.../hadoop/hbase/wal/RecoveredEditsOutputSink.java | 5 +-
.../org/apache/hadoop/hbase/wal/WALSplitter.java | 12 +++-
6 files changed, 88 insertions(+), 37 deletions(-)
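The patch threads the splitter's MonitoredTask into the output sinks so that messages already written to the service log are also recorded in the task's status journal. A minimal sketch of that journaling lifecycle, using only the MonitoredTask calls that appear in this patch (the description and step messages are illustrative, not taken from the splitter):

    import org.apache.hadoop.hbase.monitoring.MonitoredTask;
    import org.apache.hadoop.hbase.monitoring.TaskMonitor;

    public class JournalSketch {
      public static void main(String[] args) {
        // Create a monitored task and enable its journal so each
        // setStatus() call is recorded with a timestamp.
        MonitoredTask status = TaskMonitor.get().createStatus("Splitting log file <path>");
        status.enableStatusJournal(true);
        status.setStatus("Opening log file");       // step messages land in the journal
        status.setStatus("Closing down");
        status.markComplete("Processed 42 edits");  // illustrative completion message
        // Dump the accumulated journal, as WALSplitter now does at debug level.
        System.out.println(status.prettyPrintJournal());
      }
    }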
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractRecoveredEditsOutputSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractRecoveredEditsOutputSink.java
index da952eb..0da082a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractRecoveredEditsOutputSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractRecoveredEditsOutputSink.java
@@ -57,7 +57,7 @@ abstract class AbstractRecoveredEditsOutputSink extends OutputSink {
* @return a writer that wraps a {@link WALProvider.Writer} and its Path. Caller should close.
*/
protected RecoveredEditsWriter createRecoveredEditsWriter(TableName tableName, byte[] region,
- long seqId) throws IOException {
+ long seqId) throws IOException {
Path regionEditsPath = getRegionSplitEditsPath(tableName, region, seqId,
walSplitter.getFileBeingSplit().getPath().getName(), walSplitter.getTmpDirName(),
walSplitter.conf);
@@ -70,27 +70,35 @@ abstract class AbstractRecoveredEditsOutputSink extends OutputSink {
}
}
WALProvider.Writer w = walSplitter.createWriter(regionEditsPath);
- LOG.info("Creating recovered edits writer path={}", regionEditsPath);
+ final String msg = "Creating recovered edits writer path=" + regionEditsPath;
+ LOG.info(msg);
+ updateStatusWithMsg(msg);
return new RecoveredEditsWriter(region, regionEditsPath, w, seqId);
}
protected Path closeRecoveredEditsWriter(RecoveredEditsWriter editsWriter,
- List<IOException> thrown) throws IOException {
+ List<IOException> thrown) throws IOException {
try {
editsWriter.writer.close();
} catch (IOException ioe) {
- LOG.error("Could not close recovered edits at {}", editsWriter.path, ioe);
+ final String errorMsg = "Could not close recovered edits at " + editsWriter.path;
+ LOG.error(errorMsg, ioe);
+ updateStatusWithMsg(errorMsg);
thrown.add(ioe);
return null;
}
- LOG.info("Closed recovered edits writer path={} (wrote {} edits, skipped {} edits in {} ms",
- editsWriter.path, editsWriter.editsWritten, editsWriter.editsSkipped,
- editsWriter.nanosSpent / 1000 / 1000);
+ final String msg = "Closed recovered edits writer path=" + editsWriter.path + " (wrote "
+ + editsWriter.editsWritten + " edits, skipped " + editsWriter.editsSkipped + " edits in " + (
+ editsWriter.nanosSpent / 1000 / 1000) + " ms)";
+ LOG.info(msg);
+ updateStatusWithMsg(msg);
if (editsWriter.editsWritten == 0) {
// just remove the empty recovered.edits file
- if (walSplitter.walFS.exists(editsWriter.path) &&
- !walSplitter.walFS.delete(editsWriter.path, false)) {
- LOG.warn("Failed deleting empty {}", editsWriter.path);
+ if (walSplitter.walFS.exists(editsWriter.path)
+ && !walSplitter.walFS.delete(editsWriter.path, false)) {
+ final String errorMsg = "Failed deleting empty " + editsWriter.path;
+ LOG.warn(errorMsg);
+ updateStatusWithMsg(errorMsg);
throw new IOException("Failed deleting empty " + editsWriter.path);
}
return null;
@@ -107,13 +115,20 @@ abstract class AbstractRecoveredEditsOutputSink extends OutputSink {
// TestHLogSplit#testThreading is an example.
if (walSplitter.walFS.exists(editsWriter.path)) {
if (!walSplitter.walFS.rename(editsWriter.path, dst)) {
- throw new IOException(
- "Failed renaming recovered edits " + editsWriter.path + " to " + dst);
+ final String errorMsg =
+ "Failed renaming recovered edits " + editsWriter.path + " to " + dst;
+ updateStatusWithMsg(errorMsg);
+ throw new IOException(errorMsg);
}
- LOG.info("Rename recovered edits {} to {}", editsWriter.path, dst);
+ final String renameEditMsg = "Rename recovered edits " + editsWriter.path + " to " + dst;
+ LOG.info(renameEditMsg);
+ updateStatusWithMsg(renameEditMsg);
}
} catch (IOException ioe) {
- LOG.error("Could not rename recovered edits {} to {}", editsWriter.path, dst, ioe);
+ final String errorMsg = "Could not rename recovered edits " + editsWriter.path
+ + " to " + dst;
+ LOG.error(errorMsg, ioe);
+ updateStatusWithMsg(errorMsg);
thrown.add(ioe);
return null;
}
@@ -216,26 +231,33 @@ abstract class AbstractRecoveredEditsOutputSink extends OutputSink {
void writeRegionEntries(List<WAL.Entry> entries) throws IOException {
long startTime = System.nanoTime();
- try {
- int editsCount = 0;
- for (WAL.Entry logEntry : entries) {
- filterCellByStore(logEntry);
- if (!logEntry.getEdit().isEmpty()) {
+ int editsCount = 0;
+ for (WAL.Entry logEntry : entries) {
+ filterCellByStore(logEntry);
+ if (!logEntry.getEdit().isEmpty()) {
+ try {
writer.append(logEntry);
- updateRegionMaximumEditLogSeqNum(logEntry);
- editsCount++;
- } else {
- incrementSkippedEdits(1);
+ } catch (IOException e) {
+ logAndThrowWriterAppendFailure(logEntry, e);
}
+ updateRegionMaximumEditLogSeqNum(logEntry);
+ editsCount++;
+ } else {
+ incrementSkippedEdits(1);
}
- // Pass along summary statistics
- incrementEdits(editsCount);
- incrementNanoTime(System.nanoTime() - startTime);
- } catch (IOException e) {
- e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
- LOG.error(HBaseMarkers.FATAL, "Got while writing log entry to log", e);
- throw e;
}
+ // Pass along summary statistics
+ incrementEdits(editsCount);
+ incrementNanoTime(System.nanoTime() - startTime);
+ }
+
+ private void logAndThrowWriterAppendFailure(WAL.Entry logEntry, IOException e)
+ throws IOException {
+ e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
+ final String errorMsg = "Failed to write log entry " + logEntry.toString() + " to log";
+ LOG.error(HBaseMarkers.FATAL, errorMsg, e);
+ updateStatusWithMsg(errorMsg);
+ throw e;
}
private void filterCellByStore(WAL.Entry logEntry) {
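The extracted logAndThrowWriterAppendFailure keeps the RemoteException handling from the old catch block. In isolation, the unwrapping idiom looks like the sketch below: Hadoop IPC wraps server-side exceptions in RemoteException, and unwrapping restores the original exception type for callers (the class and method names here are illustrative):

    import java.io.IOException;
    import org.apache.hadoop.ipc.RemoteException;

    final class Unwrap {
      // Return the server-side exception if e is a Hadoop RemoteException,
      // otherwise return e unchanged.
      static IOException unwrapIfRemote(IOException e) {
        return e instanceof RemoteException
            ? ((RemoteException) e).unwrapRemoteException()
            : e;
      }
    }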
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRecoveredEditsOutputSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRecoveredEditsOutputSink.java
index 0ed7c20..9532fd7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRecoveredEditsOutputSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRecoveredEditsOutputSink.java
@@ -86,7 +86,7 @@ class BoundedRecoveredEditsOutputSink extends AbstractRecoveredEditsOutputSink {
public List<Path> close() throws IOException {
boolean isSuccessful = true;
try {
- isSuccessful &= finishWriterThreads(false);
+ isSuccessful = finishWriterThreads(false);
} finally {
isSuccessful &= writeRemainingEntryBuffers();
}
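A note on the "&=" to "=" change here (the same cleanup recurs in the two close() methods below): isSuccessful is initialized to true, and true & x == x, so the first compound assignment was a plain assignment in disguise. The rewrite is behavior-preserving:

    boolean isSuccessful = true;
    // Before: isSuccessful &= finishWriterThreads(false);  equivalent, since true & x == x
    isSuccessful = finishWriterThreads(false);
    // The finally block still folds in the second result via &=.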
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRecoveredHFilesOutputSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRecoveredHFilesOutputSink.java
index 1d6fc4a..9b45821 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRecoveredHFilesOutputSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRecoveredHFilesOutputSink.java
@@ -131,7 +131,7 @@ public class BoundedRecoveredHFilesOutputSink extends OutputSink {
public List<Path> close() throws IOException {
boolean isSuccessful = true;
try {
- isSuccessful &= finishWriterThreads(false);
+ isSuccessful = finishWriterThreads(false);
} finally {
isSuccessful &= writeRemainingEntryBuffers();
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/OutputSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/OutputSink.java
index bdc7772..60a190a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/OutputSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/OutputSink.java
@@ -29,6 +29,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.yetus.audience.InterfaceAudience;
@@ -61,6 +62,8 @@ public abstract class OutputSink {
*/
protected final List<Path> splits = new ArrayList<>();
+ protected MonitoredTask status = null;
+
/**
* Used when close this output sink.
*/
@@ -81,6 +84,10 @@ public abstract class OutputSink {
this.reporter = reporter;
}
+ void setStatus(MonitoredTask status) {
+ this.status = status;
+ }
+
/**
* Start the threads that will pump data from the entryBuffers to the output files.
*/
@@ -135,7 +142,9 @@ public abstract class OutputSink {
}
}
controller.checkForErrors();
- LOG.info("{} split writer threads finished", this.writerThreads.size());
+ final String msg = this.writerThreads.size() + " split writer threads finished";
+ LOG.info(msg);
+ updateStatusWithMsg(msg);
return (!progressFailed);
}
@@ -150,6 +159,7 @@ public abstract class OutputSink {
/**
* @param buffer A buffer of some number of edits for a given region.
+ * @throws IOException For any IO errors
*/
protected abstract void append(EntryBuffers.RegionEntryBuffer buffer) throws IOException;
@@ -172,6 +182,16 @@ public abstract class OutputSink {
*/
protected abstract boolean keepRegionEvent(WAL.Entry entry);
+ /**
+ * Set status message in {@link MonitoredTask} instance that is set in this OutputSink
+ * @param msg message to update the status with
+ */
+ protected final void updateStatusWithMsg(String msg) {
+ if (status != null) {
+ status.setStatus(msg);
+ }
+ }
+
public static class WriterThread extends Thread {
private volatile boolean shouldStop = false;
private WALSplitter.PipelineController controller;
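OutputSink now owns the MonitoredTask plumbing: WALSplitter hands its task in through setStatus(...), and updateStatusWithMsg(...) is the single choke point the sinks call next to each log statement. The null guard makes the call a no-op when no task is attached, for example when a sink is exercised directly from a test. The usage pattern the rest of the patch follows, with an illustrative message and local:

    final String msg = "Closed recovered edits writer path=" + path;  // path is an illustrative local
    LOG.info(msg);
    updateStatusWithMsg(msg);  // mirrored into the task's status journal, if one is set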
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RecoveredEditsOutputSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RecoveredEditsOutputSink.java
index 798c716..2ac0cea 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RecoveredEditsOutputSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RecoveredEditsOutputSink.java
@@ -28,6 +28,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.MultipleIOException;
import org.apache.yetus.audience.InterfaceAudience;
@@ -74,7 +75,7 @@ class RecoveredEditsOutputSink extends AbstractRecoveredEditsOutputSink {
* @return null if this region shouldn't output any logs
*/
private RecoveredEditsWriter getRecoveredEditsWriter(TableName tableName, byte[] region,
- long seqId) throws IOException {
+ long seqId) throws IOException {
RecoveredEditsWriter ret = writers.get(Bytes.toString(region));
if (ret != null) {
return ret;
@@ -92,7 +93,7 @@ class RecoveredEditsOutputSink extends AbstractRecoveredEditsOutputSink {
public List<Path> close() throws IOException {
boolean isSuccessful = true;
try {
- isSuccessful &= finishWriterThreads(false);
+ isSuccessful = finishWriterThreads(false);
} finally {
isSuccessful &= closeWriters();
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
index ae0347f..d7fd21f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
@@ -269,6 +269,7 @@ public class WALSplitter {
status = TaskMonitor.get().createStatus(
"Splitting log file " + logfile.getPath() + "into a temporary staging area.");
+ status.enableStatusJournal(true);
Reader logFileReader = null;
this.fileBeingSplit = logfile;
long startTS = EnvironmentEdgeManager.currentTime();
@@ -276,7 +277,7 @@ public class WALSplitter {
long logLength = logfile.getLen();
LOG.info("Splitting WAL={}, size={} ({} bytes)", logPath, StringUtils.humanSize(logLength),
logLength);
- status.setStatus("Opening log file");
+ status.setStatus("Opening log file " + logPath);
if (reporter != null && !reporter.progress()) {
progressFailed = true;
return false;
@@ -291,6 +292,7 @@ public class WALSplitter {
int numOpenedFilesBeforeReporting = conf.getInt("hbase.splitlog.report.openedfiles", 3);
int numOpenedFilesLastCheck = 0;
outputSink.setReporter(reporter);
+ outputSink.setStatus(status);
outputSink.startWriterThreads();
outputSinkStarted = true;
Entry entry;
@@ -375,7 +377,9 @@ public class WALSplitter {
e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
throw e;
} finally {
- LOG.debug("Finishing writing output logs and closing down");
+ final String log = "Finishing writing output logs and closing down";
+ LOG.debug(log);
+ status.setStatus(log);
try {
if (null != logFileReader) {
logFileReader.close();
@@ -400,6 +404,10 @@ public class WALSplitter {
", corrupted=" + isCorrupted + ", progress failed=" + progressFailed;
LOG.info(msg);
status.markComplete(msg);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("WAL split completed for {} , Journal Log: {}", logPath,
+ status.prettyPrintJournal());
+ }
}
}
return !progressFailed;
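One detail worth noting in the final hunk: prettyPrintJournal() builds the entire journal into a String, so the explicit isDebugEnabled() check skips that work when debug logging is off; parameterized logging alone would not help, because the method call in the argument list is evaluated before the level check. The general form of the guard, with a hypothetical expensive helper:

    if (LOG.isDebugEnabled()) {
      // buildSummary() stands in for any costly argument computation.
      LOG.debug("split summary: {}", buildSummary());
    }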