You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by mm...@apache.org on 2021/07/07 11:29:57 UTC

[accumulo] branch main updated: Refactor LogSorter to use config (#2191)

This is an automated email from the ASF dual-hosted git repository.

mmiller pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new 908105b  Refactor LogSorter to use config (#2191)
908105b is described below

commit 908105bc8fddd64aa2d5907c22ccf2a232209306
Author: Mike Miller <mm...@apache.org>
AuthorDate: Wed Jul 7 07:29:44 2021 -0400

    Refactor LogSorter to use config (#2191)
    
    * Refactor LogSorter writeBuffer to allow use of system configuration
    when writing out sorted rfiles. This will use configured settings on the
    sorted files instead of only the defaults.
---
 .../java/org/apache/accumulo/tserver/log/LogSorter.java     | 13 ++++++-------
 .../apache/accumulo/tserver/log/SortedLogRecoveryTest.java  |  6 ++++--
 2 files changed, 10 insertions(+), 9 deletions(-)

diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
index 293ff82..bd299af 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
@@ -32,7 +32,6 @@ import java.util.concurrent.ThreadPoolExecutor;
 
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
-import org.apache.accumulo.core.conf.DefaultConfiguration;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Mutation;
@@ -146,7 +145,7 @@ public class LogSorter {
         // Creating a 'finished' marker will cause recovery to proceed normally and the
         // empty file will be correctly ignored downstream.
         fs.mkdirs(new Path(destPath));
-        writeBuffer(context, destPath, Collections.emptyList(), part++);
+        writeBuffer(destPath, Collections.emptyList(), part++);
         fs.create(SortedLogState.getFinishedMarkerPath(destPath)).close();
         return;
       }
@@ -164,10 +163,10 @@ public class LogSorter {
             value.readFields(decryptingInput);
             buffer.add(new Pair<>(key, value));
           }
-          writeBuffer(context, destPath, buffer, part++);
+          writeBuffer(destPath, buffer, part++);
           buffer.clear();
         } catch (EOFException ex) {
-          writeBuffer(context, destPath, buffer, part++);
+          writeBuffer(destPath, buffer, part++);
           break;
         }
       }
@@ -215,8 +214,8 @@ public class LogSorter {
   }
 
   @VisibleForTesting
-  public static void writeBuffer(ServerContext context, String destPath,
-      List<Pair<LogFileKey,LogFileValue>> buffer, int part) throws IOException {
+  void writeBuffer(String destPath, List<Pair<LogFileKey,LogFileValue>> buffer, int part)
+      throws IOException {
     String filename = String.format("part-r-%05d.rf", part);
     Path path = new Path(destPath, filename);
     FileSystem fs = context.getVolumeManager().getFileSystemByPath(path);
@@ -238,7 +237,7 @@ public class LogSorter {
 
     try (var writer = FileOperations.getInstance().newWriterBuilder()
         .forFile(fullPath.toString(), fs, fs.getConf(), context.getCryptoService())
-        .withTableConfiguration(DefaultConfiguration.getInstance()).build()) {
+        .withTableConfiguration(conf).build()) {
       writer.startDefaultLocalityGroup();
       for (var entry : keyListMap.entrySet()) {
         LogFileValue val = new LogFileValue();
diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/SortedLogRecoveryTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/SortedLogRecoveryTest.java
index 2121725..582b9bc 100644
--- a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/SortedLogRecoveryTest.java
+++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/SortedLogRecoveryTest.java
@@ -79,6 +79,7 @@ public class SortedLogRecoveryTest {
   static final Text cq = new Text("cq");
   static final Value value = new Value("value");
   static ServerContext context;
+  static LogSorter logSorter;
 
   @Rule
   public TemporaryFolder tempFolder =
@@ -87,6 +88,7 @@ public class SortedLogRecoveryTest {
   @Before
   public void setup() {
     context = EasyMock.createMock(ServerContext.class);
+    logSorter = new LogSorter(context, DefaultConfiguration.getInstance());
   }
 
   static class KeyValue implements Comparable<KeyValue> {
@@ -179,11 +181,11 @@ public class SortedLogRecoveryTest {
         for (KeyValue pair : entry.getValue()) {
           buffer.add(new Pair<>(pair.key, pair.value));
           if (buffer.size() >= bufferSize) {
-            LogSorter.writeBuffer(context, destPath, buffer, parts++);
+            logSorter.writeBuffer(destPath, buffer, parts++);
             buffer.clear();
           }
         }
-        LogSorter.writeBuffer(context, destPath, buffer, parts);
+        logSorter.writeBuffer(destPath, buffer, parts);
 
         ns.create(SortedLogState.getFinishedMarkerPath(destPath)).close();
         dirs.add(new Path(destPath));