You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by mm...@apache.org on 2021/08/20 14:44:14 UTC
[accumulo] branch main updated: Add new property to configure rfile
sorted recovery (#2236)
This is an automated email from the ASF dual-hosted git repository.
mmiller pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new 3ea979d Add new property to configure rfile sorted recovery (#2236)
3ea979d is described below
commit 3ea979d5971016b9c2512a96cc5b6bd5d1969b2e
Author: Mike Miller <mm...@apache.org>
AuthorDate: Fri Aug 20 10:44:08 2021 -0400
Add new property to configure rfile sorted recovery (#2236)
* Create new property prefix "tserver.wal.sort.file." to configure the
rfiles written during sorted recovery
* Add method to LogSorter to convert the sort file properties to table
file properties
* Create new tests in SortedLogRecoveryTest
* Make getCompressionAlgorithmByName public in Compression for use in tests
* Add property to MultiTableRecoveryIT to allow testing in an IT
---
.../org/apache/accumulo/core/conf/Property.java | 5 +
.../core/file/rfile/bcfile/Compression.java | 2 +-
.../org/apache/accumulo/tserver/log/LogSorter.java | 33 ++++--
.../tserver/log/SortedLogRecoveryTest.java | 112 +++++++++++++++++++++
.../apache/accumulo/test/MultiTableRecoveryIT.java | 2 +
5 files changed, 147 insertions(+), 7 deletions(-)
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index f97e802..e41f898 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -675,6 +675,11 @@ public enum Property {
"The maximum number of threads to use to sort logs during" + " recovery", "1.5.0"),
TSERV_SORT_BUFFER_SIZE("tserver.sort.buffer.size", "10%", PropertyType.MEMORY,
"The amount of memory to use when sorting logs during recovery.", "1.5.0"),
+ TSERV_WAL_SORT_FILE_PREFIX("tserver.wal.sort.file.", null, PropertyType.PREFIX,
+ "The rfile properties to use when sorting logs during recovery. Most of the properties"
+ + " that begin with 'table.file' can be used here. For example, to set the compression"
+ + " of the sorted recovery files to snappy use 'tserver.wal.sort.file.compress.type=snappy'",
+ "2.1.0"),
TSERV_WORKQ_THREADS("tserver.workq.threads", "2", PropertyType.COUNT,
"The number of threads for the distributed work queue. These threads are"
+ " used for copying failed bulk import RFiles.",
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/Compression.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/Compression.java
index dcd41a6..d4089df 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/Compression.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/Compression.java
@@ -696,7 +696,7 @@ public final class Compression {
return supportedAlgorithms.toArray(new String[0]);
}
- static Algorithm getCompressionAlgorithmByName(final String name) {
+ public static Algorithm getCompressionAlgorithmByName(final String name) {
Algorithm[] algorithms = Algorithm.class.getEnumConstants();
for (Algorithm algorithm : algorithms) {
if (algorithm.getName().equals(name)) {
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
index bd299af..3d593dc 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
@@ -32,6 +32,7 @@ import java.util.concurrent.ThreadPoolExecutor;
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.ConfigurationCopy;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
@@ -59,7 +60,7 @@ import com.google.common.annotations.VisibleForTesting;
public class LogSorter {
private static final Logger log = LoggerFactory.getLogger(LogSorter.class);
- AccumuloConfiguration conf;
+ AccumuloConfiguration sortedLogConf;
private final Map<String,LogProcessor> currentWork = Collections.synchronizedMap(new HashMap<>());
@@ -139,7 +140,7 @@ public class LogSorter {
input = fs.open(srcPath);
try {
- decryptingInput = DfsLogger.getDecryptingStream(input, conf);
+ decryptingInput = DfsLogger.getDecryptingStream(input, sortedLogConf);
} catch (LogHeaderIncompleteException e) {
log.warn("Could not read header from write-ahead log {}. Not sorting.", srcPath);
// Creating a 'finished' marker will cause recovery to proceed normally and the
@@ -150,7 +151,7 @@ public class LogSorter {
return;
}
- final long bufferSize = conf.getAsBytes(Property.TSERV_SORT_BUFFER_SIZE);
+ final long bufferSize = sortedLogConf.getAsBytes(Property.TSERV_SORT_BUFFER_SIZE);
Thread.currentThread().setName("Sorting " + name + " for recovery");
while (true) {
final ArrayList<Pair<LogFileKey,LogFileValue>> buffer = new ArrayList<>();
@@ -206,13 +207,33 @@ public class LogSorter {
public LogSorter(ServerContext context, AccumuloConfiguration conf) {
this.context = context;
- this.conf = conf;
+ this.sortedLogConf = extractSortedLogConfig(conf);
int threadPoolSize = conf.getCount(Property.TSERV_RECOVERY_MAX_CONCURRENT);
this.threadPool =
ThreadPools.createFixedThreadPool(threadPoolSize, this.getClass().getName(), false);
this.walBlockSize = DfsLogger.getWalBlockSize(conf);
}
+  /**
+   * Get the properties set with {@link Property#TSERV_WAL_SORT_FILE_PREFIX} and translate them to
+   * equivalent 'table.file' properties to be used when writing rfiles for sorted recovery.
+   *
+   * @param conf the server configuration; it is copied into a {@link ConfigurationCopy} and is
+   *        not itself modified
+   * @return a copy of {@code conf} that additionally has each sort file property set under its
+   *         'table.file.' name
+   * @throws IllegalArgumentException if a property under the sort file prefix does not translate
+   *         to a valid 'table.file.' property; the message names the full offending key
+   */
+  private static AccumuloConfiguration extractSortedLogConfig(AccumuloConfiguration conf) {
+    final String tablePrefix = "table.file.";
+    var sortFileProps =
+        conf.getAllPropertiesWithPrefixStripped(Property.TSERV_WAL_SORT_FILE_PREFIX);
+    ConfigurationCopy copy = new ConfigurationCopy(conf);
+    sortFileProps.forEach((suffix, val) -> {
+      String tableProp = tablePrefix + suffix;
+      if (!Property.isTablePropertyValid(tableProp, val)) {
+        // report the key exactly as the user configured it, not the stripped suffix
+        throw new IllegalArgumentException("Invalid sort file property "
+            + Property.TSERV_WAL_SORT_FILE_PREFIX + suffix + "=" + val);
+      }
+      log.debug("Using property for writing sorted files: {}={}", tableProp, val);
+      copy.set(tableProp, val);
+    });
+    return copy;
+  }
+
@VisibleForTesting
void writeBuffer(String destPath, List<Pair<LogFileKey,LogFileValue>> buffer, int part)
throws IOException {
@@ -237,7 +258,7 @@ public class LogSorter {
try (var writer = FileOperations.getInstance().newWriterBuilder()
.forFile(fullPath.toString(), fs, fs.getConf(), context.getCryptoService())
- .withTableConfiguration(conf).build()) {
+ .withTableConfiguration(sortedLogConf).build()) {
writer.startDefaultLocalityGroup();
for (var entry : keyListMap.entrySet()) {
LogFileValue val = new LogFileValue();
@@ -250,7 +271,7 @@ public class LogSorter {
public void startWatchingForRecoveryLogs(ThreadPoolExecutor distWorkQThreadPool)
throws KeeperException, InterruptedException {
this.threadPool = distWorkQThreadPool;
- new DistributedWorkQueue(context.getZooKeeperRoot() + Constants.ZRECOVERY, conf)
+ new DistributedWorkQueue(context.getZooKeeperRoot() + Constants.ZRECOVERY, sortedLogConf)
.startProcessing(new LogProcessor(), this.threadPool);
}
diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/SortedLogRecoveryTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/SortedLogRecoveryTest.java
index 582b9bc..0a2720b 100644
--- a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/SortedLogRecoveryTest.java
+++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/SortedLogRecoveryTest.java
@@ -45,12 +45,17 @@ import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
+import org.apache.accumulo.core.conf.ConfigurationCopy;
import org.apache.accumulo.core.conf.DefaultConfiguration;
+import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.crypto.CryptoServiceFactory;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.file.rfile.bcfile.Compression;
+import org.apache.accumulo.core.file.rfile.bcfile.Utils;
+import org.apache.accumulo.core.file.streams.SeekableDataInputStream;
import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.data.ServerMutation;
@@ -59,6 +64,7 @@ import org.apache.accumulo.server.log.SortedLogState;
import org.apache.accumulo.tserver.logger.LogEvents;
import org.apache.accumulo.tserver.logger.LogFileKey;
import org.apache.accumulo.tserver.logger.LogFileValue;
+import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
@@ -1064,4 +1070,110 @@ public class SortedLogRecoveryTest {
var e = assertThrows(IllegalStateException.class, () -> recover(logs, extent));
assertTrue(e.getMessage().contains("not " + LogEvents.OPEN));
}
+
+  @Test
+  public void testInvalidLogSortedProperties() {
+    ConfigurationCopy testConfig = new ConfigurationCopy(DefaultConfiguration.getInstance());
+    // a key under the tserver.wal.sort.file. prefix that has no valid 'table.file.' equivalent
+    // must be rejected when the LogSorter is constructed
+    String prop = Property.TSERV_WAL_SORT_FILE_PREFIX + "invalid";
+    testConfig.set(prop, "snappy");
+    assertThrows(IllegalArgumentException.class, () -> new LogSorter(context, testConfig));
+  }
+
+  @Test
+  public void testLogSortedProperties() throws Exception {
+    Mutation ignored = new ServerMutation(new Text("ignored"));
+    ignored.put(cf, cq, value);
+    Mutation m = new ServerMutation(new Text("row1"));
+    m.put(cf, cq, value);
+    ConfigurationCopy testConfig = new ConfigurationCopy(DefaultConfiguration.getInstance());
+    String sortFileCompression = "none";
+    // set several rfile properties under the tserver.wal.sort.file. prefix; the LogSorter
+    // constructor throws if any of them does not translate to a valid 'table.file.' property
+    String prefix = Property.TSERV_WAL_SORT_FILE_PREFIX.toString();
+    testConfig.set(prefix + "compress.type", sortFileCompression);
+    testConfig.set(prefix + "compress.blocksize", "50K");
+    testConfig.set(prefix + "compress.blocksize.index", "56K");
+    testConfig.set(prefix + "blocksize", "256B");
+    testConfig.set(prefix + "replication", "3");
+    LogSorter sorter = new LogSorter(context, testConfig);
+
+    final String workdir = tempFolder.newFolder().getAbsolutePath();
+
+    try (var vm = VolumeManagerImpl.getLocalForTesting(workdir)) {
+      expect(context.getVolumeManager()).andReturn(vm).anyTimes();
+      expect(context.getCryptoService()).andReturn(CryptoServiceFactory.newDefaultInstance())
+          .anyTimes();
+      expect(context.getConfiguration()).andReturn(DefaultConfiguration.getInstance()).anyTimes();
+      replay(context);
+      final Path workdirPath = new Path("file://" + workdir);
+      vm.deleteRecursively(workdirPath);
+
+      KeyValue[] events =
+          {createKeyValue(OPEN, 0, -1, "1"), createKeyValue(DEFINE_TABLET, 1, 1, extent),
+              createKeyValue(COMPACTION_FINISH, 2, 1, null),
+              createKeyValue(COMPACTION_START, 4, 1, "/t1/f1"),
+              createKeyValue(COMPACTION_FINISH, 5, 1, null),
+              createKeyValue(MUTATION, 3, 1, ignored), createKeyValue(MUTATION, 5, 1, m)};
+      String dest = workdir + "/testLogSortedProperties";
+
+      // flush through writeBuffer each time the buffer reaches bufferSize, then write the
+      // remainder as the final part
+      List<Pair<LogFileKey,LogFileValue>> buffer = new ArrayList<>();
+      int parts = 0;
+      for (KeyValue pair : events) {
+        buffer.add(new Pair<>(pair.key, pair.value));
+        if (buffer.size() >= bufferSize) {
+          sorter.writeBuffer(dest, buffer, parts++);
+          buffer.clear();
+        }
+      }
+      sorter.writeBuffer(dest, buffer, parts);
+      FileSystem fs = vm.getFileSystemByPath(workdirPath);
+
+      // every sorted file must use the configured compression; require at least one file so the
+      // check below cannot pass vacuously on an empty directory
+      var sortedFiles = fs.listStatus(new Path(dest));
+      assertTrue(sortedFiles.length > 0);
+      for (var file : sortedFiles) {
+        assertTrue(file.isFile());
+        try (var fileStream = fs.open(file.getPath())) {
+          var algo = getCompressionFromRFile(fileStream, file.getLen());
+          assertEquals(sortFileCompression, algo.getName());
+        }
+      }
+    }
+  }
+
+  /** BCFile format version expected in the sorted rfiles (pulled from BCFile.Reader()). */
+  private static final Utils.Version API_VERSION_3 = new Utils.Version((short) 3, (short) 0);
+  /** Required prefix of the first meta index entry name (pulled from BCFile.Reader()). */
+  private static final String DEFAULT_PREFIX = "data:";
+  /** Size in bytes of the trailing BCFile magic, i.e. BCFile.Magic.size(). */
+  private static final int MAGIC_SIZE = 16;
+  /** The meta index offset and crypto parameters offset, two longs, precede the version. */
+  private static final int TRAILER_OFFSETS_SIZE = 2 * Long.BYTES;
+
+  /**
+   * Read the compression algorithm name stored in the first meta index entry of an rfile by
+   * parsing the BCFile trailer directly (logic pulled from BCFile.Reader()).
+   *
+   * @param fsin stream over the rfile; it is wrapped, and the wrapper is closed on return
+   * @param fileLength length of the rfile in bytes
+   * @return the compression algorithm named by the first meta index entry
+   * @throws IOException if reading fails or the meta index entry name does not start with
+   *         "data:"
+   */
+  private Compression.Algorithm getCompressionFromRFile(FSDataInputStream fsin, long fileLength)
+      throws IOException {
+    try (var in = new SeekableDataInputStream(fsin)) {
+      // the file ends with: [meta index offset][crypto params offset][version][magic]
+      long versionPos = fileLength - MAGIC_SIZE - Utils.Version.size();
+      in.seek(versionPos);
+      var version = new Utils.Version(in);
+      assertEquals(API_VERSION_3, version);
+      // the magic bytes after the version are not needed here; jump back to the two offsets
+      in.seek(versionPos - TRAILER_OFFSETS_SIZE);
+      long offsetIndexMeta = in.readLong();
+      long offsetCryptoParameters = in.readLong();
+      assertTrue(offsetCryptoParameters > 0);
+
+      // read meta index
+      in.seek(offsetIndexMeta);
+      int count = Utils.readVInt(in);
+      assertTrue(count > 0);
+
+      String fullMetaName = Utils.readString(in);
+      if (fullMetaName != null && !fullMetaName.startsWith(DEFAULT_PREFIX)) {
+        throw new IOException("Corrupted Meta region Index");
+      }
+      return Compression.getCompressionAlgorithmByName(Utils.readString(in));
+    }
+  }
}
diff --git a/test/src/main/java/org/apache/accumulo/test/MultiTableRecoveryIT.java b/test/src/main/java/org/apache/accumulo/test/MultiTableRecoveryIT.java
index e66b380..4f91ec6 100644
--- a/test/src/main/java/org/apache/accumulo/test/MultiTableRecoveryIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/MultiTableRecoveryIT.java
@@ -57,6 +57,8 @@ public class MultiTableRecoveryIT extends ConfigurableMacBase {
// use raw local file system so walogs sync and flush will work
hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
+ // test sorted rfile recovery options
+ cfg.setProperty(Property.TSERV_WAL_SORT_FILE_PREFIX + "compress.type", "none");
}
@Override