You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2018/01/04 08:19:36 UTC

[02/38] hbase git commit: HBASE-19613 Miscellaneous changes to WALSplitter.

HBASE-19613 Miscellaneous changes to WALSplitter.

* Use ArrayList instead of LinkedList
* Use Apache Commons where appropriate
* Parameterize and improve logging


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/30106256
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/30106256
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/30106256

Branch: refs/heads/HBASE-19397
Commit: 301062566ac6e32d5bc3c6dbfd819b5e62742e8c
Parents: 6e136f2
Author: BELUGA BEHR <da...@gmail.com>
Authored: Wed Jan 3 18:29:09 2018 -0800
Committer: Apekshit Sharma <ap...@apache.org>
Committed: Wed Jan 3 18:30:10 2018 -0800

----------------------------------------------------------------------
 .../apache/hadoop/hbase/wal/WALSplitter.java    | 163 +++++++++----------
 1 file changed, 75 insertions(+), 88 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/30106256/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
index 328390e..2aad203 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
@@ -24,9 +24,9 @@ import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.text.ParseException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableSet;
@@ -48,6 +48,9 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang3.ArrayUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.FileStatus;
@@ -86,14 +89,14 @@ import org.apache.hadoop.hbase.wal.WALProvider.Writer;
 import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
 import org.apache.hadoop.io.MultipleIOException;
 import org.apache.hadoop.ipc.RemoteException;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
 import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
 import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
