You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by mm...@apache.org on 2021/06/21 13:53:45 UTC

[accumulo] branch main updated: Make sorted recovery write to RFiles (#2117)

This is an automated email from the ASF dual-hosted git repository.

mmiller pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new 3e765b1  Make sorted recovery write to RFiles (#2117)
3e765b1 is described below

commit 3e765b1f3b487092206cee036097820c37b7907e
Author: Mike Miller <mm...@apache.org>
AuthorDate: Mon Jun 21 09:53:33 2021 -0400

    Make sorted recovery write to RFiles (#2117)
    
    * Change LogSorter and SortedLogRecovery to write/read RFile instead of MapFile
    * Rewrite LogSorter writeBuffer method and make static to test
    * Create toKey() and fromKey() methods in LogFileKey for converting WAL keys
    * Create toValue() and fromValue() methods in LogFileValue for converting WAL values
    * Byte encode the rows of the Key to sort WAL events properly
    * Make RecoveryLogsIterator convert Key Value pairs on next()
    
    Co-authored-by: Keith Turner <kt...@apache.org>
    Co-authored-by: Christopher Tubbs <ct...@apache.org>
---
 .../org/apache/accumulo/tserver/TabletServer.java  |   6 +-
 .../org/apache/accumulo/tserver/log/LogSorter.java |  64 +++++---
 .../accumulo/tserver/log/RecoveryLogReader.java    |   2 +-
 .../accumulo/tserver/log/RecoveryLogsIterator.java | 124 +++++++++++----
 .../accumulo/tserver/log/SortedLogRecovery.java    |  63 ++++----
 .../accumulo/tserver/log/TabletServerLogger.java   |   9 +-
 .../apache/accumulo/tserver/logger/LogFileKey.java | 167 +++++++++++++++++++++
 .../accumulo/tserver/logger/LogFileValue.java      |  28 ++++
 .../tserver/log/SortedLogRecoveryTest.java         |  79 +++++++---
 .../tserver/log/TestUpgradePathForWALogs.java      |   3 +
 10 files changed, 436 insertions(+), 109 deletions(-)

diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 6887f4a..2ab89fa 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -1133,7 +1133,7 @@ public class TabletServer extends AbstractServer {
 
   public void recover(VolumeManager fs, KeyExtent extent, List<LogEntry> logEntries,
       Set<String> tabletFiles, MutationReceiver mutationReceiver) throws IOException {
-    List<Path> recoveryLogs = new ArrayList<>();
+    List<Path> recoveryDirs = new ArrayList<>();
     List<LogEntry> sorted = new ArrayList<>(logEntries);
     sorted.sort((e1, e2) -> (int) (e1.timestamp - e2.timestamp));
     for (LogEntry entry : sorted) {
@@ -1148,9 +1148,9 @@ public class TabletServer extends AbstractServer {
         throw new IOException(
             "Unable to find recovery files for extent " + extent + " logEntry: " + entry);
       }
-      recoveryLogs.add(recovery);
+      recoveryDirs.add(recovery);
     }
-    logger.recover(fs, extent, recoveryLogs, tabletFiles, mutationReceiver);
+    logger.recover(getContext(), extent, recoveryDirs, tabletFiles, mutationReceiver);
   }
 
   public int createLogId() {
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
index 98fe935..293ff82 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
@@ -23,16 +23,20 @@ import java.io.EOFException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.TreeMap;
 import java.util.concurrent.ThreadPoolExecutor;
 
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.DefaultConfiguration;
 import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.file.FileOperations;
 import org.apache.accumulo.core.master.thrift.RecoveryStatus;
 import org.apache.accumulo.core.util.Pair;
 import org.apache.accumulo.core.util.threads.ThreadPools;
@@ -47,11 +51,12 @@ import org.apache.accumulo.tserver.logger.LogFileValue;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.MapFile;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.annotations.VisibleForTesting;
+
 public class LogSorter {
 
   private static final Logger log = LoggerFactory.getLogger(LogSorter.class);
@@ -141,7 +146,7 @@ public class LogSorter {
         // Creating a 'finished' marker will cause recovery to proceed normally and the
         // empty file will be correctly ignored downstream.
         fs.mkdirs(new Path(destPath));
-        writeBuffer(destPath, Collections.emptyList(), part++);
+        writeBuffer(context, destPath, Collections.emptyList(), part++);
         fs.create(SortedLogState.getFinishedMarkerPath(destPath)).close();
         return;
       }
@@ -159,10 +164,10 @@ public class LogSorter {
             value.readFields(decryptingInput);
             buffer.add(new Pair<>(key, value));
           }
-          writeBuffer(destPath, buffer, part++);
+          writeBuffer(context, destPath, buffer, part++);
           buffer.clear();
         } catch (EOFException ex) {
-          writeBuffer(destPath, buffer, part++);
+          writeBuffer(context, destPath, buffer, part++);
           break;
         }
       }
@@ -171,21 +176,6 @@ public class LogSorter {
           getSortTime());
     }
 
-    private void writeBuffer(String destPath, List<Pair<LogFileKey,LogFileValue>> buffer, int part)
-        throws IOException {
-      Path path = new Path(destPath, String.format("part-r-%05d", part));
-      FileSystem ns = context.getVolumeManager().getFileSystemByPath(path);
-
-      try (MapFile.Writer output = new MapFile.Writer(ns.getConf(), ns.makeQualified(path),
-          MapFile.Writer.keyClass(LogFileKey.class),
-          MapFile.Writer.valueClass(LogFileValue.class))) {
-        buffer.sort(Comparator.comparing(Pair::getFirst));
-        for (Pair<LogFileKey,LogFileValue> entry : buffer) {
-          output.append(entry.getFirst(), entry.getSecond());
-        }
-      }
-    }
-
     synchronized void close() throws IOException {
       // If we receive an empty or malformed-header WAL, we won't
       // have input streams that need closing. Avoid the NPE.
@@ -224,6 +214,40 @@ public class LogSorter {
     this.walBlockSize = DfsLogger.getWalBlockSize(conf);
   }
 
+  @VisibleForTesting
+  public static void writeBuffer(ServerContext context, String destPath,
+      List<Pair<LogFileKey,LogFileValue>> buffer, int part) throws IOException {
+    String filename = String.format("part-r-%05d.rf", part);
+    Path path = new Path(destPath, filename);
+    FileSystem fs = context.getVolumeManager().getFileSystemByPath(path);
+    Path fullPath = fs.makeQualified(path);
+
+    // convert the LogFileKeys to Keys, sort and collect the mutations
+    Map<Key,List<Mutation>> keyListMap = new TreeMap<>();
+    for (Pair<LogFileKey,LogFileValue> pair : buffer) {
+      var logFileKey = pair.getFirst();
+      var logFileValue = pair.getSecond();
+      Key k = logFileKey.toKey();
+      var list = keyListMap.putIfAbsent(k, logFileValue.mutations);
+      if (list != null) {
+        var muts = new ArrayList<>(list);
+        muts.addAll(logFileValue.mutations);
+        keyListMap.put(logFileKey.toKey(), muts);
+      }
+    }
+
+    try (var writer = FileOperations.getInstance().newWriterBuilder()
+        .forFile(fullPath.toString(), fs, fs.getConf(), context.getCryptoService())
+        .withTableConfiguration(DefaultConfiguration.getInstance()).build()) {
+      writer.startDefaultLocalityGroup();
+      for (var entry : keyListMap.entrySet()) {
+        LogFileValue val = new LogFileValue();
+        val.mutations = entry.getValue();
+        writer.append(entry.getKey(), val.toValue());
+      }
+    }
+  }
+
   public void startWatchingForRecoveryLogs(ThreadPoolExecutor distWorkQThreadPool)
       throws KeeperException, InterruptedException {
     this.threadPool = distWorkQThreadPool;
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogReader.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogReader.java
index 4afa85d..bde5a1c 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogReader.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogReader.java
@@ -138,7 +138,7 @@ public class RecoveryLogReader implements CloseableIterator<Entry<LogFileKey,Log
     }
     if (!foundFinish)
       throw new IOException(
-          "Sort \"" + SortedLogState.FINISHED.getMarker() + "\" flag not found in " + directory);
+          "Sort '" + SortedLogState.FINISHED.getMarker() + "' flag not found in " + directory);
 
     iter = new SortCheckIterator(new RangeIterator(start, end));
   }
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogsIterator.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogsIterator.java
index d2128f5..0f5e259 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogsIterator.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogsIterator.java
@@ -19,61 +19,82 @@
 package org.apache.accumulo.tserver.log;
 
 import java.io.IOException;
+import java.util.AbstractMap;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map.Entry;
 
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.ScannerBase;
+import org.apache.accumulo.core.client.rfile.RFile;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.log.SortedLogState;
+import org.apache.accumulo.tserver.logger.LogEvents;
 import org.apache.accumulo.tserver.logger.LogFileKey;
 import org.apache.accumulo.tserver.logger.LogFileValue;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Iterators;
-import com.google.common.collect.UnmodifiableIterator;
 
 /**
  * Iterates over multiple sorted recovery logs merging them into a single sorted stream.
  */
-public class RecoveryLogsIterator implements CloseableIterator<Entry<LogFileKey,LogFileValue>> {
+public class RecoveryLogsIterator
+    implements Iterator<Entry<LogFileKey,LogFileValue>>, AutoCloseable {
 
   private static final Logger LOG = LoggerFactory.getLogger(RecoveryLogsIterator.class);
 
-  List<CloseableIterator<Entry<LogFileKey,LogFileValue>>> iterators;
-  private UnmodifiableIterator<Entry<LogFileKey,LogFileValue>> iter;
+  private final List<Scanner> scanners;
+  private final Iterator<Entry<Key,Value>> iter;
 
   /**
-   * Iterates only over keys in the range [start,end].
+   * Scans the files in each recoveryLogDir over the range [start,end].
    */
-  RecoveryLogsIterator(VolumeManager fs, List<Path> recoveryLogPaths, LogFileKey start,
-      LogFileKey end) throws IOException {
+  RecoveryLogsIterator(ServerContext context, List<Path> recoveryLogDirs, LogFileKey start,
+      LogFileKey end, boolean checkFirstKey) throws IOException {
 
-    iterators = new ArrayList<>(recoveryLogPaths.size());
+    List<Iterator<Entry<Key,Value>>> iterators = new ArrayList<>(recoveryLogDirs.size());
+    scanners = new ArrayList<>();
+    Range range = LogFileKey.toRange(start, end);
+    var vm = context.getVolumeManager();
 
-    try {
-      for (Path log : recoveryLogPaths) {
-        LOG.debug("Opening recovery log {}", log.getName());
-        RecoveryLogReader rlr = new RecoveryLogReader(fs, log, start, end);
-        if (rlr.hasNext()) {
+    for (Path logDir : recoveryLogDirs) {
+      LOG.debug("Opening recovery log dir {}", logDir.getName());
+      List<Path> logFiles = getFiles(vm, logDir);
+      var fs = vm.getFileSystemByPath(logDir);
+
+      // only check the first key once to prevent extra iterator creation and seeking
+      if (checkFirstKey) {
+        validateFirstKey(context, fs, logFiles, logDir);
+      }
+
+      for (Path log : logFiles) {
+        var scanner = RFile.newScanner().from(log.toString()).withFileSystem(fs)
+            .withTableProperties(context.getConfiguration()).build();
+
+        scanner.setRange(range);
+        Iterator<Entry<Key,Value>> scanIter = scanner.iterator();
+
+        if (scanIter.hasNext()) {
           LOG.debug("Write ahead log {} has data in range {} {}", log.getName(), start, end);
-          iterators.add(rlr);
+          iterators.add(scanIter);
+          scanners.add(scanner);
         } else {
           LOG.debug("Write ahead log {} has no data in range {} {}", log.getName(), start, end);
-          rlr.close();
+          scanner.close();
         }
       }
-
-      iter = Iterators.mergeSorted(iterators, (o1, o2) -> o1.getKey().compareTo(o2.getKey()));
-
-    } catch (RuntimeException | IOException e) {
-      try {
-        close();
-      } catch (Exception e2) {
-        e.addSuppressed(e2);
-      }
-      throw e;
     }
+    iter = Iterators.mergeSorted(iterators, Entry.comparingByKey());
   }
 
   @Override
@@ -83,7 +104,9 @@ public class RecoveryLogsIterator implements CloseableIterator<Entry<LogFileKey,
 
   @Override
   public Entry<LogFileKey,LogFileValue> next() {
-    return iter.next();
+    Entry<Key,Value> e = iter.next();
+    return new AbstractMap.SimpleImmutableEntry<>(LogFileKey.fromKey(e.getKey()),
+        LogFileValue.fromValue(e.getValue()));
   }
 
   @Override
@@ -93,11 +116,50 @@ public class RecoveryLogsIterator implements CloseableIterator<Entry<LogFileKey,
 
   @Override
   public void close() {
-    for (CloseableIterator<?> reader : iterators) {
-      try {
-        reader.close();
-      } catch (IOException e) {
-        LOG.debug("Failed to close reader", e);
+    scanners.forEach(ScannerBase::close);
+  }
+
+  /**
+   * Check for sorting signal files (finished/failed) and get the logs in the provided directory.
+   */
+  private List<Path> getFiles(VolumeManager fs, Path directory) throws IOException {
+    boolean foundFinish = false;
+    List<Path> logFiles = new ArrayList<>();
+    for (FileStatus child : fs.listStatus(directory)) {
+      if (child.getPath().getName().startsWith("_"))
+        continue;
+      if (SortedLogState.isFinished(child.getPath().getName())) {
+        foundFinish = true;
+        continue;
+      }
+      if (SortedLogState.FAILED.getMarker().equals(child.getPath().getName())) {
+        continue;
+      }
+      FileSystem ns = fs.getFileSystemByPath(child.getPath());
+      Path fullLogPath = ns.makeQualified(child.getPath());
+      logFiles.add(fullLogPath);
+    }
+    if (!foundFinish)
+      throw new IOException(
+          "Sort '" + SortedLogState.FINISHED.getMarker() + "' flag not found in " + directory);
+    return logFiles;
+  }
+
+  /**
+   * Check that the first entry in the WAL is OPEN. Only need to do this once.
+   */
+  private void validateFirstKey(ServerContext context, FileSystem fs, List<Path> logFiles,
+      Path fullLogPath) {
+    try (var scanner =
+        RFile.newScanner().from(logFiles.stream().map(Path::toString).toArray(String[]::new))
+            .withFileSystem(fs).withTableProperties(context.getConfiguration()).build()) {
+      Iterator<Entry<Key,Value>> iterator = scanner.iterator();
+      if (iterator.hasNext()) {
+        Key firstKey = iterator.next().getKey();
+        LogFileKey key = LogFileKey.fromKey(firstKey);
+        if (key.event != LogEvents.OPEN) {
+          throw new IllegalStateException("First log entry is not OPEN " + fullLogPath);
+        }
       }
     }
   }
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java
index 9fc425a..432b24a 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java
@@ -19,6 +19,7 @@
 package org.apache.accumulo.tserver.log;
 
 import static com.google.common.base.Preconditions.checkState;
+import static java.util.Collections.max;
 import static org.apache.accumulo.tserver.logger.LogEvents.COMPACTION_FINISH;
 import static org.apache.accumulo.tserver.logger.LogEvents.COMPACTION_START;
 import static org.apache.accumulo.tserver.logger.LogEvents.DEFINE_TABLET;
@@ -30,6 +31,7 @@ import java.util.AbstractMap;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -41,7 +43,7 @@ import java.util.Set;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
 import org.apache.accumulo.core.metadata.RootTable;
-import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.tserver.logger.LogEvents;
 import org.apache.accumulo.tserver.logger.LogFileKey;
 import org.apache.accumulo.tserver.logger.LogFileValue;
@@ -61,10 +63,10 @@ public class SortedLogRecovery {
 
   private static final Logger log = LoggerFactory.getLogger(SortedLogRecovery.class);
 
-  private VolumeManager fs;
+  private final ServerContext context;
 
-  public SortedLogRecovery(VolumeManager fs) {
-    this.fs = fs;
+  public SortedLogRecovery(ServerContext context) {
+    this.context = context;
   }
 
   static LogFileKey maxKey(LogEvents event) {
@@ -98,11 +100,11 @@ public class SortedLogRecovery {
     return key;
   }
 
-  private int findMaxTabletId(KeyExtent extent, List<Path> recoveryLogs) throws IOException {
+  private int findMaxTabletId(KeyExtent extent, List<Path> recoveryLogDirs) throws IOException {
     int tabletId = -1;
 
-    try (RecoveryLogsIterator rli =
-        new RecoveryLogsIterator(fs, recoveryLogs, minKey(DEFINE_TABLET), maxKey(DEFINE_TABLET))) {
+    try (var rli = new RecoveryLogsIterator(context, recoveryLogDirs, minKey(DEFINE_TABLET),
+        maxKey(DEFINE_TABLET), true)) {
 
       KeyExtent alternative = extent;
       if (extent.isRootTablet()) {
@@ -138,24 +140,24 @@ public class SortedLogRecovery {
    *         ID.
    */
   private Entry<Integer,List<Path>> findLogsThatDefineTablet(KeyExtent extent,
-      List<Path> recoveryLogs) throws IOException {
+      List<Path> recoveryDirs) throws IOException {
     Map<Integer,List<Path>> logsThatDefineTablet = new HashMap<>();
 
-    for (Path wal : recoveryLogs) {
-      int tabletId = findMaxTabletId(extent, Collections.singletonList(wal));
+    for (Path walDir : recoveryDirs) {
+      int tabletId = findMaxTabletId(extent, Collections.singletonList(walDir));
       if (tabletId == -1) {
-        log.debug("Did not find tablet {} in recovery log {}", extent, wal.getName());
+        log.debug("Did not find tablet {} in recovery log {}", extent, walDir.getName());
       } else {
-        logsThatDefineTablet.computeIfAbsent(tabletId, k -> new ArrayList<>()).add(wal);
-        log.debug("Found tablet {} with id {} in recovery log {}", extent, tabletId, wal.getName());
+        logsThatDefineTablet.computeIfAbsent(tabletId, k -> new ArrayList<>()).add(walDir);
+        log.debug("Found tablet {} with id {} in recovery log {}", extent, tabletId,
+            walDir.getName());
       }
     }
 
     if (logsThatDefineTablet.isEmpty()) {
-      return new AbstractMap.SimpleEntry<>(-1, Collections.<Path>emptyList());
+      return new AbstractMap.SimpleEntry<>(-1, Collections.emptyList());
     } else {
-      return Collections.max(logsThatDefineTablet.entrySet(),
-          (o1, o2) -> Integer.compare(o1.getKey(), o2.getKey()));
+      return max(logsThatDefineTablet.entrySet(), Comparator.comparingInt(Entry::getKey));
     }
   }
 
@@ -202,8 +204,8 @@ public class SortedLogRecovery {
     long lastFinish = 0;
     long recoverySeq = 0;
 
-    try (RecoveryLogsIterator rli = new RecoveryLogsIterator(fs, recoveryLogs,
-        minKey(COMPACTION_START, tabletId), maxKey(COMPACTION_START, tabletId))) {
+    try (RecoveryLogsIterator rli = new RecoveryLogsIterator(context, recoveryLogs,
+        minKey(COMPACTION_START, tabletId), maxKey(COMPACTION_START, tabletId), false)) {
 
       DeduplicatingIterator ddi = new DeduplicatingIterator(rli);
 
@@ -258,21 +260,22 @@ public class SortedLogRecovery {
 
     LogFileKey end = maxKey(MUTATION, tabletId);
 
-    try (RecoveryLogsIterator rli = new RecoveryLogsIterator(fs, recoveryLogs, start, end)) {
+    try (var rli = new RecoveryLogsIterator(context, recoveryLogs, start, end, false)) {
       while (rli.hasNext()) {
         Entry<LogFileKey,LogFileValue> entry = rli.next();
+        LogFileKey logFileKey = entry.getKey();
 
-        checkState(entry.getKey().tabletId == tabletId); // should only fail if bug elsewhere
-        checkState(entry.getKey().seq >= recoverySeq); // should only fail if bug elsewhere
+        checkState(logFileKey.tabletId == tabletId); // should only fail if bug elsewhere
+        checkState(logFileKey.seq >= recoverySeq); // should only fail if bug elsewhere
 
-        if (entry.getKey().event == MUTATION) {
-          mr.receive(entry.getValue().mutations.get(0));
-        } else if (entry.getKey().event == MANY_MUTATIONS) {
-          for (Mutation m : entry.getValue().mutations) {
+        LogFileValue val = entry.getValue();
+        if (logFileKey.event == MUTATION || logFileKey.event == MANY_MUTATIONS) {
+          log.debug("Recover {} mutation(s) for {}", val.mutations.size(), entry.getKey());
+          for (Mutation m : val.mutations) {
             mr.receive(m);
           }
         } else {
-          throw new IllegalStateException("Non mutation event seen " + entry.getKey().event);
+          throw new IllegalStateException("Non mutation event seen " + logFileKey.event);
         }
       }
     }
@@ -282,10 +285,10 @@ public class SortedLogRecovery {
     return Collections2.transform(recoveryLogs, Path::getName);
   }
 
-  public void recover(KeyExtent extent, List<Path> recoveryLogs, Set<String> tabletFiles,
+  public void recover(KeyExtent extent, List<Path> recoveryDirs, Set<String> tabletFiles,
       MutationReceiver mr) throws IOException {
 
-    Entry<Integer,List<Path>> maxEntry = findLogsThatDefineTablet(extent, recoveryLogs);
+    Entry<Integer,List<Path>> maxEntry = findLogsThatDefineTablet(extent, recoveryDirs);
 
     // A tablet may leave a tserver and then come back, in which case it would have a different and
     // higher tablet id. Only want to consider events in the log related to the last time the tablet
@@ -294,11 +297,11 @@ public class SortedLogRecovery {
     List<Path> logsThatDefineTablet = maxEntry.getValue();
 
     if (tabletId == -1) {
-      log.info("Tablet {} is not defined in recovery logs {} ", extent, asNames(recoveryLogs));
+      log.info("Tablet {} is not defined in recovery logs {} ", extent, asNames(recoveryDirs));
       return;
     } else {
       log.info("Found {} of {} logs with max id {} for tablet {}", logsThatDefineTablet.size(),
-          recoveryLogs.size(), tabletId, extent);
+          recoveryDirs.size(), tabletId, extent);
     }
 
     // Find the seq # for the last compaction that started and finished
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
index 0c4e1c2..8d1a113 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
@@ -44,6 +44,7 @@ import org.apache.accumulo.core.util.Halt;
 import org.apache.accumulo.core.util.threads.ThreadPools;
 import org.apache.accumulo.fate.util.Retry;
 import org.apache.accumulo.fate.util.Retry.RetryFactory;
+import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.server.replication.StatusUtil;
 import org.apache.accumulo.server.replication.proto.Replication.Status;
@@ -530,11 +531,11 @@ public class TabletServerLogger {
     return seq;
   }
 
-  public void recover(VolumeManager fs, KeyExtent extent, List<Path> logs, Set<String> tabletFiles,
-      MutationReceiver mr) throws IOException {
+  public void recover(ServerContext context, KeyExtent extent, List<Path> recoveryDirs,
+      Set<String> tabletFiles, MutationReceiver mr) throws IOException {
     try {
-      SortedLogRecovery recovery = new SortedLogRecovery(fs);
-      recovery.recover(extent, logs, tabletFiles, mr);
+      SortedLogRecovery recovery = new SortedLogRecovery(context);
+      recovery.recover(extent, recoveryDirs, tabletFiles, mr);
     } catch (Exception e) {
       throw new IOException(e);
     }
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogFileKey.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogFileKey.java
index 31dfce7..bf08ca4 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogFileKey.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogFileKey.java
@@ -18,6 +18,7 @@
  */
 package org.apache.accumulo.tserver.logger;
 
+import static java.util.Arrays.copyOf;
 import static org.apache.accumulo.tserver.logger.LogEvents.DEFINE_TABLET;
 import static org.apache.accumulo.tserver.logger.LogEvents.MANY_MUTATIONS;
 import static org.apache.accumulo.tserver.logger.LogEvents.MUTATION;
@@ -26,10 +27,18 @@ import static org.apache.accumulo.tserver.logger.LogEvents.OPEN;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.io.UncheckedIOException;
 
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.WritableComparable;
 
+import com.google.common.base.Preconditions;
+
 public class LogFileKey implements WritableComparable<LogFileKey> {
 
   public LogEvents event;
@@ -189,4 +198,162 @@ public class LogFileKey implements WritableComparable<LogFileKey> {
     }
     throw new RuntimeException("Unknown type of entry: " + event);
   }
+
+  /**
+   * Converts LogFileKey to Key. Creates a Key containing all of the LogFileKey fields. The fields
+   * are stored so the Key sorts maintaining the legacy sort order. The row of the Key is composed
+   * of 3 fields: EventNum + tabletID + seq. The EventNum is the byte returned by eventType(). The
+   * column family is always the event. The column qualifier is dependent on the type of event and
+   * could be empty.
+   *
+   * <pre>
+   *     Key Schema:
+   *     Row = EventNum + tabletID + seq
+   *     Family = event
+   *     Qualifier = tserverSession OR filename OR KeyExtent
+   * </pre>
+   */
+  public Key toKey() throws IOException {
+    byte[] formattedRow;
+    String family = event.name();
+    var kb = Key.builder();
+    switch (event) {
+      case OPEN:
+        formattedRow = formatRow(0, 0);
+        return kb.row(formattedRow).family(family).qualifier(tserverSession).build();
+      case COMPACTION_START:
+        formattedRow = formatRow(tabletId, seq);
+        return kb.row(formattedRow).family(family).qualifier(filename).build();
+      case MUTATION:
+      case MANY_MUTATIONS:
+      case COMPACTION_FINISH:
+        return kb.row(formatRow(tabletId, seq)).family(family).build();
+      case DEFINE_TABLET:
+        formattedRow = formatRow(tabletId, seq);
+        DataOutputBuffer buffer = new DataOutputBuffer();
+        tablet.writeTo(buffer);
+        var q = copyOf(buffer.getData(), buffer.getLength());
+        buffer.close();
+        return kb.row(formattedRow).family(family).qualifier(q).build();
+      default:
+        throw new AssertionError("Invalid event type in LogFileKey: " + event);
+    }
+  }
+
+  /**
+   * Get the first byte for the event. The only possible values are 0-4. This is used as the highest
+   * byte in the row.
+   */
+  private byte getEventByte() {
+    int evenTypeInteger = eventType(event);
+    return (byte) (evenTypeInteger & 0xff);
+  }
+
+  /**
+   * Get the byte encoded row for this LogFileKey as a Text object.
+   */
+  private Text formatRow() {
+    return new Text(formatRow(tabletId, seq));
+  }
+
+  /**
+   * Format the row using 13 bytes encoded to allow proper sorting of the RFile Key. The highest
+   * byte is for the event number, 4 bytes for the tabletId and 8 bytes for the sequence long.
+   */
+  private byte[] formatRow(int tabletId, long seq) {
+    byte eventNum = getEventByte();
+    // These will not sort properly when encoded if negative. Negative is not expected currently,
+    // defending against future changes and/or bugs.
+    Preconditions.checkArgument(eventNum >= 0 && seq >= 0);
+    byte[] row = new byte[13];
+    // encode the signed integer so negatives will sort properly for tabletId
+    int encodedTabletId = tabletId ^ 0x80000000;
+
+    row[0] = eventNum;
+    row[1] = (byte) ((encodedTabletId >>> 24) & 0xff);
+    row[2] = (byte) ((encodedTabletId >>> 16) & 0xff);
+    row[3] = (byte) ((encodedTabletId >>> 8) & 0xff);
+    row[4] = (byte) (encodedTabletId & 0xff);
+    row[5] = (byte) (seq >>> 56);
+    row[6] = (byte) (seq >>> 48);
+    row[7] = (byte) (seq >>> 40);
+    row[8] = (byte) (seq >>> 32);
+    row[9] = (byte) (seq >>> 24);
+    row[10] = (byte) (seq >>> 16);
+    row[11] = (byte) (seq >>> 8);
+    row[12] = (byte) (seq); // >>> 0
+    return row;
+  }
+
+  /**
+   * Extract the tabletId integer from the byte encoded Row.
+   */
+  private static int getTabletId(byte[] row) {
+    int encoded = ((row[1] << 24) + (row[2] << 16) + (row[3] << 8) + row[4]);
+    return encoded ^ 0x80000000;
+  }
+
+  /**
+   * Extract the sequence long from the byte encoded Row.
+   */
+  private static long getSequence(byte[] row) {
+    // @formatter:off
+    return (((long) row[5] << 56) +
+            ((long) (row[6] & 255) << 48) +
+            ((long) (row[7] & 255) << 40) +
+            ((long) (row[8] & 255) << 32) +
+            ((long) (row[9] & 255) << 24) +
+            ((row[10] & 255) << 16) +
+            ((row[11] & 255) << 8) +
+            ((row[12] & 255)));
+    // @formatter:on
+  }
+
+  public static Range toRange(LogFileKey start, LogFileKey end) {
+    return new Range(start.formatRow(), end.formatRow());
+  }
+
+  /**
+   * Create LogFileKey from a Key. Follows the schema defined by {@link #toKey()}
+   */
+  public static LogFileKey fromKey(Key key) {
+    var logFileKey = new LogFileKey();
+    byte[] rowParts = key.getRow().getBytes();
+
+    logFileKey.tabletId = getTabletId(rowParts);
+    logFileKey.seq = getSequence(rowParts);
+    logFileKey.event = LogEvents.valueOf(key.getColumnFamilyData().toString());
+    // verify event number in row matches column family
+    if (eventType(logFileKey.event) != rowParts[0]) {
+      throw new AssertionError("Event in row differs from column family. Key: " + key);
+    }
+
+    // handle special cases of what is stored in the qualifier
+    switch (logFileKey.event) {
+      case OPEN:
+        logFileKey.tserverSession = key.getColumnQualifierData().toString();
+        break;
+      case COMPACTION_START:
+        logFileKey.filename = key.getColumnQualifierData().toString();
+        break;
+      case DEFINE_TABLET:
+        try (DataInputBuffer buffer = new DataInputBuffer()) {
+          byte[] bytes = key.getColumnQualifierData().toArray();
+          buffer.reset(bytes, bytes.length);
+          logFileKey.tablet = KeyExtent.readFrom(buffer);
+        } catch (IOException e) {
+          throw new UncheckedIOException(e);
+        }
+        break;
+      case COMPACTION_FINISH:
+      case MANY_MUTATIONS:
+      case MUTATION:
+        // nothing to do
+        break;
+      default:
+        throw new AssertionError("Invalid event type in key: " + key);
+    }
+
+    return logFileKey;
+  }
 }
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogFileValue.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogFileValue.java
index 53a5ce9..b4ee431 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogFileValue.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogFileValue.java
@@ -20,15 +20,21 @@ package org.apache.accumulo.tserver.logger;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.DataInput;
+import java.io.DataInputStream;
 import java.io.DataOutput;
+import java.io.DataOutputStream;
 import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
 import org.apache.accumulo.core.data.ColumnUpdate;
 import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.server.data.ServerMutation;
 import org.apache.hadoop.io.Writable;
 
@@ -93,4 +99,26 @@ public class LogFileValue implements Writable {
     return format(this, 5);
   }
 
+  /**
+   * Serialize this object with {@code write(DataOutput)} and wrap the resulting bytes in a Value
+   */
+  public Value toValue() throws IOException {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream(); // in-memory; close() is a no-op
+    write(new DataOutputStream(baos));
+    return new Value(baos.toByteArray());
+  }
+
+  /**
+   * Create a LogFileValue by reading its fields from the bytes of the Value, see {@link #toValue()}
+   */
+  public static LogFileValue fromValue(Value value) {
+    LogFileValue logFileValue = new LogFileValue();
+    try (var bais = new ByteArrayInputStream(value.get())) {
+      logFileValue.readFields(new DataInputStream(bais));
+    } catch (IOException e) {
+      throw new UncheckedIOException(e); // rethrown unchecked; this method declares no throws
+    }
+    return logFileValue;
+  }
+
 }
diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/SortedLogRecoveryTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/SortedLogRecoveryTest.java
index c6ce044..2121725 100644
--- a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/SortedLogRecoveryTest.java
+++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/SortedLogRecoveryTest.java
@@ -23,6 +23,10 @@ import static org.apache.accumulo.tserver.logger.LogEvents.COMPACTION_START;
 import static org.apache.accumulo.tserver.logger.LogEvents.DEFINE_TABLET;
 import static org.apache.accumulo.tserver.logger.LogEvents.MUTATION;
 import static org.apache.accumulo.tserver.logger.LogEvents.OPEN;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.reset;
+import static org.easymock.EasyMock.verify;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
@@ -41,10 +45,14 @@ import java.util.Objects;
 import java.util.Set;
 import java.util.TreeMap;
 
+import org.apache.accumulo.core.conf.DefaultConfiguration;
+import org.apache.accumulo.core.crypto.CryptoServiceFactory;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.TableId;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.util.Pair;
+import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.server.data.ServerMutation;
 import org.apache.accumulo.server.fs.VolumeManagerImpl;
 import org.apache.accumulo.server.log.SortedLogState;
@@ -53,9 +61,9 @@ import org.apache.accumulo.tserver.logger.LogFileKey;
 import org.apache.accumulo.tserver.logger.LogFileValue;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.MapFile;
-import org.apache.hadoop.io.MapFile.Writer;
 import org.apache.hadoop.io.Text;
+import org.easymock.EasyMock;
+import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -65,15 +73,22 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 @SuppressFBWarnings(value = "PATH_TRAVERSAL_IN", justification = "paths not set by user input")
 public class SortedLogRecoveryTest {
 
+  static final int bufferSize = 5;
   static final KeyExtent extent = new KeyExtent(TableId.of("table"), null, null);
   static final Text cf = new Text("cf");
   static final Text cq = new Text("cq");
   static final Value value = new Value("value");
+  static ServerContext context;
 
   @Rule
   public TemporaryFolder tempFolder =
       new TemporaryFolder(new File(System.getProperty("user.dir") + "/target"));
 
+  @Before
+  public void setup() {
+    context = EasyMock.createMock(ServerContext.class); // expectations are recorded in recover()
+  }
+
   static class KeyValue implements Comparable<KeyValue> {
     public final LogFileKey key;
     public final LogFileValue value;
@@ -138,33 +153,46 @@ public class SortedLogRecoveryTest {
   }
 
   private List<Mutation> recover(Map<String,KeyValue[]> logs, KeyExtent extent) throws IOException {
-    return recover(logs, new HashSet<>(), extent);
+    return recover(logs, new HashSet<>(), extent, bufferSize); // class-level bufferSize constant
   }
 
-  private List<Mutation> recover(Map<String,KeyValue[]> logs, Set<String> files, KeyExtent extent)
-      throws IOException {
+  private List<Mutation> recover(Map<String,KeyValue[]> logs, Set<String> files, KeyExtent extent,
+      int bufferSize) throws IOException {
+
     final String workdir = tempFolder.newFolder().getAbsolutePath();
     try (var fs = VolumeManagerImpl.getLocalForTesting(workdir)) {
+      expect(context.getVolumeManager()).andReturn(fs).anyTimes();
+      expect(context.getCryptoService()).andReturn(CryptoServiceFactory.newDefaultInstance())
+          .anyTimes();
+      expect(context.getConfiguration()).andReturn(DefaultConfiguration.getInstance()).anyTimes();
+      replay(context); // the expectations recorded above are now active on the mock
       final Path workdirPath = new Path("file://" + workdir);
       fs.deleteRecursively(workdirPath);
+
       ArrayList<Path> dirs = new ArrayList<>();
       for (Entry<String,KeyValue[]> entry : logs.entrySet()) {
-        String path = workdir + "/" + entry.getKey();
-        FileSystem ns = fs.getFileSystemByPath(new Path(path));
-        @SuppressWarnings("deprecation")
-        Writer map = new MapFile.Writer(ns.getConf(), ns, path + "/log1", LogFileKey.class,
-            LogFileValue.class);
-        for (KeyValue lfe : entry.getValue()) {
-          map.append(lfe.key, lfe.value);
+        String destPath = workdir + "/" + entry.getKey();
+        FileSystem ns = fs.getFileSystemByPath(new Path(destPath));
+        // copy test entries into Pairs; call LogSorter.writeBuffer with the next part number
+        // each time bufferSize entries have accumulated
+        List<Pair<LogFileKey,LogFileValue>> buffer = new ArrayList<>();
+        int parts = 0;
+        for (KeyValue pair : entry.getValue()) {
+          buffer.add(new Pair<>(pair.key, pair.value));
+          if (buffer.size() >= bufferSize) {
+            LogSorter.writeBuffer(context, destPath, buffer, parts++);
+            buffer.clear();
+          }
         }
-        map.close();
-        ns.create(SortedLogState.getFinishedMarkerPath(path)).close();
-        dirs.add(new Path(path));
+        LogSorter.writeBuffer(context, destPath, buffer, parts); // remainder; may be empty
+
+        ns.create(SortedLogState.getFinishedMarkerPath(destPath)).close();
+        dirs.add(new Path(destPath));
       }
       // Recover
-      SortedLogRecovery recovery = new SortedLogRecovery(fs);
+      SortedLogRecovery recovery = new SortedLogRecovery(context);
       CaptureMutations capture = new CaptureMutations();
       recovery.recover(extent, dirs, files, capture);
+      verify(context);
       return capture.result;
     }
   }
@@ -307,7 +335,6 @@ public class SortedLogRecoveryTest {
     List<Mutation> mutations = recover(logs, extent);
     // Verify recovered data
     assertEquals(0, mutations.size());
-
   }
 
   @Test
@@ -659,7 +686,7 @@ public class SortedLogRecoveryTest {
     Map<String,KeyValue[]> logs = new TreeMap<>();
     logs.put("entries", entries);
 
-    List<Mutation> mutations = recover(logs, Collections.singleton("/t/f1"), extent);
+    List<Mutation> mutations = recover(logs, Collections.singleton("/t/f1"), extent, bufferSize);
 
     assertEquals(0, mutations.size());
   }
@@ -682,7 +709,7 @@ public class SortedLogRecoveryTest {
     Map<String,KeyValue[]> logs = new TreeMap<>();
     logs.put("entries", entries);
 
-    List<Mutation> mutations = recover(logs, Collections.singleton("/t/f1"), extent);
+    List<Mutation> mutations = recover(logs, Collections.singleton("/t/f1"), extent, bufferSize);
 
     assertEquals(1, mutations.size());
     assertEquals(m, mutations.get(0));
@@ -752,6 +779,7 @@ public class SortedLogRecoveryTest {
     assertEquals(1, mutations1.size());
     assertEquals(m2, mutations1.get(0));
 
+    reset(context);
     List<Mutation> mutations2 = recover(logs, e2);
     assertEquals(2, mutations2.size());
     assertEquals(m3, mutations2.get(0));
@@ -762,6 +790,7 @@ public class SortedLogRecoveryTest {
     Arrays.sort(entries2);
     logs.put("entries2", entries2);
 
+    reset(context);
     mutations2 = recover(logs, e2);
     assertEquals(1, mutations2.size());
     assertEquals(m4, mutations2.get(0));
@@ -785,7 +814,7 @@ public class SortedLogRecoveryTest {
 
     HashSet<String> filesSet = new HashSet<>();
     filesSet.addAll(Arrays.asList(tabletFiles));
-    List<Mutation> mutations = recover(logs, filesSet, extent);
+    List<Mutation> mutations = recover(logs, filesSet, extent, bufferSize);
 
     if (startMatches) {
       assertEquals(1, mutations.size());
@@ -802,6 +831,7 @@ public class SortedLogRecoveryTest {
     // test having different paths for the same file. This can happen as a result of upgrade or user
     // changing configuration
     runPathTest(false, "/t1/f1", "/t1/f0");
+    reset(context);
     runPathTest(true, "/t1/f1", "/t1/f0", "/t1/f1");
 
     String[] aliases = {"/t1/f1", "hdfs://nn1/accumulo/tables/8/t1/f1",
@@ -812,9 +842,12 @@ public class SortedLogRecoveryTest {
 
     for (String alias1 : aliases) {
       for (String alias2 : aliases) {
+        reset(context);
         runPathTest(true, alias1, alias2);
         for (String other : others) {
+          reset(context);
           runPathTest(true, alias1, other, alias2);
+          reset(context);
           runPathTest(true, alias1, alias2, other);
         }
       }
@@ -822,6 +855,7 @@ public class SortedLogRecoveryTest {
 
     for (String alias1 : aliases) {
       for (String other : others) {
+        reset(context);
         runPathTest(false, alias1, other);
       }
     }
@@ -972,29 +1006,34 @@ public class SortedLogRecoveryTest {
 
     logs.put("entries2", entries2);
 
+    reset(context);
     mutations = recover(logs, extent);
     assertEquals(1, mutations.size());
     assertEquals(m1, mutations.get(0));
 
     logs.put("entries3", entries3);
 
+    reset(context);
     mutations = recover(logs, extent);
     assertEquals(1, mutations.size());
     assertEquals(m1, mutations.get(0));
 
     logs.put("entries4", entries4);
 
+    reset(context);
     mutations = recover(logs, extent);
     assertEquals(1, mutations.size());
     assertEquals(m1, mutations.get(0));
 
     logs.put("entries5", entries5);
 
+    reset(context);
     mutations = recover(logs, extent);
     assertEquals(0, mutations.size());
 
     logs.put("entries6", entries6);
 
+    reset(context);
     mutations = recover(logs, extent);
     assertEquals(1, mutations.size());
     assertEquals(m2, mutations.get(0));
diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/TestUpgradePathForWALogs.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/TestUpgradePathForWALogs.java
index ed88901..23614bd 100644
--- a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/TestUpgradePathForWALogs.java
+++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/TestUpgradePathForWALogs.java
@@ -35,6 +35,7 @@ import java.io.OutputStream;
 
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.DefaultConfiguration;
+import org.apache.accumulo.core.crypto.CryptoServiceFactory;
 import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.server.fs.VolumeManagerImpl;
@@ -73,6 +74,8 @@ public class TestUpgradePathForWALogs {
     String path = workDir.getAbsolutePath();
     assertTrue(workDir.delete());
     VolumeManager fs = VolumeManagerImpl.getLocalForTesting(path);
+    expect(context.getCryptoService()).andReturn(CryptoServiceFactory.newDefaultInstance())
+        .anyTimes();
     expect(context.getVolumeManager()).andReturn(fs).anyTimes();
     replay(context);
   }