Posted to commits@accumulo.apache.org by ct...@apache.org on 2021/01/07 21:43:41 UTC

[accumulo] branch main updated: Remove error message on recovery success (#1856)

This is an automated email from the ASF dual-hosted git repository.

ctubbsii pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new 81e36d0  Remove error message on recovery success (#1856)
81e36d0 is described below

commit 81e36d04f1c4aeeedaed14c1b1fd78c73c15b4e9
Author: Christopher Tubbs <ct...@apache.org>
AuthorDate: Thu Jan 7 16:43:33 2021 -0500

    Remove error message on recovery success (#1856)
    
    Fix a spurious error message claiming the stream is closed after a
    successful recovery in LogSorter.java. The message appeared because a
    try-with-resources block automatically closed the file input stream,
    and the finally block in LogSorter.LogProcessor.sort() then attempted
    to close it a second time. As a result, an error was always logged
    from the finally block, even when the try-with-resources block had
    completed without error and had already closed the input stream. This
    change removes the try-with-resources block and relies on the finally
    block to close the resources exactly once.
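
    Roughly, the old control flow had this shape (a simplified sketch,
    not the literal code of sort(); the cleanup call in the finally
    block is condensed here):

        try {
          try (FSDataInputStream fsinput = fs.open(srcPath)) {
            this.input = fsinput;
            // ... read key/value pairs and write sorted buffers ...
          } // try-with-resources closes fsinput (this.input) here
        } finally {
          // closes this.input a second time; the resulting "stream is
          // closed" exception was logged as an error even on success
          close();
        }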
    
    Related changes include:
    
    * Remove the unnecessary wrapper class that carried the original
      input stream alongside the decrypting wrapper stream. The original
      stream is already available to all callers and was only used to
      obtain the decrypting stream, which can now be retrieved directly
      instead of through a holder object (see the sketch after this
      list). The method that returns the decrypting stream was renamed,
      and javadoc was added to make its behavior easier to understand.
    * Change LogHeaderIncompleteException so it is no longer a subclass
      of IOException; it must now be caught explicitly and cannot be
      overlooked when handling other IOExceptions. Its cause parameter
      type was also narrowed to EOFException, the only valid exception
      that should cause a LogHeaderIncompleteException to be thrown.
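
    With these changes, callers obtain the decrypting stream directly
    and must handle the truncated-header case explicitly. The new
    calling pattern, lightly condensed from LogSorter below:

        input = fs.open(srcPath);
        try {
          decryptingInput = DfsLogger.getDecryptingStream(input, conf);
        } catch (LogHeaderIncompleteException e) {
          // No longer an IOException, so it cannot be swallowed by a
          // generic catch; a tserver may have died before writing the
          // complete header, so mark the sort as finished with no data.
          log.warn("Could not read header from write-ahead log {}. Not sorting.", srcPath);
          fs.mkdirs(new Path(destPath));
          writeBuffer(destPath, Collections.emptyList(), part++);
          fs.create(SortedLogState.getFinishedMarkerPath(destPath)).close();
          return;
        }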
---
 .../org/apache/accumulo/tserver/log/DfsLogger.java | 38 ++++--------
 .../org/apache/accumulo/tserver/log/LogSorter.java | 70 ++++++++++------------
 .../apache/accumulo/tserver/logger/LogReader.java  | 33 ++++------
 .../tserver/replication/AccumuloReplicaSystem.java |  7 +--
 4 files changed, 60 insertions(+), 88 deletions(-)

diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
index c7382bb..a5420e3 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
@@ -112,34 +112,14 @@ public class DfsLogger implements Comparable<DfsLogger> {
    * log. This exception is thrown when the header cannot be read from a WAL which should only
    * happen when the tserver dies as described.
    */
-  public static class LogHeaderIncompleteException extends IOException {
+  public static class LogHeaderIncompleteException extends Exception {
     private static final long serialVersionUID = 1L;
 
-    public LogHeaderIncompleteException(Throwable cause) {
+    public LogHeaderIncompleteException(EOFException cause) {
       super(cause);
     }
   }
 
-  public static class DFSLoggerInputStreams {
-
-    private FSDataInputStream originalInput;
-    private DataInputStream decryptingInputStream;
-
-    public DFSLoggerInputStreams(FSDataInputStream originalInput,
-        DataInputStream decryptingInputStream) {
-      this.originalInput = originalInput;
-      this.decryptingInputStream = decryptingInputStream;
-    }
-
-    public FSDataInputStream getOriginalInput() {
-      return originalInput;
-    }
-
-    public DataInputStream getDecryptingInputStream() {
-      return decryptingInputStream;
-    }
-  }
-
   public interface ServerResources {
     AccumuloConfiguration getConfiguration();
 
@@ -358,8 +338,15 @@ public class DfsLogger implements Comparable<DfsLogger> {
     metaReference = meta;
   }
 
-  public static DFSLoggerInputStreams readHeaderAndReturnStream(FSDataInputStream input,
-      AccumuloConfiguration conf) throws IOException {
+  /**
+   * Reads the WAL file header, and returns a decrypting stream which wraps the original stream. If
+   * the file is not encrypted, the original stream is returned.
+   *
+   * @throws LogHeaderIncompleteException
+   *           if the header cannot be fully read (can happen if the tserver died before finishing)
+   */
+  public static DataInputStream getDecryptingStream(FSDataInputStream input,
+      AccumuloConfiguration conf) throws LogHeaderIncompleteException, IOException {
     DataInputStream decryptingInput;
 
     byte[] magic4 = DfsLogger.LOG_FILE_HEADER_V4.getBytes(UTF_8);
@@ -398,12 +385,11 @@ public class DfsLogger implements Comparable<DfsLogger> {
       }
     } catch (EOFException e) {
       // Explicitly catch any exceptions that should be converted to LogHeaderIncompleteException
-
       // A TabletServer might have died before the (complete) header was written
       throw new LogHeaderIncompleteException(e);
     }
 
-    return new DFSLoggerInputStreams(input, decryptingInput);
+    return decryptingInput;
   }
 
   /**
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
index a024115..a9c47b35 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
@@ -41,7 +41,6 @@ import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.server.log.SortedLogState;
 import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
 import org.apache.accumulo.server.zookeeper.DistributedWorkQueue.Processor;
-import org.apache.accumulo.tserver.log.DfsLogger.DFSLoggerInputStreams;
 import org.apache.accumulo.tserver.log.DfsLogger.LogHeaderIncompleteException;
 import org.apache.accumulo.tserver.logger.LogFileKey;
 import org.apache.accumulo.tserver.logger.LogFileValue;
@@ -112,47 +111,42 @@ public class LogSorter {
         // the following call does not throw an exception if the file/dir does not exist
         fs.deleteRecursively(new Path(destPath));
 
-        try (final FSDataInputStream fsinput = fs.open(srcPath)) {
-          DFSLoggerInputStreams inputStreams;
-          try {
-            inputStreams = DfsLogger.readHeaderAndReturnStream(fsinput, conf);
-          } catch (LogHeaderIncompleteException e) {
-            log.warn("Could not read header from write-ahead log {}. Not sorting.", srcPath);
-            // Creating a 'finished' marker will cause recovery to proceed normally and the
-            // empty file will be correctly ignored downstream.
-            fs.mkdirs(new Path(destPath));
-            writeBuffer(destPath, Collections.emptyList(), part++);
-            fs.create(SortedLogState.getFinishedMarkerPath(destPath)).close();
-            return;
-          }
-
-          this.input = inputStreams.getOriginalInput();
-          this.decryptingInput = inputStreams.getDecryptingInputStream();
+        input = fs.open(srcPath);
+        try {
+          decryptingInput = DfsLogger.getDecryptingStream(input, conf);
+        } catch (LogHeaderIncompleteException e) {
+          log.warn("Could not read header from write-ahead log {}. Not sorting.", srcPath);
+          // Creating a 'finished' marker will cause recovery to proceed normally and the
+          // empty file will be correctly ignored downstream.
+          fs.mkdirs(new Path(destPath));
+          writeBuffer(destPath, Collections.emptyList(), part++);
+          fs.create(SortedLogState.getFinishedMarkerPath(destPath)).close();
+          return;
+        }
 
-          final long bufferSize = conf.getAsBytes(Property.TSERV_SORT_BUFFER_SIZE);
-          Thread.currentThread().setName("Sorting " + name + " for recovery");
-          while (true) {
-            final ArrayList<Pair<LogFileKey,LogFileValue>> buffer = new ArrayList<>();
-            try {
-              long start = input.getPos();
-              while (input.getPos() - start < bufferSize) {
-                LogFileKey key = new LogFileKey();
-                LogFileValue value = new LogFileValue();
-                key.readFields(decryptingInput);
-                value.readFields(decryptingInput);
-                buffer.add(new Pair<>(key, value));
-              }
-              writeBuffer(destPath, buffer, part++);
-              buffer.clear();
-            } catch (EOFException ex) {
-              writeBuffer(destPath, buffer, part++);
-              break;
+        final long bufferSize = conf.getAsBytes(Property.TSERV_SORT_BUFFER_SIZE);
+        Thread.currentThread().setName("Sorting " + name + " for recovery");
+        while (true) {
+          final ArrayList<Pair<LogFileKey,LogFileValue>> buffer = new ArrayList<>();
+          try {
+            long start = input.getPos();
+            while (input.getPos() - start < bufferSize) {
+              LogFileKey key = new LogFileKey();
+              LogFileValue value = new LogFileValue();
+              key.readFields(decryptingInput);
+              value.readFields(decryptingInput);
+              buffer.add(new Pair<>(key, value));
             }
+            writeBuffer(destPath, buffer, part++);
+            buffer.clear();
+          } catch (EOFException ex) {
+            writeBuffer(destPath, buffer, part++);
+            break;
           }
-          fs.create(new Path(destPath, "finished")).close();
-          log.info("Finished log sort {} {} bytes {} parts in {}ms", name, getBytesCopied(), part,
-              getSortTime());
         }
+        fs.create(new Path(destPath, "finished")).close();
+        log.info("Finished log sort {} {} bytes {} parts in {}ms", name, getBytesCopied(), part,
+            getSortTime());
       } catch (Throwable t) {
         try {
           // parent dir may not exist
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java
index 8456365..17415bc 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java
@@ -38,7 +38,6 @@ import org.apache.accumulo.core.data.TableId;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
 import org.apache.accumulo.server.fs.VolumeManagerImpl;
 import org.apache.accumulo.tserver.log.DfsLogger;
-import org.apache.accumulo.tserver.log.DfsLogger.DFSLoggerInputStreams;
 import org.apache.accumulo.tserver.log.DfsLogger.LogHeaderIncompleteException;
 import org.apache.accumulo.tserver.log.RecoveryLogReader;
 import org.apache.hadoop.conf.Configuration;
@@ -108,27 +107,21 @@ public class LogReader {
         LogFileValue value = new LogFileValue();
 
         if (fs.getFileStatus(path).isFile()) {
-          try (final FSDataInputStream fsinput = fs.open(path)) {
-            // read log entries from a simple hdfs file
-            DFSLoggerInputStreams streams;
-            try {
-              streams = DfsLogger.readHeaderAndReturnStream(fsinput, siteConfig);
-            } catch (LogHeaderIncompleteException e) {
-              log.warn("Could not read header for {} . Ignoring...", path);
-              continue;
-            }
-
-            try (DataInputStream input = streams.getDecryptingInputStream()) {
-              while (true) {
-                try {
-                  key.readFields(input);
-                  value.readFields(input);
-                } catch (EOFException ex) {
-                  break;
-                }
-                printLogEvent(key, value, row, rowMatcher, ke, tabletIds, opts.maxMutations);
+          // read log entries from a simple hdfs file
+          try (final FSDataInputStream fsinput = fs.open(path);
+              DataInputStream input = DfsLogger.getDecryptingStream(fsinput, siteConfig)) {
+            while (true) {
+              try {
+                key.readFields(input);
+                value.readFields(input);
+              } catch (EOFException ex) {
+                break;
               }
+              printLogEvent(key, value, row, rowMatcher, ke, tabletIds, opts.maxMutations);
             }
+          } catch (LogHeaderIncompleteException e) {
+            log.warn("Could not read header for {} . Ignoring...", path);
+            continue;
           }
         } else {
           // read the log entries sorted in a map file
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
index afc90ff..b661e8d 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
@@ -68,7 +68,6 @@ import org.apache.accumulo.server.replication.ReplicaSystemHelper;
 import org.apache.accumulo.server.replication.StatusUtil;
 import org.apache.accumulo.server.replication.proto.Replication.Status;
 import org.apache.accumulo.tserver.log.DfsLogger;
-import org.apache.accumulo.tserver.log.DfsLogger.DFSLoggerInputStreams;
 import org.apache.accumulo.tserver.log.DfsLogger.LogHeaderIncompleteException;
 import org.apache.accumulo.tserver.logger.LogFileKey;
 import org.apache.accumulo.tserver.logger.LogFileValue;
@@ -618,13 +617,13 @@ public class AccumuloReplicaSystem implements ReplicaSystem {
     return desiredTids;
   }
 
-  public DataInputStream getWalStream(Path p, FSDataInputStream input) throws IOException {
+  public DataInputStream getWalStream(Path p, FSDataInputStream input)
+      throws LogHeaderIncompleteException, IOException {
     try (TraceScope span = Trace.startSpan("Read WAL header")) {
       if (span.getSpan() != null) {
         span.getSpan().addKVAnnotation("file", p.toString());
       }
-      DFSLoggerInputStreams streams = DfsLogger.readHeaderAndReturnStream(input, conf);
-      return streams.getDecryptingInputStream();
+      return DfsLogger.getDecryptingStream(input, conf);
     }
   }