@@ -203,7 +206,7 @@ public class WALSplitter {
     final FileStatus[] logfiles = SplitLogManager.getFileList(conf,
         Collections.singletonList(logDir), null);
     List<Path> splits = new ArrayList<>();
-    if (logfiles != null && logfiles.length > 0) {
+    if (ArrayUtils.isNotEmpty(logfiles)) {
       for (FileStatus logfile: logfiles) {
         WALSplitter s = new WALSplitter(factory, conf, rootDir, fs, null, null);
         if (s.splitLogFile(logfile, null)) {
@@ -245,7 +248,7 @@ public class WALSplitter {
     this.fileBeingSplit = logfile;
     try {
       long logLength = logfile.getLen();
-      LOG.info("Splitting WAL=" + logPath + ", length=" + logLength);
+      LOG.info("Splitting WAL={}, length={}", logPath, logLength);
       status.setStatus("Opening log file");
       if (reporter != null && !reporter.progress()) {
         progress_failed = true;
@@ -253,7 +256,7 @@ public class WALSplitter {
       }
       logFileReader = getReader(logfile, skipErrors, reporter);
       if (logFileReader == null) {
-        LOG.warn("Nothing to split in WAL=" + logPath);
+        LOG.warn("Nothing to split in WAL={}", logPath);
         return true;
       }
       int numOpenedFilesBeforeReporting = conf.getInt("hbase.splitlog.report.openedfiles", 3);
@@ -317,7 +320,7 @@ public class WALSplitter {
       iie.initCause(ie);
       throw iie;
     } catch (CorruptedLogFileException e) {
-      LOG.warn("Could not parse, corrupted WAL=" + logPath, e);
+      LOG.warn("Could not parse, corrupted WAL={}", logPath, e);
       if (splitLogWorkerCoordination != null) {
         // Some tests pass in a csm of null.
         splitLogWorkerCoordination.markCorrupted(rootDir, logfile.getPath().getName(), fs);
@@ -330,14 +333,13 @@ public class WALSplitter {
       e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
       throw e;
     } finally {
-      LOG.debug("Finishing writing output logs and closing down.");
+      LOG.debug("Finishing writing output logs and closing down");
       try {
         if (null != logFileReader) {
           logFileReader.close();
         }
       } catch (IOException exception) {
-        LOG.warn("Could not close WAL reader: " + exception.getMessage());
-        LOG.debug("exception details", exception);
+        LOG.warn("Could not close WAL reader", exception);
       }
       try {
         if (outputSinkStarted) {
@@ -417,11 +419,11 @@ public class WALSplitter {
       final FileSystem fs, final Configuration conf) throws IOException {
     final Path corruptDir = new Path(FSUtils.getWALRootDir(conf), HConstants.CORRUPT_DIR_NAME);
     if (conf.get("hbase.regionserver.hlog.splitlog.corrupt.dir") != null) {
-      LOG.warn("hbase.regionserver.hlog.splitlog.corrupt.dir is deprecated. Default to "
-          + corruptDir);
+      LOG.warn("hbase.regionserver.hlog.splitlog.corrupt.dir is deprecated. Default to {}",
+          corruptDir);
     }
     if (!fs.mkdirs(corruptDir)) {
-      LOG.info("Unable to mkdir " + corruptDir);
+      LOG.info("Unable to mkdir {}", corruptDir);
     }
     fs.mkdirs(oldLogDir);
 
@@ -431,9 +433,9 @@ public class WALSplitter {
       Path p = new Path(corruptDir, corrupted.getName());
       if (fs.exists(corrupted)) {
         if (!fs.rename(corrupted, p)) {
-          LOG.warn("Unable to move corrupted log " + corrupted + " to " + p);
+          LOG.warn("Unable to move corrupted log {} to {}", corrupted, p);
         } else {
-          LOG.warn("Moved corrupted log " + corrupted + " to " + p);
+          LOG.warn("Moved corrupted log {} to {}", corrupted, p);
         }
       }
     }
@@ -442,9 +444,9 @@ public class WALSplitter {
       Path newPath = AbstractFSWAL.getWALArchivePath(oldLogDir, p);
       if (fs.exists(p)) {
         if (!FSUtils.renameAndSetModifyTime(fs, p, newPath)) {
-          LOG.warn("Unable to move  " + p + " to " + newPath);
+          LOG.warn("Unable to move {} to {}", p, newPath);
         } else {
-          LOG.info("Archived processed log " + p + " to " + newPath);
+          LOG.info("Archived processed log {} to {}", p, newPath);
         }
       }
     }
@@ -474,9 +476,9 @@ public class WALSplitter {
     Path dir = getRegionDirRecoveredEditsDir(regiondir);
 
     if (!fs.exists(regiondir)) {
-      LOG.info("This region's directory doesn't exist: "
-          + regiondir.toString() + ". It is very likely that it was" +
-          " already split so it's safe to discard those edits.");
+      LOG.info("This region's directory does not exist: {}."
+          + "It is very likely that it was already split so it is "
+          + "safe to discard those edits.", regiondir);
       return null;
     }
     if (fs.exists(dir) && fs.isFile(dir)) {
@@ -486,16 +488,16 @@ public class WALSplitter {
       }
       tmp = new Path(tmp,
         HConstants.RECOVERED_EDITS_DIR + "_" + encodedRegionName);
-      LOG.warn("Found existing old file: " + dir + ". It could be some "
+      LOG.warn("Found existing old file: {}. It could be some "
         + "leftover of an old installation. It should be a folder instead. "
-        + "So moving it to " + tmp);
+        + "So moving it to {}", dir, tmp);
       if (!fs.rename(dir, tmp)) {
-        LOG.warn("Failed to sideline old file " + dir);
+        LOG.warn("Failed to sideline old file {}", dir);
       }
     }
 
     if (!fs.exists(dir) && !fs.mkdirs(dir)) {
-      LOG.warn("mkdir failed on " + dir);
+      LOG.warn("mkdir failed on {}", dir);
     }
     // Append fileBeingSplit to prevent name conflict since we may have duplicate wal entries now.
     // Append file name ends with RECOVERED_LOG_TMPFILE_SUFFIX to ensure
@@ -554,8 +556,9 @@ public class WALSplitter {
       final Path regiondir) throws IOException {
     NavigableSet<Path> filesSorted = new TreeSet<>();
     Path editsdir = getRegionDirRecoveredEditsDir(regiondir);
-    if (!fs.exists(editsdir))
+    if (!fs.exists(editsdir)) {
       return filesSorted;
+    }
     FileStatus[] files = FSUtils.listStatus(fs, editsdir, new PathFilter() {
       @Override
       public boolean accept(Path p) {
@@ -577,16 +580,13 @@ public class WALSplitter {
             result = false;
           }
         } catch (IOException e) {
-          LOG.warn("Failed isFile check on " + p);
+          LOG.warn("Failed isFile check on {}", p, e);
         }
         return result;
       }
     });
-    if (files == null) {
-      return filesSorted;
-    }
-    for (FileStatus status : files) {
-      filesSorted.add(status.getPath());
+    if (ArrayUtils.isNotEmpty(files)) {
+      Arrays.asList(files).forEach(status -> filesSorted.add(status.getPath()));
     }
     return filesSorted;
   }
@@ -605,7 +605,7 @@ public class WALSplitter {
     Path moveAsideName = new Path(edits.getParent(), edits.getName() + "."
         + System.currentTimeMillis());
     if (!fs.rename(edits, moveAsideName)) {
-      LOG.warn("Rename failed from " + edits + " to " + moveAsideName);
+      LOG.warn("Rename failed from {} to {}", edits, moveAsideName);
     }
     return moveAsideName;
   }
@@ -655,7 +655,7 @@ public class WALSplitter {
                 - SEQUENCE_ID_FILE_SUFFIX_LENGTH));
             maxSeqId = Math.max(tmpSeqId, maxSeqId);
           } catch (NumberFormatException ex) {
-            LOG.warn("Invalid SeqId File Name=" + fileName);
+            LOG.warn("Invalid SeqId File Name={}", fileName);
           }
         }
       }
@@ -672,10 +672,8 @@ public class WALSplitter {
         if (!fs.createNewFile(newSeqIdFile) && !fs.exists(newSeqIdFile)) {
           throw new IOException("Failed to create SeqId file:" + newSeqIdFile);
         }
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Wrote file=" + newSeqIdFile + ", newSeqId=" + newSeqId
-              + ", maxSeqId=" + maxSeqId);
-        }
+        LOG.debug("Wrote file={}, newSeqId={}, maxSeqId={}", newSeqIdFile,
+            newSeqId, maxSeqId);
       } catch (FileAlreadyExistsException ignored) {
         // latest hdfs throws this exception. it's all right if newSeqIdFile already exists
       }
@@ -683,10 +681,9 @@ public class WALSplitter {
     // remove old ones
     if (files != null) {
       for (FileStatus status : files) {
-        if (newSeqIdFile.equals(status.getPath())) {
-          continue;
+        if (!newSeqIdFile.equals(status.getPath())) {
+          fs.delete(status.getPath(), false);
         }
-        fs.delete(status.getPath(), false);
       }
     }
     return newSeqId;
@@ -710,7 +707,7 @@ public class WALSplitter {
     // zero length even if the file has been sync'd. Revisit if HDFS-376 or
     // HDFS-878 is committed.
     if (length <= 0) {
-      LOG.warn("File " + path + " might be still open, length is 0");
+      LOG.warn("File {} might be still open, length is 0", path);
     }
 
     try {
@@ -724,17 +721,15 @@ public class WALSplitter {
           // ignore if this is the last log in sequence.
           // TODO is this scenario still possible if the log has been
           // recovered (i.e. closed)
-          LOG.warn("Could not open " + path + " for reading. File is empty", e);
-          return null;
-        } else {
-          // EOFException being ignored
-          return null;
+          LOG.warn("Could not open {} for reading. File is empty", path, e);
         }
+        // EOFException being ignored
+        return null;
       }
     } catch (IOException e) {
       if (e instanceof FileNotFoundException) {
         // A wal file may not exist anymore. Nothing can be recovered so move on
-        LOG.warn("File " + path + " doesn't exist anymore.", e);
+        LOG.warn("File {} does not exist anymore", path, e);
         return null;
       }
       if (!skipErrors || e instanceof InterruptedIOException) {
@@ -755,7 +750,7 @@ public class WALSplitter {
       return in.next();
     } catch (EOFException eof) {
       // truncated files are expected if a RS crashes (see HBASE-2643)
-      LOG.info("EOF from wal " + path + ".  continuing");
+      LOG.info("EOF from wal {}. Continuing.", path);
       return null;
     } catch (IOException e) {
       // If the IOE resulted from bad file format,
@@ -763,8 +758,7 @@ public class WALSplitter {
       if (e.getCause() != null &&
           (e.getCause() instanceof ParseException ||
            e.getCause() instanceof org.apache.hadoop.fs.ChecksumException)) {
-        LOG.warn("Parse exception " + e.getCause().toString() + " from wal "
-           + path + ".  continuing");
+        LOG.warn("Parse exception from wal {}. Continuing", path, e);
         return null;
       }
       if (!skipErrors) {
@@ -893,8 +887,7 @@ public class WALSplitter {
       synchronized (controller.dataAvailable) {
         totalBuffered += incrHeap;
         while (totalBuffered > maxHeapUsage && controller.thrown.get() == null) {
-          LOG.debug("Used " + totalBuffered +
-              " bytes of buffered edits, waiting for IO threads...");
+          LOG.debug("Used {} bytes of buffered edits, waiting for IO threads", totalBuffered);
           controller.dataAvailable.wait(2000);
         }
         controller.dataAvailable.notifyAll();
@@ -980,7 +973,7 @@ public class WALSplitter {
     RegionEntryBuffer(TableName tableName, byte[] region) {
       this.tableName = tableName;
       this.encodedRegionName = region;
-      this.entryBuffer = new LinkedList<>();
+      this.entryBuffer = new ArrayList<>();
     }
 
     long appendEntry(Entry entry) {
@@ -1041,7 +1034,7 @@ public class WALSplitter {
     }
 
     private void doRun() throws IOException {
-      if (LOG.isTraceEnabled()) LOG.trace("Writer thread starting");
+      LOG.trace("Writer thread starting");
       while (true) {
         RegionEntryBuffer buffer = entryBuffers.getChunkToWrite();
         if (buffer == null) {
@@ -1190,7 +1183,7 @@ public class WALSplitter {
         }
       }
       controller.checkForErrors();
-      LOG.info(this.writerThreads.size() + " split writers finished; closing...");
+      LOG.info("{} split writers finished; closing.", this.writerThreads.size());
       return (!progress_failed);
     }
 
@@ -1257,7 +1250,7 @@ public class WALSplitter {
       } finally {
         result = close();
         List<IOException> thrown = closeLogWriters(null);
-        if (thrown != null && !thrown.isEmpty()) {
+        if (CollectionUtils.isNotEmpty(thrown)) {
           throw MultipleIOException.createIOException(thrown);
         }
       }
@@ -1276,25 +1269,22 @@ public class WALSplitter {
           dstMinLogSeqNum = entry.getKey().getSequenceId();
         }
       } catch (EOFException e) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(
-            "Got EOF when reading first WAL entry from " + dst + ", an empty or broken WAL file?",
-            e);
-        }
+        LOG.debug("Got EOF when reading first WAL entry from {}, an empty or broken WAL file?",
+            dst, e);
       }
       if (wap.minLogSeqNum < dstMinLogSeqNum) {
         LOG.warn("Found existing old edits file. It could be the result of a previous failed"
             + " split attempt or we have duplicated wal entries. Deleting " + dst + ", length="
             + fs.getFileStatus(dst).getLen());
         if (!fs.delete(dst, false)) {
-          LOG.warn("Failed deleting of old " + dst);
+          LOG.warn("Failed deleting of old {}", dst);
           throw new IOException("Failed deleting of old " + dst);
         }
       } else {
         LOG.warn("Found existing old edits file and we have less entries. Deleting " + wap.p
             + ", length=" + fs.getFileStatus(wap.p).getLen());
         if (!fs.delete(wap.p, false)) {
-          LOG.warn("Failed deleting of " + wap.p);
+          LOG.warn("Failed deleting of {}", wap.p);
           throw new IOException("Failed deleting of " + wap.p);
         }
       }
@@ -1377,13 +1367,11 @@ public class WALSplitter {
 
     Path closeWriter(String encodedRegionName, WriterAndPath wap,
         List<IOException> thrown) throws IOException{
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("Closing " + wap.p);
-      }
+      LOG.trace("Closing {}", wap.p);
       try {
         wap.w.close();
       } catch (IOException ioe) {
-        LOG.error("Couldn't close log at " + wap.p, ioe);
+        LOG.error("Could not close log at {}", wap.p, ioe);
         thrown.add(ioe);
         return null;
       }
@@ -1395,7 +1383,7 @@ public class WALSplitter {
       if (wap.editsWritten == 0) {
         // just remove the empty recovered.edits file
         if (fs.exists(wap.p) && !fs.delete(wap.p, false)) {
-          LOG.warn("Failed deleting empty " + wap.p);
+          LOG.warn("Failed deleting empty {}", wap.p);
           throw new IOException("Failed deleting empty  " + wap.p);
         }
         return null;
@@ -1414,10 +1402,10 @@ public class WALSplitter {
           if (!fs.rename(wap.p, dst)) {
             throw new IOException("Failed renaming " + wap.p + " to " + dst);
           }
-          LOG.info("Rename " + wap.p + " to " + dst);
+          LOG.info("Rename {} to {}", wap.p, dst);
         }
       } catch (IOException ioe) {
-        LOG.error("Couldn't rename " + wap.p + " to " + dst, ioe);
+        LOG.error("Could not rename {} to {}", wap.p, dst, ioe);
         thrown.add(ioe);
         return null;
       }
@@ -1428,7 +1416,6 @@ public class WALSplitter {
       if (writersClosed) {
         return thrown;
       }
-
       if (thrown == null) {
         thrown = Lists.newArrayList();
       }
@@ -1453,7 +1440,7 @@ public class WALSplitter {
             wap = (WriterAndPath) tmpWAP;
             wap.w.close();
           } catch (IOException ioe) {
-            LOG.error("Couldn't close log at " + wap.p, ioe);
+            LOG.error("Couldn't close log at {}", wap.p, ioe);
             thrown.add(ioe);
             continue;
           }
@@ -1508,18 +1495,18 @@ public class WALSplitter {
             + "result of a previous failed split attempt. Deleting " + regionedits + ", length="
             + fs.getFileStatus(regionedits).getLen());
         if (!fs.delete(regionedits, false)) {
-          LOG.warn("Failed delete of old " + regionedits);
+          LOG.warn("Failed delete of old {}", regionedits);
         }
       }
       Writer w = createWriter(regionedits);
-      LOG.debug("Creating writer path=" + regionedits);
+      LOG.debug("Creating writer path={}", regionedits);
       return new WriterAndPath(regionedits, w, entry.getKey().getSequenceId());
     }
 
     void filterCellByStore(Entry logEntry) {
       Map<byte[], Long> maxSeqIdInStores =
           regionMaxSeqIdInStores.get(Bytes.toString(logEntry.getKey().getEncodedRegionName()));
-      if (maxSeqIdInStores == null || maxSeqIdInStores.isEmpty()) {
+      if (MapUtils.isEmpty(maxSeqIdInStores)) {
         return;
       }
       // Create the array list for the cells that aren't filtered.
@@ -1567,11 +1554,9 @@ public class WALSplitter {
           if (wap == null) {
             wap = getWriterAndPath(logEntry, reusable);
             if (wap == null) {
-              if (LOG.isTraceEnabled()) {
-                // This log spews the full edit. Can be massive in the log. Enable only debugging
-                // WAL lost edit issues.
-                LOG.trace("getWriterAndPath decided we don't need to write edits for " + logEntry);
-              }
+              // This log spews the full edit. Can be massive in the log. Enable only debugging
+              // WAL lost edit issues.
+              LOG.trace("getWriterAndPath decided we don't need to write edits for {}", logEntry);
               return null;
             }
           }
@@ -1590,7 +1575,7 @@ public class WALSplitter {
       } catch (IOException e) {
           e = e instanceof RemoteException ?
                   ((RemoteException)e).unwrapRemoteException() : e;
-        LOG.error(HBaseMarkers.FATAL, " Got while writing log entry to log", e);
+        LOG.error(HBaseMarkers.FATAL, "Got while writing log entry to log", e);
         throw e;
       }
       return wap;
@@ -1599,8 +1584,8 @@ public class WALSplitter {
     @Override
     public boolean keepRegionEvent(Entry entry) {
       ArrayList<Cell> cells = entry.getEdit().getCells();
-      for (int i = 0; i < cells.size(); i++) {
-        if (WALEdit.isCompactionMarker(cells.get(i))) {
+      for (Cell cell : cells) {
+        if (WALEdit.isCompactionMarker(cell)) {
           return true;
         }
       }
@@ -1657,7 +1642,7 @@ public class WALSplitter {
         List<IOException> thrown, List<Path> paths)
         throws InterruptedException, ExecutionException {
       for (final Map.Entry<byte[], RegionEntryBuffer> buffer : entryBuffers.buffers.entrySet()) {
-        LOG.info("Submitting writeThenClose of " + buffer.getValue().encodedRegionName);
+        LOG.info("Submitting writeThenClose of {}", buffer.getValue().encodedRegionName);
         completionService.submit(new Callable<Void>() {
           public Void call() throws Exception {
             Path dst = writeThenClose(buffer.getValue());
@@ -1835,7 +1820,7 @@ public class WALSplitter {
 
     if (entry == null) {
       // return an empty array
-      return new ArrayList<>();
+      return Collections.emptyList();
     }
 
     long replaySeqId = (entry.getKey().hasOrigSequenceNumber()) ?
@@ -1846,7 +1831,9 @@ public class WALSplitter {
     Mutation m = null;
     WALKeyImpl key = null;
     WALEdit val = null;
-    if (logEntry != null) val = new WALEdit();
+    if (logEntry != null) {
+      val = new WALEdit();
+    }
 
     for (int i = 0; i < count; i++) {
       // Throw index out of bounds if our cell count is off