You are viewing a plain text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain-text version.
Posted to commits@hbase.apache.org by vj...@apache.org on 2020/06/22 18:13:27 UTC

[hbase] branch branch-1 updated: HBASE-24380 : Provide WAL splitting journal logging (#1860) (#1939)

This is an automated email from the ASF dual-hosted git repository.

vjasani pushed a commit to branch branch-1
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-1 by this push:
     new d144923  HBASE-24380 : Provide WAL splitting journal logging (#1860) (#1939)
d144923 is described below

commit d1449231f06f670f90a230ec36807a3a76850a75
Author: Viraj Jasani <vj...@apache.org>
AuthorDate: Mon Jun 22 23:43:12 2020 +0530

    HBASE-24380 : Provide WAL splitting journal logging (#1860) (#1939)
    
    Signed-off-by: Andrew Purtell <ap...@apache.org>
---
 .../org/apache/hadoop/hbase/wal/WALSplitter.java   | 138 +++++++++++++++------
 1 file changed, 101 insertions(+), 37 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
index f769e6d..9273b6a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
@@ -312,13 +312,14 @@ public class WALSplitter {
     status =
         TaskMonitor.get().createStatus(
           "Splitting log file " + logfile.getPath() + "into a temporary staging area.");
+    status.enableStatusJournal(true);
     Reader in = null;
     this.fileBeingSplit = logfile;
     try {
       long logLength = logfile.getLen();
       LOG.info("Splitting wal: " + logPath + ", length=" + logLength);
       LOG.info("DistributedLogReplay = " + this.distributedLogReplay);
-      status.setStatus("Opening log file");
+      status.setStatus("Opening log file " + logPath);
       if (reporter != null && !reporter.progress()) {
         progress_failed = true;
         return false;
@@ -346,6 +347,7 @@ public class WALSplitter {
       int numOpenedFilesBeforeReporting = conf.getInt("hbase.splitlog.report.openedfiles", 3);
       int numOpenedFilesLastCheck = 0;
       outputSink.setReporter(reporter);
+      outputSink.setStatus(status);
       outputSink.startWriterThreads();
       outputSinkStarted = true;
       Entry entry;
@@ -436,7 +438,9 @@ public class WALSplitter {
       e = RemoteExceptionHandler.checkIOException(e);
       throw e;
     } finally {
-      LOG.debug("Finishing writing output logs and closing down.");
+      final String log = "Finishing writing output logs and closing down";
+      LOG.debug(log);
+      status.setStatus(log);
       try {
         if (null != in) {
           in.close();
@@ -460,6 +464,10 @@ public class WALSplitter {
                 ", corrupted=" + isCorrupted + ", progress failed=" + progress_failed;
         LOG.info(msg);
         status.markComplete(msg);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("WAL split completed for " + logPath + " , Journal Log: "
+            + status.prettyPrintJournal());
+        }
       }
     }
     return !progress_failed;
@@ -1210,6 +1218,8 @@ public class WALSplitter {
 
     protected List<Path> splits = null;
 
+    protected MonitoredTask status = null;
+
     public OutputSink(PipelineController controller, EntryBuffers entryBuffers, int numWriters) {
       numThreads = numWriters;
       this.controller = controller;
@@ -1262,6 +1272,10 @@ public class WALSplitter {
       return this.skippedEdits.get();
     }
 
+    void setStatus(MonitoredTask monitoredTask) {
+      status = monitoredTask; // a null task turns updateStatusWithMsg into a no-op
+    }
+
     /**
      * Wait for writer threads to dump all info to the sink
      * @return true when there is no error
@@ -1292,7 +1306,9 @@ public class WALSplitter {
         }
       }
       controller.checkForErrors();
-      LOG.info(this.writerThreads.size() + " split writers finished; closing...");
+      final String msg = this.writerThreads.size() + " split writer threads finished";
+      LOG.info(msg);
+      updateStatusWithMsg(msg);
       return (!progress_failed);
     }
 
@@ -1329,6 +1345,17 @@ public class WALSplitter {
      * @return Return true if this sink wants to accept this region-level WALEdit.
      */
     public abstract boolean keepRegionEvent(Entry entry);
+
+    /**
+     * Publishes {@code msg} as the status of the {@link MonitoredTask} attached to this sink.
+     * @param msg message to update the status with; ignored when no task has been attached
+     */
+    protected final void updateStatusWithMsg(String msg) {
+      if (status == null) {
+        return;
+      }
+      status.setStatus(msg);
+    }
   }
 
   /**
@@ -1386,19 +1413,29 @@ public class WALSplitter {
         }
       }
       if (wap.minLogSeqNum < dstMinLogSeqNum) {
-        LOG.warn("Found existing old edits file. It could be the result of a previous failed"
+        final String errorMsg =
+          "Found existing old edits file. It could be the result of a previous failed"
             + " split attempt or we have duplicated wal entries. Deleting " + dst + ", length="
-            + walFS.getFileStatus(dst).getLen());
+            + walFS.getFileStatus(dst).getLen();
+        LOG.warn(errorMsg);
+        updateStatusWithMsg(errorMsg);
         if (!walFS.delete(dst, false)) {
-          LOG.warn("Failed deleting of old " + dst);
+          final String msg = "Failed deleting of old " + dst;
+          LOG.warn(msg);
+          updateStatusWithMsg(msg);
           throw new IOException("Failed deleting of old " + dst);
         }
       } else {
-        LOG.warn("Found existing old edits file and we have less entries. Deleting " + wap.p
-            + ", length=" + walFS.getFileStatus(wap.p).getLen());
+        final String errorMsg =
+          "Found existing old edits file and we have less entries. Deleting " + wap.p + ", length="
+            + walFS.getFileStatus(wap.p).getLen();
+        LOG.warn(errorMsg);
+        updateStatusWithMsg(errorMsg);
         if (!walFS.delete(wap.p, false)) {
-          LOG.warn("Failed deleting of " + wap.p);
-          throw new IOException("Failed deleting of " + wap.p);
+          final String failureMsg = "Failed deleting of " + wap.p;
+          LOG.warn(failureMsg);
+          updateStatusWithMsg(failureMsg);
+          throw new IOException(failureMsg);
         }
       }
     }
@@ -1484,19 +1521,24 @@ public class WALSplitter {
       try {
         wap.w.close();
       } catch (IOException ioe) {
-        LOG.error("Couldn't close log at " + wap.p, ioe);
+        final String errorMsg = "Couldn't close log at " + wap.p;
+        LOG.error(errorMsg, ioe);
+        updateStatusWithMsg(errorMsg);
         thrown.add(ioe);
         return null;
       }
+      final String msg =
+        "Closed wap " + wap.p + " (wrote " + wap.editsWritten + " edits, skipped "
+          + wap.editsSkipped + " edits in " + (wap.nanosSpent / 1000 / 1000) + "ms";
       if (LOG.isDebugEnabled()) {
-        LOG.debug("Closed wap " + wap.p + " (wrote " + wap.editsWritten
-            + " edits, skipped " + wap.editsSkipped + " edits in "
-            + (wap.nanosSpent / 1000 / 1000) + "ms");
+        LOG.debug(msg);
       }
+      updateStatusWithMsg(msg);
       if (wap.editsWritten == 0) {
         // just remove the empty recovered.edits file
         if (walFS.exists(wap.p) && !walFS.delete(wap.p, false)) {
-          LOG.warn("Failed deleting empty " + wap.p);
+          final String errorMsg = "Failed deleting empty " + wap.p;
+          LOG.warn(errorMsg);
           throw new IOException("Failed deleting empty  " + wap.p);
         }
         return null;
@@ -1513,12 +1555,18 @@ public class WALSplitter {
         // TestHLogSplit#testThreading is an example.
         if (walFS.exists(wap.p)) {
           if (!walFS.rename(wap.p, dst)) {
-            throw new IOException("Failed renaming " + wap.p + " to " + dst);
+            final String errorMsg = "Failed renaming " + wap.p + " to " + dst;
+            updateStatusWithMsg(errorMsg);
+            throw new IOException(errorMsg);
           }
-          LOG.info("Rename " + wap.p + " to " + dst);
+          final String renameLog = "Rename " + wap.p + " to " + dst;
+          LOG.info(renameLog);
+          updateStatusWithMsg(renameLog);
         }
       } catch (IOException ioe) {
-        LOG.error("Couldn't rename " + wap.p + " to " + dst, ioe);
+        final String errorMsg = "Couldn't rename " + wap.p + " to " + dst;
+        LOG.error(errorMsg, ioe);
+        updateStatusWithMsg(errorMsg);
         thrown.add(ioe);
         return null;
       }
@@ -1555,13 +1603,17 @@ public class WALSplitter {
             wap = (WriterAndPath) tmpWAP;
             wap.w.close();
           } catch (IOException ioe) {
-            LOG.error("Couldn't close log at " + wap.p, ioe);
+            final String errorMsg = "Couldn't close log at " + wap.p;
+            LOG.error(errorMsg, ioe);
+            updateStatusWithMsg(errorMsg);
             thrown.add(ioe);
             continue;
           }
-          LOG.info(
-              "Closed log " + wap.p + " (wrote " + wap.editsWritten + " edits in " + (wap.nanosSpent
-                  / 1000 / 1000) + "ms)");
+          final String msg =
+            "Closed log " + wap.p + " (wrote " + wap.editsWritten + " edits in " + (wap.nanosSpent
+              / 1000 / 1000) + "ms)";
+          LOG.info(msg);
+          updateStatusWithMsg(msg);
         }
         writersClosed = true;
       }
@@ -1610,15 +1662,21 @@ public class WALSplitter {
         return null;
       }
       if (walFS.exists(regionedits)) {
-        LOG.warn("Found old edits file. It could be the "
-            + "result of a previous failed split attempt. Deleting " + regionedits + ", length="
-            + walFS.getFileStatus(regionedits).getLen());
+        final String warnMsg = "Found old edits file. It could be the "
+          + "result of a previous failed split attempt. Deleting " + regionedits + ", length="
+          + walFS.getFileStatus(regionedits).getLen();
+        LOG.warn(warnMsg);
+        updateStatusWithMsg(warnMsg);
         if (!walFS.delete(regionedits, false)) {
-          LOG.warn("Failed delete of old " + regionedits);
+          final String errorMsg = "Failed delete of old " + regionedits;
+          LOG.warn(errorMsg);
+          updateStatusWithMsg(errorMsg);
         }
       }
       Writer w = createWriter(regionedits);
-      LOG.debug("Creating writer path=" + regionedits);
+      final String msg = "Creating writer path=" + regionedits;
+      LOG.debug(msg);
+      updateStatusWithMsg(msg);
       return new WriterAndPath(regionedits, w, entry.getKey().getLogSeqNum());
     }
 
@@ -1666,10 +1724,10 @@ public class WALSplitter {
       WriterAndPath wap = null;
 
       long startTime = System.nanoTime();
-      try {
-        int editsCount = 0;
+      int editsCount = 0;
 
-        for (Entry logEntry : entries) {
+      for (Entry logEntry : entries) {
+        try {
           if (wap == null) {
             wap = getWriterAndPath(logEntry, reusable);
             if (wap == null) {
@@ -1687,18 +1745,24 @@ public class WALSplitter {
           } else {
             wap.incrementSkippedEdits(1);
           }
+        } catch (IOException e) {
+          logAndThrowWriterAppendFailure(logEntry, e);
         }
-        // Pass along summary statistics
-        wap.incrementEdits(editsCount);
-        wap.incrementNanoTime(System.nanoTime() - startTime);
-      } catch (IOException e) {
-        e = RemoteExceptionHandler.checkIOException(e);
-        LOG.fatal(" Got while writing log entry to log", e);
-        throw e;
       }
+      // Pass along summary statistics
+      wap.incrementEdits(editsCount);
+      wap.incrementNanoTime(System.nanoTime() - startTime);
       return wap;
     }
 
+    private void logAndThrowWriterAppendFailure(Entry logEntry, IOException e) throws IOException {
+      final IOException toThrow = RemoteExceptionHandler.checkIOException(e); // rethrown below
+      final String errorMsg = "Failed to write log entry " + logEntry.toString() + " to log";
+      LOG.fatal(errorMsg, toThrow);
+      updateStatusWithMsg(errorMsg); // mirror the failure into the task's status
+      throw toThrow;
+    }
+
     @Override
     public boolean keepRegionEvent(Entry entry) {
       ArrayList<Cell> cells = entry.getEdit().getCells();