You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ct...@apache.org on 2021/01/07 21:43:41 UTC
[accumulo] branch main updated: Remove error message on recovery
success (#1856)
This is an automated email from the ASF dual-hosted git repository.
ctubbsii pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new 81e36d0 Remove error message on recovery success (#1856)
81e36d0 is described below
commit 81e36d04f1c4aeeedaed14c1b1fd78c73c15b4e9
Author: Christopher Tubbs <ct...@apache.org>
AuthorDate: Thu Jan 7 16:43:33 2021 -0500
Remove error message on recovery success (#1856)
Fix an error message that the stream is closed after a successful
recovery in LogSorter.java. This occurred because a try-with-resources
block automatically closed the file input stream, and then the finally
block in the LogSorter.LogProcessor.sort() method attempted to close it
again. This resulted in an error message
always being shown in the finally block, even if the try-with-resources
block successfully finished without error and closed the input stream
already. This change removes the use of try-with-resources, and relies
on the finally block to close the resources once.
Related changes include:
* Remove unnecessary wrapping class to carry the original input stream
alongside the decrypting wrapper input stream, since the original
input stream is already available to all callers, and it was only used
to get the decrypting wrapper input stream. The decrypting input
stream can now be retrieved directly instead of placing it in a new
holder object. The method to get the decrypting stream was renamed and
a javadoc was added to make it easier to understand what it does.
* Change LogHeaderIncompleteException to not be a subclass of
IOException, so that it can be explicitly caught, and not easily
overlooked when handling other IOExceptions. Its cause parameter type
was also narrowed to EOFException, which is the only valid cause for
throwing a LogHeaderIncompleteException.
---
.../org/apache/accumulo/tserver/log/DfsLogger.java | 38 ++++--------
.../org/apache/accumulo/tserver/log/LogSorter.java | 70 ++++++++++------------
.../apache/accumulo/tserver/logger/LogReader.java | 33 ++++------
.../tserver/replication/AccumuloReplicaSystem.java | 7 +--
4 files changed, 60 insertions(+), 88 deletions(-)
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
index c7382bb..a5420e3 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
@@ -112,34 +112,14 @@ public class DfsLogger implements Comparable<DfsLogger> {
* log. This exception is thrown when the header cannot be read from a WAL which should only
* happen when the tserver dies as described.
*/
- public static class LogHeaderIncompleteException extends IOException {
+ public static class LogHeaderIncompleteException extends Exception {
private static final long serialVersionUID = 1L;
- public LogHeaderIncompleteException(Throwable cause) {
+ public LogHeaderIncompleteException(EOFException cause) {
super(cause);
}
}
- public static class DFSLoggerInputStreams {
-
- private FSDataInputStream originalInput;
- private DataInputStream decryptingInputStream;
-
- public DFSLoggerInputStreams(FSDataInputStream originalInput,
- DataInputStream decryptingInputStream) {
- this.originalInput = originalInput;
- this.decryptingInputStream = decryptingInputStream;
- }
-
- public FSDataInputStream getOriginalInput() {
- return originalInput;
- }
-
- public DataInputStream getDecryptingInputStream() {
- return decryptingInputStream;
- }
- }
-
public interface ServerResources {
AccumuloConfiguration getConfiguration();
@@ -358,8 +338,15 @@ public class DfsLogger implements Comparable<DfsLogger> {
metaReference = meta;
}
- public static DFSLoggerInputStreams readHeaderAndReturnStream(FSDataInputStream input,
- AccumuloConfiguration conf) throws IOException {
+ /**
+ * Reads the WAL file header, and returns a decrypting stream which wraps the original stream. If
+ * the file is not encrypted, the original stream is returned.
+ *
+ * @throws LogHeaderIncompleteException
+ * if the header cannot be fully read (can happen if the tserver died before finishing)
+ */
+ public static DataInputStream getDecryptingStream(FSDataInputStream input,
+ AccumuloConfiguration conf) throws LogHeaderIncompleteException, IOException {
DataInputStream decryptingInput;
byte[] magic4 = DfsLogger.LOG_FILE_HEADER_V4.getBytes(UTF_8);
@@ -398,12 +385,11 @@ public class DfsLogger implements Comparable<DfsLogger> {
}
} catch (EOFException e) {
// Explicitly catch any exceptions that should be converted to LogHeaderIncompleteException
-
// A TabletServer might have died before the (complete) header was written
throw new LogHeaderIncompleteException(e);
}
- return new DFSLoggerInputStreams(input, decryptingInput);
+ return decryptingInput;
}
/**
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
index a024115..a9c47b35 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
@@ -41,7 +41,6 @@ import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.accumulo.server.log.SortedLogState;
import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
import org.apache.accumulo.server.zookeeper.DistributedWorkQueue.Processor;
-import org.apache.accumulo.tserver.log.DfsLogger.DFSLoggerInputStreams;
import org.apache.accumulo.tserver.log.DfsLogger.LogHeaderIncompleteException;
import org.apache.accumulo.tserver.logger.LogFileKey;
import org.apache.accumulo.tserver.logger.LogFileValue;
@@ -112,47 +111,42 @@ public class LogSorter {
// the following call does not throw an exception if the file/dir does not exist
fs.deleteRecursively(new Path(destPath));
- try (final FSDataInputStream fsinput = fs.open(srcPath)) {
- DFSLoggerInputStreams inputStreams;
- try {
- inputStreams = DfsLogger.readHeaderAndReturnStream(fsinput, conf);
- } catch (LogHeaderIncompleteException e) {
- log.warn("Could not read header from write-ahead log {}. Not sorting.", srcPath);
- // Creating a 'finished' marker will cause recovery to proceed normally and the
- // empty file will be correctly ignored downstream.
- fs.mkdirs(new Path(destPath));
- writeBuffer(destPath, Collections.emptyList(), part++);
- fs.create(SortedLogState.getFinishedMarkerPath(destPath)).close();
- return;
- }
-
- this.input = inputStreams.getOriginalInput();
- this.decryptingInput = inputStreams.getDecryptingInputStream();
+ input = fs.open(srcPath);
+ try {
+ decryptingInput = DfsLogger.getDecryptingStream(input, conf);
+ } catch (LogHeaderIncompleteException e) {
+ log.warn("Could not read header from write-ahead log {}. Not sorting.", srcPath);
+ // Creating a 'finished' marker will cause recovery to proceed normally and the
+ // empty file will be correctly ignored downstream.
+ fs.mkdirs(new Path(destPath));
+ writeBuffer(destPath, Collections.emptyList(), part++);
+ fs.create(SortedLogState.getFinishedMarkerPath(destPath)).close();
+ return;
+ }
- final long bufferSize = conf.getAsBytes(Property.TSERV_SORT_BUFFER_SIZE);
- Thread.currentThread().setName("Sorting " + name + " for recovery");
- while (true) {
- final ArrayList<Pair<LogFileKey,LogFileValue>> buffer = new ArrayList<>();
- try {
- long start = input.getPos();
- while (input.getPos() - start < bufferSize) {
- LogFileKey key = new LogFileKey();
- LogFileValue value = new LogFileValue();
- key.readFields(decryptingInput);
- value.readFields(decryptingInput);
- buffer.add(new Pair<>(key, value));
- }
- writeBuffer(destPath, buffer, part++);
- buffer.clear();
- } catch (EOFException ex) {
- writeBuffer(destPath, buffer, part++);
- break;
+ final long bufferSize = conf.getAsBytes(Property.TSERV_SORT_BUFFER_SIZE);
+ Thread.currentThread().setName("Sorting " + name + " for recovery");
+ while (true) {
+ final ArrayList<Pair<LogFileKey,LogFileValue>> buffer = new ArrayList<>();
+ try {
+ long start = input.getPos();
+ while (input.getPos() - start < bufferSize) {
+ LogFileKey key = new LogFileKey();
+ LogFileValue value = new LogFileValue();
+ key.readFields(decryptingInput);
+ value.readFields(decryptingInput);
+ buffer.add(new Pair<>(key, value));
}
+ writeBuffer(destPath, buffer, part++);
+ buffer.clear();
+ } catch (EOFException ex) {
+ writeBuffer(destPath, buffer, part++);
+ break;
}
- fs.create(new Path(destPath, "finished")).close();
- log.info("Finished log sort {} {} bytes {} parts in {}ms", name, getBytesCopied(), part,
- getSortTime());
}
+ fs.create(new Path(destPath, "finished")).close();
+ log.info("Finished log sort {} {} bytes {} parts in {}ms", name, getBytesCopied(), part,
+ getSortTime());
} catch (Throwable t) {
try {
// parent dir may not exist
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java
index 8456365..17415bc 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java
@@ -38,7 +38,6 @@ import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.server.fs.VolumeManagerImpl;
import org.apache.accumulo.tserver.log.DfsLogger;
-import org.apache.accumulo.tserver.log.DfsLogger.DFSLoggerInputStreams;
import org.apache.accumulo.tserver.log.DfsLogger.LogHeaderIncompleteException;
import org.apache.accumulo.tserver.log.RecoveryLogReader;
import org.apache.hadoop.conf.Configuration;
@@ -108,27 +107,21 @@ public class LogReader {
LogFileValue value = new LogFileValue();
if (fs.getFileStatus(path).isFile()) {
- try (final FSDataInputStream fsinput = fs.open(path)) {
- // read log entries from a simple hdfs file
- DFSLoggerInputStreams streams;
- try {
- streams = DfsLogger.readHeaderAndReturnStream(fsinput, siteConfig);
- } catch (LogHeaderIncompleteException e) {
- log.warn("Could not read header for {} . Ignoring...", path);
- continue;
- }
-
- try (DataInputStream input = streams.getDecryptingInputStream()) {
- while (true) {
- try {
- key.readFields(input);
- value.readFields(input);
- } catch (EOFException ex) {
- break;
- }
- printLogEvent(key, value, row, rowMatcher, ke, tabletIds, opts.maxMutations);
+ // read log entries from a simple hdfs file
+ try (final FSDataInputStream fsinput = fs.open(path);
+ DataInputStream input = DfsLogger.getDecryptingStream(fsinput, siteConfig)) {
+ while (true) {
+ try {
+ key.readFields(input);
+ value.readFields(input);
+ } catch (EOFException ex) {
+ break;
}
+ printLogEvent(key, value, row, rowMatcher, ke, tabletIds, opts.maxMutations);
}
+ } catch (LogHeaderIncompleteException e) {
+ log.warn("Could not read header for {} . Ignoring...", path);
+ continue;
}
} else {
// read the log entries sorted in a map file
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
index afc90ff..b661e8d 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
@@ -68,7 +68,6 @@ import org.apache.accumulo.server.replication.ReplicaSystemHelper;
import org.apache.accumulo.server.replication.StatusUtil;
import org.apache.accumulo.server.replication.proto.Replication.Status;
import org.apache.accumulo.tserver.log.DfsLogger;
-import org.apache.accumulo.tserver.log.DfsLogger.DFSLoggerInputStreams;
import org.apache.accumulo.tserver.log.DfsLogger.LogHeaderIncompleteException;
import org.apache.accumulo.tserver.logger.LogFileKey;
import org.apache.accumulo.tserver.logger.LogFileValue;
@@ -618,13 +617,13 @@ public class AccumuloReplicaSystem implements ReplicaSystem {
return desiredTids;
}
- public DataInputStream getWalStream(Path p, FSDataInputStream input) throws IOException {
+ public DataInputStream getWalStream(Path p, FSDataInputStream input)
+ throws LogHeaderIncompleteException, IOException {
try (TraceScope span = Trace.startSpan("Read WAL header")) {
if (span.getSpan() != null) {
span.getSpan().addKVAnnotation("file", p.toString());
}
- DFSLoggerInputStreams streams = DfsLogger.readHeaderAndReturnStream(input, conf);
- return streams.getDecryptingInputStream();
+ return DfsLogger.getDecryptingStream(input, conf);
}
}