You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by mm...@apache.org on 2021/06/21 13:53:45 UTC
[accumulo] branch main updated: Make sorted recovery write to
RFiles (#2117)
This is an automated email from the ASF dual-hosted git repository.
mmiller pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new 3e765b1 Make sorted recovery write to RFiles (#2117)
3e765b1 is described below
commit 3e765b1f3b487092206cee036097820c37b7907e
Author: Mike Miller <mm...@apache.org>
AuthorDate: Mon Jun 21 09:53:33 2021 -0400
Make sorted recovery write to RFiles (#2117)
* Change LogSorter and SortedLogRecovery to write/read RFile instead of MapFile
* Rewrite LogSorter writeBuffer method and make static to test
* Create toKey() and fromKey() methods in LogFileKey for converting WAL keys
* Create toValue() and fromValue() methods in LogFileValue for converting WAL values
* Byte encode the rows of the Key to sort WAL events properly
* Make RecoveryLogsIterator convert Key-Value pairs on next()
Co-authored-by: Keith Turner <kt...@apache.org>
Co-authored-by: Christopher Tubbs <ct...@apache.org>
---
.../org/apache/accumulo/tserver/TabletServer.java | 6 +-
.../org/apache/accumulo/tserver/log/LogSorter.java | 64 +++++---
.../accumulo/tserver/log/RecoveryLogReader.java | 2 +-
.../accumulo/tserver/log/RecoveryLogsIterator.java | 124 +++++++++++----
.../accumulo/tserver/log/SortedLogRecovery.java | 63 ++++----
.../accumulo/tserver/log/TabletServerLogger.java | 9 +-
.../apache/accumulo/tserver/logger/LogFileKey.java | 167 +++++++++++++++++++++
.../accumulo/tserver/logger/LogFileValue.java | 28 ++++
.../tserver/log/SortedLogRecoveryTest.java | 79 +++++++---
.../tserver/log/TestUpgradePathForWALogs.java | 3 +
10 files changed, 436 insertions(+), 109 deletions(-)
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 6887f4a..2ab89fa 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -1133,7 +1133,7 @@ public class TabletServer extends AbstractServer {
public void recover(VolumeManager fs, KeyExtent extent, List<LogEntry> logEntries,
Set<String> tabletFiles, MutationReceiver mutationReceiver) throws IOException {
- List<Path> recoveryLogs = new ArrayList<>();
+ List<Path> recoveryDirs = new ArrayList<>();
List<LogEntry> sorted = new ArrayList<>(logEntries);
sorted.sort((e1, e2) -> (int) (e1.timestamp - e2.timestamp));
for (LogEntry entry : sorted) {
@@ -1148,9 +1148,9 @@ public class TabletServer extends AbstractServer {
throw new IOException(
"Unable to find recovery files for extent " + extent + " logEntry: " + entry);
}
- recoveryLogs.add(recovery);
+ recoveryDirs.add(recovery);
}
- logger.recover(fs, extent, recoveryLogs, tabletFiles, mutationReceiver);
+ logger.recover(getContext(), extent, recoveryDirs, tabletFiles, mutationReceiver);
}
public int createLogId() {
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
index 98fe935..293ff82 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
@@ -23,16 +23,20 @@ import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.TreeMap;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.DefaultConfiguration;
import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.file.FileOperations;
import org.apache.accumulo.core.master.thrift.RecoveryStatus;
import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.core.util.threads.ThreadPools;
@@ -47,11 +51,12 @@ import org.apache.accumulo.tserver.logger.LogFileValue;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.MapFile;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.annotations.VisibleForTesting;
+
public class LogSorter {
private static final Logger log = LoggerFactory.getLogger(LogSorter.class);
@@ -141,7 +146,7 @@ public class LogSorter {
// Creating a 'finished' marker will cause recovery to proceed normally and the
// empty file will be correctly ignored downstream.
fs.mkdirs(new Path(destPath));
- writeBuffer(destPath, Collections.emptyList(), part++);
+ writeBuffer(context, destPath, Collections.emptyList(), part++);
fs.create(SortedLogState.getFinishedMarkerPath(destPath)).close();
return;
}
@@ -159,10 +164,10 @@ public class LogSorter {
value.readFields(decryptingInput);
buffer.add(new Pair<>(key, value));
}
- writeBuffer(destPath, buffer, part++);
+ writeBuffer(context, destPath, buffer, part++);
buffer.clear();
} catch (EOFException ex) {
- writeBuffer(destPath, buffer, part++);
+ writeBuffer(context, destPath, buffer, part++);
break;
}
}
@@ -171,21 +176,6 @@ public class LogSorter {
getSortTime());
}
- private void writeBuffer(String destPath, List<Pair<LogFileKey,LogFileValue>> buffer, int part)
- throws IOException {
- Path path = new Path(destPath, String.format("part-r-%05d", part));
- FileSystem ns = context.getVolumeManager().getFileSystemByPath(path);
-
- try (MapFile.Writer output = new MapFile.Writer(ns.getConf(), ns.makeQualified(path),
- MapFile.Writer.keyClass(LogFileKey.class),
- MapFile.Writer.valueClass(LogFileValue.class))) {
- buffer.sort(Comparator.comparing(Pair::getFirst));
- for (Pair<LogFileKey,LogFileValue> entry : buffer) {
- output.append(entry.getFirst(), entry.getSecond());
- }
- }
- }
-
synchronized void close() throws IOException {
// If we receive an empty or malformed-header WAL, we won't
// have input streams that need closing. Avoid the NPE.
@@ -224,6 +214,40 @@ public class LogSorter {
this.walBlockSize = DfsLogger.getWalBlockSize(conf);
}
+ @VisibleForTesting
+ public static void writeBuffer(ServerContext context, String destPath,
+ List<Pair<LogFileKey,LogFileValue>> buffer, int part) throws IOException {
+ String filename = String.format("part-r-%05d.rf", part);
+ Path path = new Path(destPath, filename);
+ FileSystem fs = context.getVolumeManager().getFileSystemByPath(path);
+ Path fullPath = fs.makeQualified(path);
+
+ // convert the LogFileKeys to Keys, sort and collect the mutations
+ Map<Key,List<Mutation>> keyListMap = new TreeMap<>();
+ for (Pair<LogFileKey,LogFileValue> pair : buffer) {
+ var logFileKey = pair.getFirst();
+ var logFileValue = pair.getSecond();
+ Key k = logFileKey.toKey();
+ var list = keyListMap.putIfAbsent(k, logFileValue.mutations);
+ if (list != null) {
+ var muts = new ArrayList<>(list);
+ muts.addAll(logFileValue.mutations);
+ keyListMap.put(logFileKey.toKey(), muts);
+ }
+ }
+
+ try (var writer = FileOperations.getInstance().newWriterBuilder()
+ .forFile(fullPath.toString(), fs, fs.getConf(), context.getCryptoService())
+ .withTableConfiguration(DefaultConfiguration.getInstance()).build()) {
+ writer.startDefaultLocalityGroup();
+ for (var entry : keyListMap.entrySet()) {
+ LogFileValue val = new LogFileValue();
+ val.mutations = entry.getValue();
+ writer.append(entry.getKey(), val.toValue());
+ }
+ }
+ }
+
public void startWatchingForRecoveryLogs(ThreadPoolExecutor distWorkQThreadPool)
throws KeeperException, InterruptedException {
this.threadPool = distWorkQThreadPool;
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogReader.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogReader.java
index 4afa85d..bde5a1c 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogReader.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogReader.java
@@ -138,7 +138,7 @@ public class RecoveryLogReader implements CloseableIterator<Entry<LogFileKey,Log
}
if (!foundFinish)
throw new IOException(
- "Sort \"" + SortedLogState.FINISHED.getMarker() + "\" flag not found in " + directory);
+ "Sort '" + SortedLogState.FINISHED.getMarker() + "' flag not found in " + directory);
iter = new SortCheckIterator(new RangeIterator(start, end));
}
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogsIterator.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogsIterator.java
index d2128f5..0f5e259 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogsIterator.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogsIterator.java
@@ -19,61 +19,82 @@
package org.apache.accumulo.tserver.log;
import java.io.IOException;
+import java.util.AbstractMap;
import java.util.ArrayList;
+import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.ScannerBase;
+import org.apache.accumulo.core.client.rfile.RFile;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.log.SortedLogState;
+import org.apache.accumulo.tserver.logger.LogEvents;
import org.apache.accumulo.tserver.logger.LogFileKey;
import org.apache.accumulo.tserver.logger.LogFileValue;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.Iterators;
-import com.google.common.collect.UnmodifiableIterator;
/**
* Iterates over multiple sorted recovery logs merging them into a single sorted stream.
*/
-public class RecoveryLogsIterator implements CloseableIterator<Entry<LogFileKey,LogFileValue>> {
+public class RecoveryLogsIterator
+ implements Iterator<Entry<LogFileKey,LogFileValue>>, AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(RecoveryLogsIterator.class);
- List<CloseableIterator<Entry<LogFileKey,LogFileValue>>> iterators;
- private UnmodifiableIterator<Entry<LogFileKey,LogFileValue>> iter;
+ private final List<Scanner> scanners;
+ private final Iterator<Entry<Key,Value>> iter;
/**
- * Iterates only over keys in the range [start,end].
+ * Scans the files in each recoveryLogDir over the range [start,end].
*/
- RecoveryLogsIterator(VolumeManager fs, List<Path> recoveryLogPaths, LogFileKey start,
- LogFileKey end) throws IOException {
+ RecoveryLogsIterator(ServerContext context, List<Path> recoveryLogDirs, LogFileKey start,
+ LogFileKey end, boolean checkFirstKey) throws IOException {
- iterators = new ArrayList<>(recoveryLogPaths.size());
+ List<Iterator<Entry<Key,Value>>> iterators = new ArrayList<>(recoveryLogDirs.size());
+ scanners = new ArrayList<>();
+ Range range = LogFileKey.toRange(start, end);
+ var vm = context.getVolumeManager();
- try {
- for (Path log : recoveryLogPaths) {
- LOG.debug("Opening recovery log {}", log.getName());
- RecoveryLogReader rlr = new RecoveryLogReader(fs, log, start, end);
- if (rlr.hasNext()) {
+ for (Path logDir : recoveryLogDirs) {
+ LOG.debug("Opening recovery log dir {}", logDir.getName());
+ List<Path> logFiles = getFiles(vm, logDir);
+ var fs = vm.getFileSystemByPath(logDir);
+
+ // only check the first key once to prevent extra iterator creation and seeking
+ if (checkFirstKey) {
+ validateFirstKey(context, fs, logFiles, logDir);
+ }
+
+ for (Path log : logFiles) {
+ var scanner = RFile.newScanner().from(log.toString()).withFileSystem(fs)
+ .withTableProperties(context.getConfiguration()).build();
+
+ scanner.setRange(range);
+ Iterator<Entry<Key,Value>> scanIter = scanner.iterator();
+
+ if (scanIter.hasNext()) {
LOG.debug("Write ahead log {} has data in range {} {}", log.getName(), start, end);
- iterators.add(rlr);
+ iterators.add(scanIter);
+ scanners.add(scanner);
} else {
LOG.debug("Write ahead log {} has no data in range {} {}", log.getName(), start, end);
- rlr.close();
+ scanner.close();
}
}
-
- iter = Iterators.mergeSorted(iterators, (o1, o2) -> o1.getKey().compareTo(o2.getKey()));
-
- } catch (RuntimeException | IOException e) {
- try {
- close();
- } catch (Exception e2) {
- e.addSuppressed(e2);
- }
- throw e;
}
+ iter = Iterators.mergeSorted(iterators, Entry.comparingByKey());
}
@Override
@@ -83,7 +104,9 @@ public class RecoveryLogsIterator implements CloseableIterator<Entry<LogFileKey,
@Override
public Entry<LogFileKey,LogFileValue> next() {
- return iter.next();
+ Entry<Key,Value> e = iter.next();
+ return new AbstractMap.SimpleImmutableEntry<>(LogFileKey.fromKey(e.getKey()),
+ LogFileValue.fromValue(e.getValue()));
}
@Override
@@ -93,11 +116,50 @@ public class RecoveryLogsIterator implements CloseableIterator<Entry<LogFileKey,
@Override
public void close() {
- for (CloseableIterator<?> reader : iterators) {
- try {
- reader.close();
- } catch (IOException e) {
- LOG.debug("Failed to close reader", e);
+ scanners.forEach(ScannerBase::close);
+ }
+
+ /**
+ * Check for sorting signal files (finished/failed) and get the logs in the provided directory.
+ */
+ private List<Path> getFiles(VolumeManager fs, Path directory) throws IOException {
+ boolean foundFinish = false;
+ List<Path> logFiles = new ArrayList<>();
+ for (FileStatus child : fs.listStatus(directory)) {
+ if (child.getPath().getName().startsWith("_"))
+ continue;
+ if (SortedLogState.isFinished(child.getPath().getName())) {
+ foundFinish = true;
+ continue;
+ }
+ if (SortedLogState.FAILED.getMarker().equals(child.getPath().getName())) {
+ continue;
+ }
+ FileSystem ns = fs.getFileSystemByPath(child.getPath());
+ Path fullLogPath = ns.makeQualified(child.getPath());
+ logFiles.add(fullLogPath);
+ }
+ if (!foundFinish)
+ throw new IOException(
+ "Sort '" + SortedLogState.FINISHED.getMarker() + "' flag not found in " + directory);
+ return logFiles;
+ }
+
+ /**
+ * Check that the first entry in the WAL is OPEN. Only need to do this once.
+ */
+ private void validateFirstKey(ServerContext context, FileSystem fs, List<Path> logFiles,
+ Path fullLogPath) {
+ try (var scanner =
+ RFile.newScanner().from(logFiles.stream().map(Path::toString).toArray(String[]::new))
+ .withFileSystem(fs).withTableProperties(context.getConfiguration()).build()) {
+ Iterator<Entry<Key,Value>> iterator = scanner.iterator();
+ if (iterator.hasNext()) {
+ Key firstKey = iterator.next().getKey();
+ LogFileKey key = LogFileKey.fromKey(firstKey);
+ if (key.event != LogEvents.OPEN) {
+ throw new IllegalStateException("First log entry is not OPEN " + fullLogPath);
+ }
}
}
}
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java
index 9fc425a..432b24a 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java
@@ -19,6 +19,7 @@
package org.apache.accumulo.tserver.log;
import static com.google.common.base.Preconditions.checkState;
+import static java.util.Collections.max;
import static org.apache.accumulo.tserver.logger.LogEvents.COMPACTION_FINISH;
import static org.apache.accumulo.tserver.logger.LogEvents.COMPACTION_START;
import static org.apache.accumulo.tserver.logger.LogEvents.DEFINE_TABLET;
@@ -30,6 +31,7 @@ import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -41,7 +43,7 @@ import java.util.Set;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.metadata.RootTable;
-import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.tserver.logger.LogEvents;
import org.apache.accumulo.tserver.logger.LogFileKey;
import org.apache.accumulo.tserver.logger.LogFileValue;
@@ -61,10 +63,10 @@ public class SortedLogRecovery {
private static final Logger log = LoggerFactory.getLogger(SortedLogRecovery.class);
- private VolumeManager fs;
+ private final ServerContext context;
- public SortedLogRecovery(VolumeManager fs) {
- this.fs = fs;
+ public SortedLogRecovery(ServerContext context) {
+ this.context = context;
}
static LogFileKey maxKey(LogEvents event) {
@@ -98,11 +100,11 @@ public class SortedLogRecovery {
return key;
}
- private int findMaxTabletId(KeyExtent extent, List<Path> recoveryLogs) throws IOException {
+ private int findMaxTabletId(KeyExtent extent, List<Path> recoveryLogDirs) throws IOException {
int tabletId = -1;
- try (RecoveryLogsIterator rli =
- new RecoveryLogsIterator(fs, recoveryLogs, minKey(DEFINE_TABLET), maxKey(DEFINE_TABLET))) {
+ try (var rli = new RecoveryLogsIterator(context, recoveryLogDirs, minKey(DEFINE_TABLET),
+ maxKey(DEFINE_TABLET), true)) {
KeyExtent alternative = extent;
if (extent.isRootTablet()) {
@@ -138,24 +140,24 @@ public class SortedLogRecovery {
* ID.
*/
private Entry<Integer,List<Path>> findLogsThatDefineTablet(KeyExtent extent,
- List<Path> recoveryLogs) throws IOException {
+ List<Path> recoveryDirs) throws IOException {
Map<Integer,List<Path>> logsThatDefineTablet = new HashMap<>();
- for (Path wal : recoveryLogs) {
- int tabletId = findMaxTabletId(extent, Collections.singletonList(wal));
+ for (Path walDir : recoveryDirs) {
+ int tabletId = findMaxTabletId(extent, Collections.singletonList(walDir));
if (tabletId == -1) {
- log.debug("Did not find tablet {} in recovery log {}", extent, wal.getName());
+ log.debug("Did not find tablet {} in recovery log {}", extent, walDir.getName());
} else {
- logsThatDefineTablet.computeIfAbsent(tabletId, k -> new ArrayList<>()).add(wal);
- log.debug("Found tablet {} with id {} in recovery log {}", extent, tabletId, wal.getName());
+ logsThatDefineTablet.computeIfAbsent(tabletId, k -> new ArrayList<>()).add(walDir);
+ log.debug("Found tablet {} with id {} in recovery log {}", extent, tabletId,
+ walDir.getName());
}
}
if (logsThatDefineTablet.isEmpty()) {
- return new AbstractMap.SimpleEntry<>(-1, Collections.<Path>emptyList());
+ return new AbstractMap.SimpleEntry<>(-1, Collections.emptyList());
} else {
- return Collections.max(logsThatDefineTablet.entrySet(),
- (o1, o2) -> Integer.compare(o1.getKey(), o2.getKey()));
+ return max(logsThatDefineTablet.entrySet(), Comparator.comparingInt(Entry::getKey));
}
}
@@ -202,8 +204,8 @@ public class SortedLogRecovery {
long lastFinish = 0;
long recoverySeq = 0;
- try (RecoveryLogsIterator rli = new RecoveryLogsIterator(fs, recoveryLogs,
- minKey(COMPACTION_START, tabletId), maxKey(COMPACTION_START, tabletId))) {
+ try (RecoveryLogsIterator rli = new RecoveryLogsIterator(context, recoveryLogs,
+ minKey(COMPACTION_START, tabletId), maxKey(COMPACTION_START, tabletId), false)) {
DeduplicatingIterator ddi = new DeduplicatingIterator(rli);
@@ -258,21 +260,22 @@ public class SortedLogRecovery {
LogFileKey end = maxKey(MUTATION, tabletId);
- try (RecoveryLogsIterator rli = new RecoveryLogsIterator(fs, recoveryLogs, start, end)) {
+ try (var rli = new RecoveryLogsIterator(context, recoveryLogs, start, end, false)) {
while (rli.hasNext()) {
Entry<LogFileKey,LogFileValue> entry = rli.next();
+ LogFileKey logFileKey = entry.getKey();
- checkState(entry.getKey().tabletId == tabletId); // should only fail if bug elsewhere
- checkState(entry.getKey().seq >= recoverySeq); // should only fail if bug elsewhere
+ checkState(logFileKey.tabletId == tabletId); // should only fail if bug elsewhere
+ checkState(logFileKey.seq >= recoverySeq); // should only fail if bug elsewhere
- if (entry.getKey().event == MUTATION) {
- mr.receive(entry.getValue().mutations.get(0));
- } else if (entry.getKey().event == MANY_MUTATIONS) {
- for (Mutation m : entry.getValue().mutations) {
+ LogFileValue val = entry.getValue();
+ if (logFileKey.event == MUTATION || logFileKey.event == MANY_MUTATIONS) {
+ log.debug("Recover {} mutation(s) for {}", val.mutations.size(), entry.getKey());
+ for (Mutation m : val.mutations) {
mr.receive(m);
}
} else {
- throw new IllegalStateException("Non mutation event seen " + entry.getKey().event);
+ throw new IllegalStateException("Non mutation event seen " + logFileKey.event);
}
}
}
@@ -282,10 +285,10 @@ public class SortedLogRecovery {
return Collections2.transform(recoveryLogs, Path::getName);
}
- public void recover(KeyExtent extent, List<Path> recoveryLogs, Set<String> tabletFiles,
+ public void recover(KeyExtent extent, List<Path> recoveryDirs, Set<String> tabletFiles,
MutationReceiver mr) throws IOException {
- Entry<Integer,List<Path>> maxEntry = findLogsThatDefineTablet(extent, recoveryLogs);
+ Entry<Integer,List<Path>> maxEntry = findLogsThatDefineTablet(extent, recoveryDirs);
// A tablet may leave a tserver and then come back, in which case it would have a different and
// higher tablet id. Only want to consider events in the log related to the last time the tablet
@@ -294,11 +297,11 @@ public class SortedLogRecovery {
List<Path> logsThatDefineTablet = maxEntry.getValue();
if (tabletId == -1) {
- log.info("Tablet {} is not defined in recovery logs {} ", extent, asNames(recoveryLogs));
+ log.info("Tablet {} is not defined in recovery logs {} ", extent, asNames(recoveryDirs));
return;
} else {
log.info("Found {} of {} logs with max id {} for tablet {}", logsThatDefineTablet.size(),
- recoveryLogs.size(), tabletId, extent);
+ recoveryDirs.size(), tabletId, extent);
}
// Find the seq # for the last compaction that started and finished
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
index 0c4e1c2..8d1a113 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
@@ -44,6 +44,7 @@ import org.apache.accumulo.core.util.Halt;
import org.apache.accumulo.core.util.threads.ThreadPools;
import org.apache.accumulo.fate.util.Retry;
import org.apache.accumulo.fate.util.Retry.RetryFactory;
+import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.accumulo.server.replication.StatusUtil;
import org.apache.accumulo.server.replication.proto.Replication.Status;
@@ -530,11 +531,11 @@ public class TabletServerLogger {
return seq;
}
- public void recover(VolumeManager fs, KeyExtent extent, List<Path> logs, Set<String> tabletFiles,
- MutationReceiver mr) throws IOException {
+ public void recover(ServerContext context, KeyExtent extent, List<Path> recoveryDirs,
+ Set<String> tabletFiles, MutationReceiver mr) throws IOException {
try {
- SortedLogRecovery recovery = new SortedLogRecovery(fs);
- recovery.recover(extent, logs, tabletFiles, mr);
+ SortedLogRecovery recovery = new SortedLogRecovery(context);
+ recovery.recover(extent, recoveryDirs, tabletFiles, mr);
} catch (Exception e) {
throw new IOException(e);
}
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogFileKey.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogFileKey.java
index 31dfce7..bf08ca4 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogFileKey.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogFileKey.java
@@ -18,6 +18,7 @@
*/
package org.apache.accumulo.tserver.logger;
+import static java.util.Arrays.copyOf;
import static org.apache.accumulo.tserver.logger.LogEvents.DEFINE_TABLET;
import static org.apache.accumulo.tserver.logger.LogEvents.MANY_MUTATIONS;
import static org.apache.accumulo.tserver.logger.LogEvents.MUTATION;
@@ -26,10 +27,18 @@ import static org.apache.accumulo.tserver.logger.LogEvents.OPEN;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.io.UncheckedIOException;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
+import com.google.common.base.Preconditions;
+
public class LogFileKey implements WritableComparable<LogFileKey> {
public LogEvents event;
@@ -189,4 +198,162 @@ public class LogFileKey implements WritableComparable<LogFileKey> {
}
throw new RuntimeException("Unknown type of entry: " + event);
}
+
+ /**
+ * Converts LogFileKey to Key. Creates a Key containing all of the LogFileKey fields. The fields
+ * are stored so the Key sorts maintaining the legacy sort order. The row of the Key is composed
+ * of 3 fields: EventNum + tabletID + seq. The EventNum is the byte returned by eventType(). The
+ * column family is always the event. The column qualifier is dependent of the type of event and
+ * could be empty.
+ *
+ * <pre>
+ * Key Schema:
+ * Row = EventNum + tabletID + seq
+ * Family = event
+ * Qualifier = tserverSession OR filename OR KeyExtent
+ * </pre>
+ */
+ public Key toKey() throws IOException {
+ byte[] formattedRow;
+ String family = event.name();
+ var kb = Key.builder();
+ switch (event) {
+ case OPEN:
+ formattedRow = formatRow(0, 0);
+ return kb.row(formattedRow).family(family).qualifier(tserverSession).build();
+ case COMPACTION_START:
+ formattedRow = formatRow(tabletId, seq);
+ return kb.row(formattedRow).family(family).qualifier(filename).build();
+ case MUTATION:
+ case MANY_MUTATIONS:
+ case COMPACTION_FINISH:
+ return kb.row(formatRow(tabletId, seq)).family(family).build();
+ case DEFINE_TABLET:
+ formattedRow = formatRow(tabletId, seq);
+ DataOutputBuffer buffer = new DataOutputBuffer();
+ tablet.writeTo(buffer);
+ var q = copyOf(buffer.getData(), buffer.getLength());
+ buffer.close();
+ return kb.row(formattedRow).family(family).qualifier(q).build();
+ default:
+ throw new AssertionError("Invalid event type in LogFileKey: " + event);
+ }
+ }
+
+ /**
+ * Get the first byte for the event. The only possible values are 0-4. This is used as the highest
+ * byte in the row.
+ */
+ private byte getEventByte() {
+ int evenTypeInteger = eventType(event);
+ return (byte) (evenTypeInteger & 0xff);
+ }
+
+ /**
+ * Get the byte encoded row for this LogFileKey as a Text object.
+ */
+ private Text formatRow() {
+ return new Text(formatRow(tabletId, seq));
+ }
+
+ /**
+ * Format the row using 13 bytes encoded to allow proper sorting of the RFile Key. The highest
+ * byte is for the event number, 4 bytes for the tabletId and 8 bytes for the sequence long.
+ */
+ private byte[] formatRow(int tabletId, long seq) {
+ byte eventNum = getEventByte();
+ // These will not sort properly when encoded if negative. Negative is not expected currently,
+ // defending against future changes and/or bugs.
+ Preconditions.checkArgument(eventNum >= 0 && seq >= 0);
+ byte[] row = new byte[13];
+ // encode the signed integer so negatives will sort properly for tabletId
+ int encodedTabletId = tabletId ^ 0x80000000;
+
+ row[0] = eventNum;
+ row[1] = (byte) ((encodedTabletId >>> 24) & 0xff);
+ row[2] = (byte) ((encodedTabletId >>> 16) & 0xff);
+ row[3] = (byte) ((encodedTabletId >>> 8) & 0xff);
+ row[4] = (byte) (encodedTabletId & 0xff);
+ row[5] = (byte) (seq >>> 56);
+ row[6] = (byte) (seq >>> 48);
+ row[7] = (byte) (seq >>> 40);
+ row[8] = (byte) (seq >>> 32);
+ row[9] = (byte) (seq >>> 24);
+ row[10] = (byte) (seq >>> 16);
+ row[11] = (byte) (seq >>> 8);
+ row[12] = (byte) (seq); // >>> 0
+ return row;
+ }
+
+ /**
+ * Extract the tabletId integer from the byte encoded Row.
+ */
+ private static int getTabletId(byte[] row) {
+ int encoded = ((row[1] << 24) + (row[2] << 16) + (row[3] << 8) + row[4]);
+ return encoded ^ 0x80000000;
+ }
+
+ /**
+ * Extract the sequence long from the byte encoded Row.
+ */
+ private static long getSequence(byte[] row) {
+ // @formatter:off
+ return (((long) row[5] << 56) +
+ ((long) (row[6] & 255) << 48) +
+ ((long) (row[7] & 255) << 40) +
+ ((long) (row[8] & 255) << 32) +
+ ((long) (row[9] & 255) << 24) +
+ ((row[10] & 255) << 16) +
+ ((row[11] & 255) << 8) +
+ ((row[12] & 255)));
+ // @formatter:on
+ }
+
+ public static Range toRange(LogFileKey start, LogFileKey end) {
+ return new Range(start.formatRow(), end.formatRow());
+ }
+
+ /**
+ * Create LogFileKey from row. Follows schema defined by {@link #toKey()}
+ */
+ public static LogFileKey fromKey(Key key) {
+ var logFileKey = new LogFileKey();
+ byte[] rowParts = key.getRow().getBytes();
+
+ logFileKey.tabletId = getTabletId(rowParts);
+ logFileKey.seq = getSequence(rowParts);
+ logFileKey.event = LogEvents.valueOf(key.getColumnFamilyData().toString());
+ // verify event number in row matches column family
+ if (eventType(logFileKey.event) != rowParts[0]) {
+ throw new AssertionError("Event in row differs from column family. Key: " + key);
+ }
+
+ // handle special cases of what is stored in the qualifier
+ switch (logFileKey.event) {
+ case OPEN:
+ logFileKey.tserverSession = key.getColumnQualifierData().toString();
+ break;
+ case COMPACTION_START:
+ logFileKey.filename = key.getColumnQualifierData().toString();
+ break;
+ case DEFINE_TABLET:
+ try (DataInputBuffer buffer = new DataInputBuffer()) {
+ byte[] bytes = key.getColumnQualifierData().toArray();
+ buffer.reset(bytes, bytes.length);
+ logFileKey.tablet = KeyExtent.readFrom(buffer);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ break;
+ case COMPACTION_FINISH:
+ case MANY_MUTATIONS:
+ case MUTATION:
+ // nothing to do
+ break;
+ default:
+ throw new AssertionError("Invalid event type in key: " + key);
+ }
+
+ return logFileKey;
+ }
}
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogFileValue.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogFileValue.java
index 53a5ce9..b4ee431 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogFileValue.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogFileValue.java
@@ -20,15 +20,21 @@ package org.apache.accumulo.tserver.logger;
import static java.nio.charset.StandardCharsets.UTF_8;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
import java.io.DataInput;
+import java.io.DataInputStream;
import java.io.DataOutput;
+import java.io.DataOutputStream;
import java.io.IOException;
+import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.accumulo.core.data.ColumnUpdate;
import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.server.data.ServerMutation;
import org.apache.hadoop.io.Writable;
@@ -93,4 +99,26 @@ public class LogFileValue implements Writable {
return format(this, 5);
}
+ /**
+ * Serialize this LogFileValue (its list of mutations) into a new {@link Value} using the
+ * Writable encoding produced by {@link #write}.
+ *
+ * @return a Value wrapping the encoded bytes of this object
+ * @throws IOException if encoding to the in-memory stream fails
+ */
+ public Value toValue() throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ write(new DataOutputStream(baos));
+ return new Value(baos.toByteArray());
+ }
+
+ /**
+ * Deserialize a LogFileValue previously encoded by {@link #toValue()}, recovering the
+ * mutations stored in the given Value.
+ *
+ * @param value Value whose backing bytes are a Writable-encoded LogFileValue
+ * @return the decoded LogFileValue
+ */
+ public static LogFileValue fromValue(Value value) {
+ LogFileValue logFileValue = new LogFileValue();
+ try (var bais = new ByteArrayInputStream(value.get())) {
+ logFileValue.readFields(new DataInputStream(bais));
+ } catch (IOException e) {
+ // reading from an in-memory byte array should not fail; surface as unchecked
+ throw new UncheckedIOException(e);
+ }
+ return logFileValue;
+ }
+
}
diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/SortedLogRecoveryTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/SortedLogRecoveryTest.java
index c6ce044..2121725 100644
--- a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/SortedLogRecoveryTest.java
+++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/SortedLogRecoveryTest.java
@@ -23,6 +23,10 @@ import static org.apache.accumulo.tserver.logger.LogEvents.COMPACTION_START;
import static org.apache.accumulo.tserver.logger.LogEvents.DEFINE_TABLET;
import static org.apache.accumulo.tserver.logger.LogEvents.MUTATION;
import static org.apache.accumulo.tserver.logger.LogEvents.OPEN;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.reset;
+import static org.easymock.EasyMock.verify;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
@@ -41,10 +45,14 @@ import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
+import org.apache.accumulo.core.conf.DefaultConfiguration;
+import org.apache.accumulo.core.crypto.CryptoServiceFactory;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.util.Pair;
+import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.data.ServerMutation;
import org.apache.accumulo.server.fs.VolumeManagerImpl;
import org.apache.accumulo.server.log.SortedLogState;
@@ -53,9 +61,9 @@ import org.apache.accumulo.tserver.logger.LogFileKey;
import org.apache.accumulo.tserver.logger.LogFileValue;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.MapFile;
-import org.apache.hadoop.io.MapFile.Writer;
import org.apache.hadoop.io.Text;
+import org.easymock.EasyMock;
+import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
@@ -65,15 +73,22 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
@SuppressFBWarnings(value = "PATH_TRAVERSAL_IN", justification = "paths not set by user input")
public class SortedLogRecoveryTest {
+ static final int bufferSize = 5;
static final KeyExtent extent = new KeyExtent(TableId.of("table"), null, null);
static final Text cf = new Text("cf");
static final Text cq = new Text("cq");
static final Value value = new Value("value");
+ static ServerContext context;
@Rule
public TemporaryFolder tempFolder =
new TemporaryFolder(new File(System.getProperty("user.dir") + "/target"));
+ @Before
+ public void setup() {
+ context = EasyMock.createMock(ServerContext.class);
+ }
+
static class KeyValue implements Comparable<KeyValue> {
public final LogFileKey key;
public final LogFileValue value;
@@ -138,33 +153,46 @@ public class SortedLogRecoveryTest {
}
private List<Mutation> recover(Map<String,KeyValue[]> logs, KeyExtent extent) throws IOException {
- return recover(logs, new HashSet<>(), extent);
+ return recover(logs, new HashSet<>(), extent, bufferSize);
}
- private List<Mutation> recover(Map<String,KeyValue[]> logs, Set<String> files, KeyExtent extent)
- throws IOException {
+ private List<Mutation> recover(Map<String,KeyValue[]> logs, Set<String> files, KeyExtent extent,
+ int bufferSize) throws IOException {
+
final String workdir = tempFolder.newFolder().getAbsolutePath();
try (var fs = VolumeManagerImpl.getLocalForTesting(workdir)) {
+ expect(context.getVolumeManager()).andReturn(fs).anyTimes();
+ expect(context.getCryptoService()).andReturn(CryptoServiceFactory.newDefaultInstance())
+ .anyTimes();
+ expect(context.getConfiguration()).andReturn(DefaultConfiguration.getInstance()).anyTimes();
+ replay(context);
final Path workdirPath = new Path("file://" + workdir);
fs.deleteRecursively(workdirPath);
+
ArrayList<Path> dirs = new ArrayList<>();
for (Entry<String,KeyValue[]> entry : logs.entrySet()) {
- String path = workdir + "/" + entry.getKey();
- FileSystem ns = fs.getFileSystemByPath(new Path(path));
- @SuppressWarnings("deprecation")
- Writer map = new MapFile.Writer(ns.getConf(), ns, path + "/log1", LogFileKey.class,
- LogFileValue.class);
- for (KeyValue lfe : entry.getValue()) {
- map.append(lfe.key, lfe.value);
+ String destPath = workdir + "/" + entry.getKey();
+ FileSystem ns = fs.getFileSystemByPath(new Path(destPath));
+ // convert test object to Pairs for LogSorter, flushing based on bufferSize
+ List<Pair<LogFileKey,LogFileValue>> buffer = new ArrayList<>();
+ int parts = 0;
+ for (KeyValue pair : entry.getValue()) {
+ buffer.add(new Pair<>(pair.key, pair.value));
+ if (buffer.size() >= bufferSize) {
+ LogSorter.writeBuffer(context, destPath, buffer, parts++);
+ buffer.clear();
+ }
}
- map.close();
- ns.create(SortedLogState.getFinishedMarkerPath(path)).close();
- dirs.add(new Path(path));
+ LogSorter.writeBuffer(context, destPath, buffer, parts);
+
+ ns.create(SortedLogState.getFinishedMarkerPath(destPath)).close();
+ dirs.add(new Path(destPath));
}
// Recover
- SortedLogRecovery recovery = new SortedLogRecovery(fs);
+ SortedLogRecovery recovery = new SortedLogRecovery(context);
CaptureMutations capture = new CaptureMutations();
recovery.recover(extent, dirs, files, capture);
+ verify(context);
return capture.result;
}
}
@@ -307,7 +335,6 @@ public class SortedLogRecoveryTest {
List<Mutation> mutations = recover(logs, extent);
// Verify recovered data
assertEquals(0, mutations.size());
-
}
@Test
@@ -659,7 +686,7 @@ public class SortedLogRecoveryTest {
Map<String,KeyValue[]> logs = new TreeMap<>();
logs.put("entries", entries);
- List<Mutation> mutations = recover(logs, Collections.singleton("/t/f1"), extent);
+ List<Mutation> mutations = recover(logs, Collections.singleton("/t/f1"), extent, bufferSize);
assertEquals(0, mutations.size());
}
@@ -682,7 +709,7 @@ public class SortedLogRecoveryTest {
Map<String,KeyValue[]> logs = new TreeMap<>();
logs.put("entries", entries);
- List<Mutation> mutations = recover(logs, Collections.singleton("/t/f1"), extent);
+ List<Mutation> mutations = recover(logs, Collections.singleton("/t/f1"), extent, bufferSize);
assertEquals(1, mutations.size());
assertEquals(m, mutations.get(0));
@@ -752,6 +779,7 @@ public class SortedLogRecoveryTest {
assertEquals(1, mutations1.size());
assertEquals(m2, mutations1.get(0));
+ reset(context);
List<Mutation> mutations2 = recover(logs, e2);
assertEquals(2, mutations2.size());
assertEquals(m3, mutations2.get(0));
@@ -762,6 +790,7 @@ public class SortedLogRecoveryTest {
Arrays.sort(entries2);
logs.put("entries2", entries2);
+ reset(context);
mutations2 = recover(logs, e2);
assertEquals(1, mutations2.size());
assertEquals(m4, mutations2.get(0));
@@ -785,7 +814,7 @@ public class SortedLogRecoveryTest {
HashSet<String> filesSet = new HashSet<>();
filesSet.addAll(Arrays.asList(tabletFiles));
- List<Mutation> mutations = recover(logs, filesSet, extent);
+ List<Mutation> mutations = recover(logs, filesSet, extent, bufferSize);
if (startMatches) {
assertEquals(1, mutations.size());
@@ -802,6 +831,7 @@ public class SortedLogRecoveryTest {
// test having different paths for the same file. This can happen as a result of upgrade or user
// changing configuration
runPathTest(false, "/t1/f1", "/t1/f0");
+ reset(context);
runPathTest(true, "/t1/f1", "/t1/f0", "/t1/f1");
String[] aliases = {"/t1/f1", "hdfs://nn1/accumulo/tables/8/t1/f1",
@@ -812,9 +842,12 @@ public class SortedLogRecoveryTest {
for (String alias1 : aliases) {
for (String alias2 : aliases) {
+ reset(context);
runPathTest(true, alias1, alias2);
for (String other : others) {
+ reset(context);
runPathTest(true, alias1, other, alias2);
+ reset(context);
runPathTest(true, alias1, alias2, other);
}
}
@@ -822,6 +855,7 @@ public class SortedLogRecoveryTest {
for (String alias1 : aliases) {
for (String other : others) {
+ reset(context);
runPathTest(false, alias1, other);
}
}
@@ -972,29 +1006,34 @@ public class SortedLogRecoveryTest {
logs.put("entries2", entries2);
+ reset(context);
mutations = recover(logs, extent);
assertEquals(1, mutations.size());
assertEquals(m1, mutations.get(0));
logs.put("entries3", entries3);
+ reset(context);
mutations = recover(logs, extent);
assertEquals(1, mutations.size());
assertEquals(m1, mutations.get(0));
logs.put("entries4", entries4);
+ reset(context);
mutations = recover(logs, extent);
assertEquals(1, mutations.size());
assertEquals(m1, mutations.get(0));
logs.put("entries5", entries5);
+ reset(context);
mutations = recover(logs, extent);
assertEquals(0, mutations.size());
logs.put("entries6", entries6);
+ reset(context);
mutations = recover(logs, extent);
assertEquals(1, mutations.size());
assertEquals(m2, mutations.get(0));
diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/TestUpgradePathForWALogs.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/TestUpgradePathForWALogs.java
index ed88901..23614bd 100644
--- a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/TestUpgradePathForWALogs.java
+++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/TestUpgradePathForWALogs.java
@@ -35,6 +35,7 @@ import java.io.OutputStream;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.DefaultConfiguration;
+import org.apache.accumulo.core.crypto.CryptoServiceFactory;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.accumulo.server.fs.VolumeManagerImpl;
@@ -73,6 +74,8 @@ public class TestUpgradePathForWALogs {
String path = workDir.getAbsolutePath();
assertTrue(workDir.delete());
VolumeManager fs = VolumeManagerImpl.getLocalForTesting(path);
+ expect(context.getCryptoService()).andReturn(CryptoServiceFactory.newDefaultInstance())
+ .anyTimes();
expect(context.getVolumeManager()).andReturn(fs).anyTimes();
replay(context);
}