You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ad...@apache.org on 2018/02/14 19:37:41 UTC

[accumulo] branch master updated (2a58165 -> 23cabbc)

This is an automated email from the ASF dual-hosted git repository.

adamjshook pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git.


    from 2a58165  ACCUMULO-4784 - New fluent API for creating Connector (#361)
     add 927e673  [ACCUMULO-4787] Close input stream in AccumuloReplicaSystem (#369)
     new 23cabbc  Merge branch '1.8'

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../org/apache/accumulo/tserver/log/DfsLogger.java |   5 +-
 .../org/apache/accumulo/tserver/log/LogSorter.java |  68 ++---
 .../apache/accumulo/tserver/logger/LogReader.java  |  43 +--
 .../tserver/replication/AccumuloReplicaSystem.java | 297 ++++++++++-----------
 4 files changed, 207 insertions(+), 206 deletions(-)

-- 
To stop receiving notification emails like this one, please contact
adamjshook@apache.org.

[accumulo] 01/01: Merge branch '1.8'

Posted by ad...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

adamjshook pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit 23cabbc7a487b8844cbbe48a89a26741b7a696d8
Merge: 2a58165 927e673
Author: Adam J. Shook <ad...@gmail.com>
AuthorDate: Wed Feb 14 14:26:12 2018 -0500

    Merge branch '1.8'

 .../org/apache/accumulo/tserver/log/DfsLogger.java |   5 +-
 .../org/apache/accumulo/tserver/log/LogSorter.java |  68 ++---
 .../apache/accumulo/tserver/logger/LogReader.java  |  43 +--
 .../tserver/replication/AccumuloReplicaSystem.java | 297 ++++++++++-----------
 4 files changed, 207 insertions(+), 206 deletions(-)

diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
index fc72b98,ba5e488..c35a315
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
@@@ -113,44 -113,46 +113,46 @@@ public class LogSorter 
          // the following call does not throw an exception if the file/dir does not exist
          fs.deleteRecursively(new Path(destPath));
  
-         DFSLoggerInputStreams inputStreams;
-         try {
-           inputStreams = DfsLogger.readHeaderAndReturnStream(fs, srcPath, conf);
-         } catch (LogHeaderIncompleteException e) {
-           log.warn("Could not read header from write-ahead log {}. Not sorting.", srcPath);
-           // Creating a 'finished' marker will cause recovery to proceed normally and the
-           // empty file will be correctly ignored downstream.
-           fs.mkdirs(new Path(destPath));
-           writeBuffer(destPath, Collections.<Pair<LogFileKey,LogFileValue>> emptyList(), part++);
-           fs.create(SortedLogState.getFinishedMarkerPath(destPath)).close();
-           return;
-         }
- 
-         this.input = inputStreams.getOriginalInput();
-         this.decryptingInput = inputStreams.getDecryptingInputStream();
- 
-         final long bufferSize = conf.getAsBytes(Property.TSERV_SORT_BUFFER_SIZE);
-         Thread.currentThread().setName("Sorting " + name + " for recovery");
-         while (true) {
-           final ArrayList<Pair<LogFileKey,LogFileValue>> buffer = new ArrayList<>();
+         try (final FSDataInputStream fsinput = fs.open(srcPath)) {
+           DFSLoggerInputStreams inputStreams;
            try {
-             long start = input.getPos();
-             while (input.getPos() - start < bufferSize) {
-               LogFileKey key = new LogFileKey();
-               LogFileValue value = new LogFileValue();
-               key.readFields(decryptingInput);
-               value.readFields(decryptingInput);
-               buffer.add(new Pair<>(key, value));
+             inputStreams = DfsLogger.readHeaderAndReturnStream(fsinput, conf);
+           } catch (LogHeaderIncompleteException e) {
 -            log.warn("Could not read header from write-ahead log " + srcPath + ". Not sorting.");
++            log.warn("Could not read header from write-ahead log {}. Not sorting.", srcPath);
+             // Creating a 'finished' marker will cause recovery to proceed normally and the
+             // empty file will be correctly ignored downstream.
+             fs.mkdirs(new Path(destPath));
+             writeBuffer(destPath, Collections.<Pair<LogFileKey,LogFileValue>> emptyList(), part++);
+             fs.create(SortedLogState.getFinishedMarkerPath(destPath)).close();
+             return;
+           }
+ 
+           this.input = inputStreams.getOriginalInput();
+           this.decryptingInput = inputStreams.getDecryptingInputStream();
+ 
 -          final long bufferSize = conf.getMemoryInBytes(Property.TSERV_SORT_BUFFER_SIZE);
++          final long bufferSize = conf.getAsBytes(Property.TSERV_SORT_BUFFER_SIZE);
+           Thread.currentThread().setName("Sorting " + name + " for recovery");
+           while (true) {
+             final ArrayList<Pair<LogFileKey,LogFileValue>> buffer = new ArrayList<>();
+             try {
+               long start = input.getPos();
+               while (input.getPos() - start < bufferSize) {
+                 LogFileKey key = new LogFileKey();
+                 LogFileValue value = new LogFileValue();
+                 key.readFields(decryptingInput);
+                 value.readFields(decryptingInput);
+                 buffer.add(new Pair<>(key, value));
+               }
+               writeBuffer(destPath, buffer, part++);
+               buffer.clear();
+             } catch (EOFException ex) {
+               writeBuffer(destPath, buffer, part++);
+               break;
              }
-             writeBuffer(destPath, buffer, part++);
-             buffer.clear();
-           } catch (EOFException ex) {
-             writeBuffer(destPath, buffer, part++);
-             break;
            }
+           fs.create(new Path(destPath, "finished")).close();
 -          log.info("Finished log sort " + name + " " + getBytesCopied() + " bytes " + part + " parts in " + getSortTime() + "ms");
++          log.info("Finished log sort {} {} bytes {} parts in {}ms", name, getBytesCopied(), part, getSortTime());
          }
-         fs.create(new Path(destPath, "finished")).close();
-         log.info("Finished log sort {} {} bytes {} parts in {}ms", name, getBytesCopied(), part, getSortTime());
        } catch (Throwable t) {
          try {
            // parent dir may not exist
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java
index 479550f,fb286c4..76056b4
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java
@@@ -101,28 -101,30 +102,30 @@@ public class LogReader 
        LogFileValue value = new LogFileValue();
  
        if (fs.isFile(path)) {
-         // read log entries from a simple hdfs file
-         DFSLoggerInputStreams streams;
-         try {
-           streams = DfsLogger.readHeaderAndReturnStream(fs, path, SiteConfiguration.getInstance());
-         } catch (LogHeaderIncompleteException e) {
-           log.warn("Could not read header for {}. Ignoring...", path);
-           continue;
-         }
-         DataInputStream input = streams.getDecryptingInputStream();
+         try (final FSDataInputStream fsinput = fs.open(path)) {
+           // read log entries from a simple hdfs file
+           DFSLoggerInputStreams streams;
+           try {
+             streams = DfsLogger.readHeaderAndReturnStream(fsinput, SiteConfiguration.getInstance());
+           } catch (LogHeaderIncompleteException e) {
 -            log.warn("Could not read header for " + path + ". Ignoring...");
++            log.warn("Could not read header for {} . Ignoring...", path);
+             continue;
+           }
+           DataInputStream input = streams.getDecryptingInputStream();
  
-         try {
-           while (true) {
-             try {
-               key.readFields(input);
-               value.readFields(input);
-             } catch (EOFException ex) {
-               break;
+           try {
+             while (true) {
+               try {
+                 key.readFields(input);
+                 value.readFields(input);
+               } catch (EOFException ex) {
+                 break;
+               }
+               printLogEvent(key, value, row, rowMatcher, ke, tabletIds, opts.maxMutations);
              }
-             printLogEvent(key, value, row, rowMatcher, ke, tabletIds, opts.maxMutations);
+           } finally {
+             input.close();
            }
-         } finally {
-           input.close();
          }
        } else {
          // read the log entries sorted in a map file

-- 
To stop receiving notification emails like this one, please contact
adamjshook@apache.org.