You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by mm...@apache.org on 2021/07/07 11:29:57 UTC
[accumulo] branch main updated: Refactor LogSorter to use config (#2191)
This is an automated email from the ASF dual-hosted git repository.
mmiller pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new 908105b Refactor LogSorter to use config (#2191)
908105b is described below
commit 908105bc8fddd64aa2d5907c22ccf2a232209306
Author: Mike Miller <mm...@apache.org>
AuthorDate: Wed Jul 7 07:29:44 2021 -0400
Refactor LogSorter to use config (#2191)
* Refactor LogSorter writeBuffer to allow use of the system configuration
when writing out sorted rfiles. The sorted files are now written with the
configured settings instead of only the defaults.
---
.../java/org/apache/accumulo/tserver/log/LogSorter.java | 13 ++++++-------
.../apache/accumulo/tserver/log/SortedLogRecoveryTest.java | 6 ++++--
2 files changed, 10 insertions(+), 9 deletions(-)
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
index 293ff82..bd299af 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
@@ -32,7 +32,6 @@ import java.util.concurrent.ThreadPoolExecutor;
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
-import org.apache.accumulo.core.conf.DefaultConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
@@ -146,7 +145,7 @@ public class LogSorter {
// Creating a 'finished' marker will cause recovery to proceed normally and the
// empty file will be correctly ignored downstream.
fs.mkdirs(new Path(destPath));
- writeBuffer(context, destPath, Collections.emptyList(), part++);
+ writeBuffer(destPath, Collections.emptyList(), part++);
fs.create(SortedLogState.getFinishedMarkerPath(destPath)).close();
return;
}
@@ -164,10 +163,10 @@ public class LogSorter {
value.readFields(decryptingInput);
buffer.add(new Pair<>(key, value));
}
- writeBuffer(context, destPath, buffer, part++);
+ writeBuffer(destPath, buffer, part++);
buffer.clear();
} catch (EOFException ex) {
- writeBuffer(context, destPath, buffer, part++);
+ writeBuffer(destPath, buffer, part++);
break;
}
}
@@ -215,8 +214,8 @@ public class LogSorter {
}
@VisibleForTesting
- public static void writeBuffer(ServerContext context, String destPath,
- List<Pair<LogFileKey,LogFileValue>> buffer, int part) throws IOException {
+ void writeBuffer(String destPath, List<Pair<LogFileKey,LogFileValue>> buffer, int part)
+ throws IOException {
String filename = String.format("part-r-%05d.rf", part);
Path path = new Path(destPath, filename);
FileSystem fs = context.getVolumeManager().getFileSystemByPath(path);
@@ -238,7 +237,7 @@ public class LogSorter {
try (var writer = FileOperations.getInstance().newWriterBuilder()
.forFile(fullPath.toString(), fs, fs.getConf(), context.getCryptoService())
- .withTableConfiguration(DefaultConfiguration.getInstance()).build()) {
+ .withTableConfiguration(conf).build()) {
writer.startDefaultLocalityGroup();
for (var entry : keyListMap.entrySet()) {
LogFileValue val = new LogFileValue();
diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/SortedLogRecoveryTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/SortedLogRecoveryTest.java
index 2121725..582b9bc 100644
--- a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/SortedLogRecoveryTest.java
+++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/SortedLogRecoveryTest.java
@@ -79,6 +79,7 @@ public class SortedLogRecoveryTest {
static final Text cq = new Text("cq");
static final Value value = new Value("value");
static ServerContext context;
+ static LogSorter logSorter;
@Rule
public TemporaryFolder tempFolder =
@@ -87,6 +88,7 @@ public class SortedLogRecoveryTest {
@Before
public void setup() {
context = EasyMock.createMock(ServerContext.class);
+ logSorter = new LogSorter(context, DefaultConfiguration.getInstance());
}
static class KeyValue implements Comparable<KeyValue> {
@@ -179,11 +181,11 @@ public class SortedLogRecoveryTest {
for (KeyValue pair : entry.getValue()) {
buffer.add(new Pair<>(pair.key, pair.value));
if (buffer.size() >= bufferSize) {
- LogSorter.writeBuffer(context, destPath, buffer, parts++);
+ logSorter.writeBuffer(destPath, buffer, parts++);
buffer.clear();
}
}
- LogSorter.writeBuffer(context, destPath, buffer, parts);
+ logSorter.writeBuffer(destPath, buffer, parts);
ns.create(SortedLogState.getFinishedMarkerPath(destPath)).close();
dirs.add(new Path(destPath));