Posted to commits@hbase.apache.org by ap...@apache.org on 2020/06/19 23:33:53 UTC

[hbase] branch branch-2 updated: HBASE-24380 : Provide WAL splitting journal logging (#1860)

This is an automated email from the ASF dual-hosted git repository.

apurtell pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new 48fda91  HBASE-24380 : Provide WAL splitting journal logging (#1860)
48fda91 is described below

commit 48fda91c5b45c394455c8fd1d12f4e740fb50404
Author: Viraj Jasani <vj...@apache.org>
AuthorDate: Sat Jun 20 04:55:03 2020 +0530

    HBASE-24380 : Provide WAL splitting journal logging (#1860)
    
    Signed-off-by: Andrew Purtell <ap...@apache.org>
    
    Conflicts:
    	hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRecoveredEditsOutputSink.java
    	hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRecoveredHFilesOutputSink.java
    	hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RecoveredEditsOutputSink.java
---
 .../wal/AbstractRecoveredEditsOutputSink.java      | 82 ++++++++++++++--------
 .../hbase/wal/BoundedRecoveredEditsOutputSink.java |  2 +-
 .../wal/BoundedRecoveredHFilesOutputSink.java      |  2 +-
 .../org/apache/hadoop/hbase/wal/OutputSink.java    | 22 +++++-
 .../hadoop/hbase/wal/RecoveredEditsOutputSink.java |  5 +-
 .../org/apache/hadoop/hbase/wal/WALSplitter.java   | 12 +++-
 6 files changed, 88 insertions(+), 37 deletions(-)
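
In brief: the commit threads the split task's MonitoredTask into the WAL output sinks, so every progress message already written to the server log is also recorded in the task's status journal. A minimal sketch of the pattern, following the setStatus/updateStatusWithMsg shape in the diff below (the SinkSketch class and onWriterClosed method are illustrative names, not actual HBase code):

import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Illustrative sketch of the log-and-journal pattern this commit applies
// across the output sinks; class and method names here are hypothetical.
class SinkSketch {
  private static final Logger LOG = LoggerFactory.getLogger(SinkSketch.class);

  // Injected by the splitter via setStatus(); may be null (e.g. in tests).
  private MonitoredTask status = null;

  void setStatus(MonitoredTask status) {
    this.status = status;
  }

  // Mirror a log message into the MonitoredTask journal, if one is set.
  private void updateStatusWithMsg(String msg) {
    if (status != null) {
      status.setStatus(msg);
    }
  }

  // Hypothetical call site: build the message once, then both log it
  // and journal it, exactly as the diff does at each LOG call.
  void onWriterClosed(String path, long editsWritten) {
    final String msg = "Closed recovered edits writer path=" + path
        + " (wrote " + editsWritten + " edits)";
    LOG.info(msg);
    updateStatusWithMsg(msg);
  }
}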

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractRecoveredEditsOutputSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractRecoveredEditsOutputSink.java
index da952eb..0da082a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractRecoveredEditsOutputSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractRecoveredEditsOutputSink.java
@@ -57,7 +57,7 @@ abstract class AbstractRecoveredEditsOutputSink extends OutputSink {
    * @return a writer that wraps a {@link WALProvider.Writer} and its Path. Caller should close.
    */
   protected RecoveredEditsWriter createRecoveredEditsWriter(TableName tableName, byte[] region,
-    long seqId) throws IOException {
+      long seqId) throws IOException {
     Path regionEditsPath = getRegionSplitEditsPath(tableName, region, seqId,
       walSplitter.getFileBeingSplit().getPath().getName(), walSplitter.getTmpDirName(),
       walSplitter.conf);
@@ -70,27 +70,35 @@ abstract class AbstractRecoveredEditsOutputSink extends OutputSink {
       }
     }
     WALProvider.Writer w = walSplitter.createWriter(regionEditsPath);
-    LOG.info("Creating recovered edits writer path={}", regionEditsPath);
+    final String msg = "Creating recovered edits writer path=" + regionEditsPath;
+    LOG.info(msg);
+    updateStatusWithMsg(msg);
     return new RecoveredEditsWriter(region, regionEditsPath, w, seqId);
   }
 
   protected Path closeRecoveredEditsWriter(RecoveredEditsWriter editsWriter,
-    List<IOException> thrown) throws IOException {
+      List<IOException> thrown) throws IOException {
     try {
       editsWriter.writer.close();
     } catch (IOException ioe) {
-      LOG.error("Could not close recovered edits at {}", editsWriter.path, ioe);
+      final String errorMsg = "Could not close recovered edits at " + editsWriter.path;
+      LOG.error(errorMsg, ioe);
+      updateStatusWithMsg(errorMsg);
       thrown.add(ioe);
       return null;
     }
-    LOG.info("Closed recovered edits writer path={} (wrote {} edits, skipped {} edits in {} ms",
-      editsWriter.path, editsWriter.editsWritten, editsWriter.editsSkipped,
-      editsWriter.nanosSpent / 1000 / 1000);
+    final String msg = "Closed recovered edits writer path=" + editsWriter.path + " (wrote "
+      + editsWriter.editsWritten + " edits, skipped " + editsWriter.editsSkipped + " edits in " + (
+      editsWriter.nanosSpent / 1000 / 1000) + " ms)";
+    LOG.info(msg);
+    updateStatusWithMsg(msg);
     if (editsWriter.editsWritten == 0) {
       // just remove the empty recovered.edits file
-      if (walSplitter.walFS.exists(editsWriter.path) &&
-        !walSplitter.walFS.delete(editsWriter.path, false)) {
-        LOG.warn("Failed deleting empty {}", editsWriter.path);
+      if (walSplitter.walFS.exists(editsWriter.path)
+          && !walSplitter.walFS.delete(editsWriter.path, false)) {
+        final String errorMsg = "Failed deleting empty " + editsWriter.path;
+        LOG.warn(errorMsg);
+        updateStatusWithMsg(errorMsg);
         throw new IOException("Failed deleting empty  " + editsWriter.path);
       }
       return null;
@@ -107,13 +115,20 @@ abstract class AbstractRecoveredEditsOutputSink extends OutputSink {
       // TestHLogSplit#testThreading is an example.
       if (walSplitter.walFS.exists(editsWriter.path)) {
         if (!walSplitter.walFS.rename(editsWriter.path, dst)) {
-          throw new IOException(
-            "Failed renaming recovered edits " + editsWriter.path + " to " + dst);
+          final String errorMsg =
+            "Failed renaming recovered edits " + editsWriter.path + " to " + dst;
+          updateStatusWithMsg(errorMsg);
+          throw new IOException(errorMsg);
         }
-        LOG.info("Rename recovered edits {} to {}", editsWriter.path, dst);
+        final String renameEditMsg = "Rename recovered edits " + editsWriter.path + " to " + dst;
+        LOG.info(renameEditMsg);
+        updateStatusWithMsg(renameEditMsg);
       }
     } catch (IOException ioe) {
-      LOG.error("Could not rename recovered edits {} to {}", editsWriter.path, dst, ioe);
+      final String errorMsg = "Could not rename recovered edits " + editsWriter.path
+        + " to " + dst;
+      LOG.error(errorMsg, ioe);
+      updateStatusWithMsg(errorMsg);
       thrown.add(ioe);
       return null;
     }
@@ -216,26 +231,33 @@ abstract class AbstractRecoveredEditsOutputSink extends OutputSink {
 
     void writeRegionEntries(List<WAL.Entry> entries) throws IOException {
       long startTime = System.nanoTime();
-      try {
-        int editsCount = 0;
-        for (WAL.Entry logEntry : entries) {
-          filterCellByStore(logEntry);
-          if (!logEntry.getEdit().isEmpty()) {
+      int editsCount = 0;
+      for (WAL.Entry logEntry : entries) {
+        filterCellByStore(logEntry);
+        if (!logEntry.getEdit().isEmpty()) {
+          try {
             writer.append(logEntry);
-            updateRegionMaximumEditLogSeqNum(logEntry);
-            editsCount++;
-          } else {
-            incrementSkippedEdits(1);
+          } catch (IOException e) {
+            logAndThrowWriterAppendFailure(logEntry, e);
           }
+          updateRegionMaximumEditLogSeqNum(logEntry);
+          editsCount++;
+        } else {
+          incrementSkippedEdits(1);
         }
-        // Pass along summary statistics
-        incrementEdits(editsCount);
-        incrementNanoTime(System.nanoTime() - startTime);
-      } catch (IOException e) {
-        e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
-        LOG.error(HBaseMarkers.FATAL, "Got while writing log entry to log", e);
-        throw e;
       }
+      // Pass along summary statistics
+      incrementEdits(editsCount);
+      incrementNanoTime(System.nanoTime() - startTime);
+    }
+
+    private void logAndThrowWriterAppendFailure(WAL.Entry logEntry, IOException e)
+        throws IOException {
+      e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
+      final String errorMsg = "Failed to write log entry " + logEntry.toString() + " to log";
+      LOG.error(HBaseMarkers.FATAL, errorMsg, e);
+      updateStatusWithMsg(errorMsg);
+      throw e;
     }
 
     private void filterCellByStore(WAL.Entry logEntry) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRecoveredEditsOutputSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRecoveredEditsOutputSink.java
index 0ed7c20..9532fd7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRecoveredEditsOutputSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRecoveredEditsOutputSink.java
@@ -86,7 +86,7 @@ class BoundedRecoveredEditsOutputSink extends AbstractRecoveredEditsOutputSink {
   public List<Path> close() throws IOException {
     boolean isSuccessful = true;
     try {
-      isSuccessful &= finishWriterThreads(false);
+      isSuccessful = finishWriterThreads(false);
     } finally {
       isSuccessful &= writeRemainingEntryBuffers();
     }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRecoveredHFilesOutputSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRecoveredHFilesOutputSink.java
index 1d6fc4a..9b45821 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRecoveredHFilesOutputSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRecoveredHFilesOutputSink.java
@@ -131,7 +131,7 @@ public class BoundedRecoveredHFilesOutputSink extends OutputSink {
   public List<Path> close() throws IOException {
     boolean isSuccessful = true;
     try {
-      isSuccessful &= finishWriterThreads(false);
+      isSuccessful = finishWriterThreads(false);
     } finally {
       isSuccessful &= writeRemainingEntryBuffers();
     }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/OutputSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/OutputSink.java
index bdc7772..60a190a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/OutputSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/OutputSink.java
@@ -29,6 +29,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -61,6 +62,8 @@ public abstract class OutputSink {
    */
   protected final List<Path> splits = new ArrayList<>();
 
+  protected MonitoredTask status = null;
+
   /**
    * Used when close this output sink.
    */
@@ -81,6 +84,10 @@ public abstract class OutputSink {
     this.reporter = reporter;
   }
 
+  void setStatus(MonitoredTask status) {
+    this.status = status;
+  }
+
   /**
    * Start the threads that will pump data from the entryBuffers to the output files.
    */
@@ -135,7 +142,9 @@ public abstract class OutputSink {
       }
     }
     controller.checkForErrors();
-    LOG.info("{} split writer threads finished", this.writerThreads.size());
+    final String msg = this.writerThreads.size() + " split writer threads finished";
+    LOG.info(msg);
+    updateStatusWithMsg(msg);
     return (!progressFailed);
   }
 
@@ -150,6 +159,7 @@ public abstract class OutputSink {
 
   /**
    * @param buffer A buffer of some number of edits for a given region.
+   * @throws IOException For any IO errors
    */
   protected abstract void append(EntryBuffers.RegionEntryBuffer buffer) throws IOException;
 
@@ -172,6 +182,16 @@ public abstract class OutputSink {
    */
   protected abstract boolean keepRegionEvent(WAL.Entry entry);
 
+  /**
+   * Set status message in {@link MonitoredTask} instance that is set in this OutputSink
+   * @param msg message to update the status with
+   */
+  protected final void updateStatusWithMsg(String msg) {
+    if (status != null) {
+      status.setStatus(msg);
+    }
+  }
+
   public static class WriterThread extends Thread {
     private volatile boolean shouldStop = false;
     private WALSplitter.PipelineController controller;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RecoveredEditsOutputSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RecoveredEditsOutputSink.java
index 798c716..2ac0cea 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RecoveredEditsOutputSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RecoveredEditsOutputSink.java
@@ -28,6 +28,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.MultipleIOException;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -74,7 +75,7 @@ class RecoveredEditsOutputSink extends AbstractRecoveredEditsOutputSink {
    * @return null if this region shouldn't output any logs
    */
   private RecoveredEditsWriter getRecoveredEditsWriter(TableName tableName, byte[] region,
-    long seqId) throws IOException {
+      long seqId) throws IOException {
     RecoveredEditsWriter ret = writers.get(Bytes.toString(region));
     if (ret != null) {
       return ret;
@@ -92,7 +93,7 @@ class RecoveredEditsOutputSink extends AbstractRecoveredEditsOutputSink {
   public List<Path> close() throws IOException {
     boolean isSuccessful = true;
     try {
-      isSuccessful &= finishWriterThreads(false);
+      isSuccessful = finishWriterThreads(false);
     } finally {
       isSuccessful &= closeWriters();
     }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
index ae0347f..d7fd21f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
@@ -269,6 +269,7 @@ public class WALSplitter {
 
     status = TaskMonitor.get().createStatus(
           "Splitting log file " + logfile.getPath() + "into a temporary staging area.");
+    status.enableStatusJournal(true);
     Reader logFileReader = null;
     this.fileBeingSplit = logfile;
     long startTS = EnvironmentEdgeManager.currentTime();
@@ -276,7 +277,7 @@ public class WALSplitter {
       long logLength = logfile.getLen();
       LOG.info("Splitting WAL={}, size={} ({} bytes)", logPath, StringUtils.humanSize(logLength),
           logLength);
-      status.setStatus("Opening log file");
+      status.setStatus("Opening log file " + logPath);
       if (reporter != null && !reporter.progress()) {
         progressFailed = true;
         return false;
@@ -291,6 +292,7 @@ public class WALSplitter {
       int numOpenedFilesBeforeReporting = conf.getInt("hbase.splitlog.report.openedfiles", 3);
       int numOpenedFilesLastCheck = 0;
       outputSink.setReporter(reporter);
+      outputSink.setStatus(status);
       outputSink.startWriterThreads();
       outputSinkStarted = true;
       Entry entry;
@@ -375,7 +377,9 @@ public class WALSplitter {
       e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
       throw e;
     } finally {
-      LOG.debug("Finishing writing output logs and closing down");
+      final String log = "Finishing writing output logs and closing down";
+      LOG.debug(log);
+      status.setStatus(log);
       try {
         if (null != logFileReader) {
           logFileReader.close();
@@ -400,6 +404,10 @@ public class WALSplitter {
             ", corrupted=" + isCorrupted + ", progress failed=" + progressFailed;
         LOG.info(msg);
         status.markComplete(msg);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("WAL split completed for {} , Journal Log: {}", logPath,
+            status.prettyPrintJournal());
+        }
       }
     }
     return !progressFailed;
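
The net effect, visible in the WALSplitter hunks above: the MonitoredTask created for each split now keeps a journal (enableStatusJournal(true)), each phase is journaled via setStatus(), and the accumulated journal is printed in one debug line once the split completes. A minimal sketch of that lifecycle, restricted to the MonitoredTask calls that appear in the diff; the path and messages below are placeholders:

import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;

// Sketch of the status-journal lifecycle this commit adds around a split.
public class JournalSketch {
  public static void main(String[] args) {
    MonitoredTask status = TaskMonitor.get()
        .createStatus("Splitting log file <wal-path> into a temporary staging area.");
    // Record subsequent setStatus() calls in the task's journal.
    status.enableStatusJournal(true);
    status.setStatus("Opening log file <wal-path>");
    // ... split work; the output sinks journal their progress the same way ...
    status.setStatus("<k> split writer threads finished");
    status.markComplete("Processed <n> edits, progress failed=false");
    // Replay the whole journal in one shot, as WALSplitter now does at
    // debug level after the split completes.
    System.out.println(status.prettyPrintJournal());
  }
}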