Posted to notifications@accumulo.apache.org by GitBox <gi...@apache.org> on 2021/05/25 17:42:37 UTC

[GitHub] [accumulo] milleruntime opened a new pull request #2117: Change sorted recovery to write to RFiles

milleruntime opened a new pull request #2117:
URL: https://github.com/apache/accumulo/pull/2117


   * Change LogSorter and SortedLogRecovery to write/read RFile instead of MapFile
   * Rewrite LogSorter writeBuffer method and make static to test
   * Create toKey() and fromKey() methods in LogFileKey for converting WAL keys
   * Create toValue() and fromValue() methods in LogFileValue for converting WAL values
   * Make RecoveryLogsIterator iterate over Key Value pairs
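Here is a minimal sketch of the row layout behind toKey()/fromKey(). It assumes the `%d_%06d_%010d` format used by `formatRow()` in the diffs below, and it assumes event numbers are single digits. `WalRowCodec` and `parseRow()` are hypothetical names for illustration, not the PR's code.

```java
// Hypothetical sketch of the "EventNum_tabletID_seq" row layout from LogFileKey.toKey(),
// plus an illustrative inverse of the kind a fromKey()-style method might use.
final class WalRowCodec {
  private WalRowCodec() {}

  // Same format string as formatRow() in the PR, e.g. 1_000001_0000000001
  static String formatRow(int eventNum, int tabletId, long seq) {
    return String.format("%d_%06d_%010d", eventNum, tabletId, seq);
  }

  // Splits a row back into {eventNum, tabletId, seq}.
  static long[] parseRow(String row) {
    String[] parts = row.split("_");
    if (parts.length != 3) {
      throw new IllegalArgumentException("Unexpected row: " + row);
    }
    return new long[] {Integer.parseInt(parts[0]), Integer.parseInt(parts[1]),
        Long.parseLong(parts[2])};
  }
}
```

Every field is zero-padded to a fixed width, so plain string comparison of rows follows (eventNum, tabletId, seq) order as long as the values fit those widths. This property is what lets the RFile keep the legacy sort order.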


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [accumulo] keith-turner commented on a change in pull request #2117: Change sorted recovery to write to RFiles

Posted by GitBox <gi...@apache.org>.
keith-turner commented on a change in pull request #2117:
URL: https://github.com/apache/accumulo/pull/2117#discussion_r646835544



##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogFileKey.java
##########
@@ -189,4 +195,90 @@ public String toString() {
     }
     throw new RuntimeException("Unknown type of entry: " + event);
   }
+
+  /**
+   * Converts LogFileKey to Key. Creates a Key containing all of the LogFileKey fields. The fields
+   * are stored so the Key sorts maintaining the legacy sort order. The row of the Key is composed
+   * of 3 fields: EventNum + tabletID + seq (separated by underscore). The EventNum is the integer
+   * returned by eventType(). The column family is always the event. The column qualifier is
+   * dependent of the type of event and could be empty.
+   *
+   * <pre>
+   *     Key Schema:
+   *     Row = EventNum_tabletID_seq
+   *     Family = event
+   *     Qualifier = tserverSession OR filename OR KeyExtent
+   * </pre>
+   */
+  public Key toKey() throws IOException {
+    String row = "";
+    int eventNum = eventType(event);
+    String family = event.name();
+    String qual = "";
+    switch (event) {
+      case OPEN:
+        row = formatRow(eventNum, 0, 0);
+        qual = tserverSession;
+        break;
+      case COMPACTION_START:
+        row = formatRow(eventNum, tabletId, seq);
+        if (filename != null)

Review comment:
       There was a comment I made last week about using only the row of the key to construct ranges to scan RFiles. That may help avoid the problem you mentioned, though I am not completely sure.
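This illustrates the row-only idea. If the start and end bounds are built from rows alone, the qualifier (filename, tserverSession, or KeyExtent) never has to be filled in, so no dummy value is needed. The sketch uses a `TreeMap` of `row|family|qualifier` strings as a stand-in for a sorted RFile. The class name, the separator characters, and the 10-digit maximum seq are assumptions for illustration. Real code would build an Accumulo `Range` instead.

```java
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical stand-in for a sorted RFile: keys are "row|family|qualifier" strings.
// Real code would use Accumulo's Key/Range; this only illustrates the ordering argument.
final class RowRangeSketch {
  private RowRangeSketch() {}

  static String row(int eventNum, int tabletId, long seq) {
    return String.format("%d_%06d_%010d", eventNum, tabletId, seq);
  }

  // Selects every entry for one event type and tablet, whatever its qualifier,
  // using bounds built only from rows.
  static NavigableMap<String,String> scanTablet(NavigableMap<String,String> file,
      int eventNum, int tabletId) {
    String start = row(eventNum, tabletId, 0);
    // '~' sorts after '|', so this end bound includes any family/qualifier
    // appended to the largest row.
    String end = row(eventNum, tabletId, 9_999_999_999L) + "~";
    return file.subMap(start, true, end, true);
  }
}
```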







[GitHub] [accumulo] milleruntime commented on a change in pull request #2117: Change sorted recovery to write to RFiles

Posted by GitBox <gi...@apache.org>.
milleruntime commented on a change in pull request #2117:
URL: https://github.com/apache/accumulo/pull/2117#discussion_r646664500



##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogFileKey.java
##########
@@ -189,4 +195,90 @@ public String toString() {
     }
     throw new RuntimeException("Unknown type of entry: " + event);
   }
+
+  /**
+   * Converts LogFileKey to Key. Creates a Key containing all of the LogFileKey fields. The fields
+   * are stored so the Key sorts maintaining the legacy sort order. The row of the Key is composed
+   * of 3 fields: EventNum + tabletID + seq (separated by underscore). The EventNum is the integer
+   * returned by eventType(). The column family is always the event. The column qualifier is
+   * dependent of the type of event and could be empty.
+   *
+   * <pre>
+   *     Key Schema:
+   *     Row = EventNum_tabletID_seq
+   *     Family = event
+   *     Qualifier = tserverSession OR filename OR KeyExtent
+   * </pre>
+   */
+  public Key toKey() throws IOException {
+    String row = "";
+    int eventNum = eventType(event);
+    String family = event.name();
+    String qual = "";
+    switch (event) {
+      case OPEN:
+        row = formatRow(eventNum, 0, 0);
+        qual = tserverSession;
+        break;
+      case COMPACTION_START:
+        row = formatRow(eventNum, tabletId, seq);
+        if (filename != null)
+          qual = filename;
+        break;
+      case MUTATION:
+      case MANY_MUTATIONS:
+      case COMPACTION_FINISH:
+        row = formatRow(eventNum, tabletId, seq);
+        break;
+      case DEFINE_TABLET:
+        row = formatRow(eventNum, tabletId, seq);
+        // Base64 encode KeyExtent
+        DataOutputBuffer buffer = new DataOutputBuffer();
+        tablet.writeTo(buffer);
+        qual = Base64.getEncoder().encodeToString(copyOf(buffer.getData(), buffer.getLength()));
+        buffer.close();
+        break;
+    }
+    return new Key(new Text(row), new Text(family), new Text(qual));
+  }
+
+  // format row = 1_000001_0000000001
+  private String formatRow(int eventNum, int tabletId, long seq) {
+    return String.format("%d_%06d_%010d", eventNum, tabletId, seq);
+  }

Review comment:
       I had commented above about `FastFormat` before seeing this comment. Something like this is probably better than FastFormat since I am making two zero-padded strings. I also chose the lengths of tabletId and sequence arbitrarily, but this will allow use of all the digits. I will give this a try, thanks.
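This small illustration shows why the padding widths matter. It assumes tabletId and seq are never negative. With `%06d`, a tabletId of 1,000,000 produces a longer field and sorts before 999,999. Padding to the full width of an `int` (10 digits) and a `long` (19 digits) keeps string order equal to numeric order for every non-negative value. `PaddingWidthSketch` is a hypothetical name.

```java
// Hypothetical illustration of why the zero-padding width matters for a string row.
final class PaddingWidthSketch {
  private PaddingWidthSketch() {}

  // Widths from the PR's formatRow(): 6 digits for tabletId, 10 for seq.
  static String narrowRow(int eventNum, int tabletId, long seq) {
    return String.format("%d_%06d_%010d", eventNum, tabletId, seq);
  }

  // Full widths: Integer.MAX_VALUE has 10 digits and Long.MAX_VALUE has 19, so every
  // non-negative int/long fits and string order matches numeric order.
  static String fullWidthRow(int eventNum, int tabletId, long seq) {
    if (tabletId < 0 || seq < 0) {
      throw new IllegalArgumentException("sketch assumes non-negative tabletId and seq");
    }
    return String.format("%d_%010d_%019d", eventNum, tabletId, seq);
  }
}
```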







[GitHub] [accumulo] milleruntime commented on a change in pull request #2117: Change sorted recovery to write to RFiles

Posted by GitBox <gi...@apache.org>.
milleruntime commented on a change in pull request #2117:
URL: https://github.com/apache/accumulo/pull/2117#discussion_r648604403



##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
##########
@@ -224,6 +214,40 @@ public LogSorter(ServerContext context, AccumuloConfiguration conf) {
     this.walBlockSize = DfsLogger.getWalBlockSize(conf);
   }
 
+  @VisibleForTesting
+  public static void writeBuffer(ServerContext context, String destPath,
+      List<Pair<LogFileKey,LogFileValue>> buffer, int part) throws IOException {
+    String filename = String.format("part-r-%05d.rf", part);
+    Path path = new Path(destPath, filename);
+    FileSystem fs = context.getVolumeManager().getFileSystemByPath(path);
+    Path fullPath = fs.makeQualified(path);
+
+    // convert the LogFileKeys to Keys, sort and collect the mutations
+    Map<Key,List<Mutation>> keyListMap = new TreeMap<>();
+    for (Pair<LogFileKey,LogFileValue> pair : buffer) {
+      var logFileKey = pair.getFirst();
+      var logFileValue = pair.getSecond();
+      Key k = logFileKey.toKey();
+      var list = keyListMap.putIfAbsent(k, logFileValue.mutations);
+      if (list != null) {
+        var muts = new ArrayList<>(list);
+        muts.addAll(logFileValue.mutations);
+        keyListMap.put(logFileKey.toKey(), muts);
+      }
+    }
+
+    try (var writer = FileOperations.getInstance().newWriterBuilder()
+        .forFile(fullPath.toString(), fs, fs.getConf(), context.getCryptoService())
+        .withTableConfiguration(DefaultConfiguration.getInstance()).build()) {

Review comment:
       I will open a follow on issue to explore using different compression for recovery.
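Separately from compression, the key-collection loop in `writeBuffer()` above could be expressed with `Map.merge`. That loop currently calls putIfAbsent, then copies and appends on a collision. Below is a minimal sketch in which `String` stands in for Accumulo's `Key` and `Mutation` so the example is self-contained. `MutationGrouping` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: String stands in for Accumulo's Key and Mutation types.
final class MutationGrouping {
  private MutationGrouping() {}

  // Groups mutation lists by their converted key, keeping keys sorted (the RFile
  // writer expects sorted appends) and concatenating lists that share a key.
  static Map<String,List<String>> group(List<Map.Entry<String,List<String>>> buffer) {
    Map<String,List<String>> byKey = new TreeMap<>();
    for (Map.Entry<String,List<String>> e : buffer) {
      // copy first so the caller's list is never aliased or mutated
      byKey.merge(e.getKey(), new ArrayList<>(e.getValue()), (existing, added) -> {
        existing.addAll(added);
        return existing;
      });
    }
    return byKey;
  }
}
```

Copying each incoming list before merging also avoids storing a reference to the caller's list. The `putIfAbsent` version does store such a reference for the first mutation list of each key.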







[GitHub] [accumulo] milleruntime commented on a change in pull request #2117: Change sorted recovery to write to RFiles

Posted by GitBox <gi...@apache.org>.
milleruntime commented on a change in pull request #2117:
URL: https://github.com/apache/accumulo/pull/2117#discussion_r646735312



##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogFileKey.java
##########
@@ -189,4 +195,90 @@ public String toString() {
     }
     throw new RuntimeException("Unknown type of entry: " + event);
   }
+
+  /**
+   * Converts LogFileKey to Key. Creates a Key containing all of the LogFileKey fields. The fields
+   * are stored so the Key sorts maintaining the legacy sort order. The row of the Key is composed
+   * of 3 fields: EventNum + tabletID + seq (separated by underscore). The EventNum is the integer
+   * returned by eventType(). The column family is always the event. The column qualifier is
+   * dependent of the type of event and could be empty.
+   *
+   * <pre>
+   *     Key Schema:
+   *     Row = EventNum_tabletID_seq
+   *     Family = event
+   *     Qualifier = tserverSession OR filename OR KeyExtent
+   * </pre>
+   */
+  public Key toKey() throws IOException {
+    String row = "";
+    int eventNum = eventType(event);
+    String family = event.name();
+    String qual = "";
+    switch (event) {
+      case OPEN:
+        row = formatRow(eventNum, 0, 0);
+        qual = tserverSession;
+        break;
+      case COMPACTION_START:
+        row = formatRow(eventNum, tabletId, seq);
+        if (filename != null)

Review comment:
       I agree, but I was using the minKey and maxKey methods in `SortedLogRecovery`. I guess I could set the filename to a dummy value to create the range; otherwise I would have to create the start and end keys for the range manually. I wanted to use the same `toKey()` method for consistency.







[GitHub] [accumulo] keith-turner commented on a change in pull request #2117: Change sorted recovery to write to RFiles

Posted by GitBox <gi...@apache.org>.
keith-turner commented on a change in pull request #2117:
URL: https://github.com/apache/accumulo/pull/2117#discussion_r646830017



##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogFileKey.java
##########
@@ -189,4 +195,90 @@ public String toString() {
     }
     throw new RuntimeException("Unknown type of entry: " + event);
   }
+
+  /**
+   * Converts LogFileKey to Key. Creates a Key containing all of the LogFileKey fields. The fields
+   * are stored so the Key sorts maintaining the legacy sort order. The row of the Key is composed
+   * of 3 fields: EventNum + tabletID + seq (separated by underscore). The EventNum is the integer
+   * returned by eventType(). The column family is always the event. The column qualifier is
+   * dependent of the type of event and could be empty.
+   *
+   * <pre>
+   *     Key Schema:
+   *     Row = EventNum_tabletID_seq
+   *     Family = event
+   *     Qualifier = tserverSession OR filename OR KeyExtent
+   * </pre>
+   */
+  public Key toKey() throws IOException {
+    String row = "";
+    int eventNum = eventType(event);
+    String family = event.name();
+    String qual = "";
+    switch (event) {
+      case OPEN:
+        row = formatRow(eventNum, 0, 0);
+        qual = tserverSession;
+        break;
+      case COMPACTION_START:
+        row = formatRow(eventNum, tabletId, seq);
+        if (filename != null)
+          qual = filename;
+        break;
+      case MUTATION:
+      case MANY_MUTATIONS:
+      case COMPACTION_FINISH:
+        row = formatRow(eventNum, tabletId, seq);
+        break;
+      case DEFINE_TABLET:
+        row = formatRow(eventNum, tabletId, seq);
+        // Base64 encode KeyExtent
+        DataOutputBuffer buffer = new DataOutputBuffer();
+        tablet.writeTo(buffer);
+        qual = Base64.getEncoder().encodeToString(copyOf(buffer.getData(), buffer.getLength()));
+        buffer.close();
+        break;
+    }
+    return new Key(new Text(row), new Text(family), new Text(qual));
+  }
+
+  // format row = 1_000001_0000000001
+  private String formatRow(int eventNum, int tabletId, long seq) {
+    return String.format("%d_%06d_%010d", eventNum, tabletId, seq);
+  }

Review comment:
       If possible, it would be better to avoid Text and use byte[]. Could use Key.builder() if there is a need to mix types like byte[] and String when creating a Key.
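One way to follow this suggestion is to use a fixed-width binary row instead of a formatted string. The sketch below packs the three fields big-endian into a 16-byte array. It assumes non-negative fields and assumes rows are compared as unsigned bytes. Under those assumptions, byte order matches numeric order. `BinaryRowSketch` is hypothetical and is not what the PR implements.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical sketch of a fixed-width binary row: 4-byte eventNum, 4-byte tabletId,
// 8-byte seq, all big-endian (ByteBuffer's default byte order).
final class BinaryRowSketch {
  private BinaryRowSketch() {}

  static byte[] encode(int eventNum, int tabletId, long seq) {
    if (eventNum < 0 || tabletId < 0 || seq < 0) {
      // a negative value has its sign bit set and would sort after every positive one
      throw new IllegalArgumentException("sketch assumes non-negative fields");
    }
    return ByteBuffer.allocate(16).putInt(eventNum).putInt(tabletId).putLong(seq).array();
  }

  static long[] decode(byte[] row) {
    ByteBuffer buf = ByteBuffer.wrap(row);
    return new long[] {buf.getInt(), buf.getInt(), buf.getLong()};
  }

  // Unsigned lexicographic comparison, which matches numeric order for these rows.
  static int compare(byte[] a, byte[] b) {
    return Arrays.compareUnsigned(a, b);
  }
}
```

Compared with the string form, this avoids `String.format` and has no width limit for non-negative values. The cost is that rows are no longer human-readable.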







[GitHub] [accumulo] milleruntime commented on pull request #2117: Change sorted recovery to write to RFiles

Posted by GitBox <gi...@apache.org>.
milleruntime commented on pull request #2117:
URL: https://github.com/apache/accumulo/pull/2117#issuecomment-848184710


   Ran the full ITs and saw timeout failures in CreateInitialSplitsIT and SuspendedTabletsIT.





[GitHub] [accumulo] keith-turner commented on a change in pull request #2117: Change sorted recovery to write to RFiles

Posted by GitBox <gi...@apache.org>.
keith-turner commented on a change in pull request #2117:
URL: https://github.com/apache/accumulo/pull/2117#discussion_r651808748



##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogFileValue.java
##########
@@ -93,4 +98,23 @@ public String toString() {
     return format(this, 5);
   }
 
+  /**
+   * Convert list of mutations to a byte array and use to create a Value
+   */
+  public Value toValue() throws IOException {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();

Review comment:
       @milleruntime, did you open an issue for this? I looked around and did not see one.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [accumulo] keith-turner commented on a change in pull request #2117: Change sorted recovery to write to RFiles

Posted by GitBox <gi...@apache.org>.
keith-turner commented on a change in pull request #2117:
URL: https://github.com/apache/accumulo/pull/2117#discussion_r646849389



##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogsIterator.java
##########
@@ -20,60 +20,85 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map.Entry;
 
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.ScannerBase;
+import org.apache.accumulo.core.client.rfile.RFile;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.log.SortedLogState;
+import org.apache.accumulo.tserver.logger.LogEvents;
 import org.apache.accumulo.tserver.logger.LogFileKey;
-import org.apache.accumulo.tserver.logger.LogFileValue;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Iterators;
-import com.google.common.collect.UnmodifiableIterator;
 
 /**
  * Iterates over multiple sorted recovery logs merging them into a single sorted stream.
  */
-public class RecoveryLogsIterator implements CloseableIterator<Entry<LogFileKey,LogFileValue>> {
+public class RecoveryLogsIterator implements Iterator<Entry<Key,Value>>, AutoCloseable {

Review comment:
       > Can you point me to the code that does this?
   
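   RecoveryLogsIterator.next() could convert from Key/Value to LogFileKey/LogFileValue, like the following. Not sure if there is anything using RecoveryLogsIterator that requires a Key/Value though.
   
   ```java
   @Override
   public Entry<LogFileKey,LogFileValue> next() {
     Entry<Key,Value> e = iter.next();
     // assumes fromKey()/fromValue() (listed in the PR description) are static factories
     return new AbstractMap.SimpleImmutableEntry<>(LogFileKey.fromKey(e.getKey()),
         LogFileValue.fromValue(e.getValue()));
   }
   ```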
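The class Javadoc above says RecoveryLogsIterator merges multiple sorted recovery logs into a single sorted stream. The diff delegates this to Guava's `Iterators.mergeSorted`. Below is a rough stdlib-only sketch of that k-way merge, using a priority queue that holds the current head of each source. `SortedMerge` is a hypothetical name.

```java
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.PriorityQueue;

// Hypothetical stdlib sketch of a k-way merge over individually sorted iterators,
// the role Guava's Iterators.mergeSorted plays in RecoveryLogsIterator.
final class SortedMerge<T> implements Iterator<T> {
  // Pairs a source iterator with the element it most recently produced.
  private static final class Head<E> {
    final E value;
    final Iterator<E> source;

    Head(E value, Iterator<E> source) {
      this.value = value;
      this.source = source;
    }
  }

  private final PriorityQueue<Head<T>> heap;

  SortedMerge(List<? extends Iterator<T>> sources, Comparator<? super T> cmp) {
    Comparator<Head<T>> byValue = (a, b) -> cmp.compare(a.value, b.value);
    heap = new PriorityQueue<>(Math.max(1, sources.size()), byValue);
    for (Iterator<T> it : sources) {
      if (it.hasNext()) {
        heap.add(new Head<>(it.next(), it));
      }
    }
  }

  @Override
  public boolean hasNext() {
    return !heap.isEmpty();
  }

  @Override
  public T next() {
    Head<T> head = heap.poll();
    if (head == null) {
      throw new NoSuchElementException();
    }
    // Refill from the same source so the heap always holds each source's next element.
    if (head.source.hasNext()) {
      heap.add(new Head<>(head.source.next(), head.source));
    }
    return head.value;
  }
}
```

Each `next()` costs O(log k) for k sources. Only one element per source is buffered at a time.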







[GitHub] [accumulo] milleruntime commented on pull request #2117: Change sorted recovery to write to RFiles

Posted by GitBox <gi...@apache.org>.
milleruntime commented on pull request #2117:
URL: https://github.com/apache/accumulo/pull/2117#issuecomment-853329203


   I successfully tested the recovery locally using Uno with TestIngest of 50000 rows and two tservers. I ran all the ITs again and `WriteAheadLogEncryptedIT` was the only one failing. I refactored the code to pass the server context in order to get the configuration, and it now passes.
   
   There may be room for future performance improvements, but I think the equivalent functionality of sorting recovery with RFile is ready to merge. This change should take care of #2076 as well, but I want to do further testing with encryption since I don't think it works exactly how we want it to yet. I can do that as a follow-on task.





[GitHub] [accumulo] milleruntime commented on a change in pull request #2117: Change sorted recovery to write to RFiles

Posted by GitBox <gi...@apache.org>.
milleruntime commented on a change in pull request #2117:
URL: https://github.com/apache/accumulo/pull/2117#discussion_r648555766



##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogFileValue.java
##########
@@ -93,4 +98,23 @@ public String toString() {
     return format(this, 5);
   }
 
+  /**
+   * Convert list of mutations to a byte array and use to create a Value
+   */
+  public Value toValue() throws IOException {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();

Review comment:
       Or this one, which I think you wrote and I have used before haha: https://github.com/apache/accumulo/blob/83b171d9c4f071a39ef6da1158c34dcf70f83f6a/core/src/main/java/org/apache/accumulo/core/util/UnsynchronizedBuffer.java#L27-L32
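For the toValue()/fromValue() pair, the essential format is a count prefix followed by length-prefixed records. Below is a minimal round-trip sketch using only `java.io`. Each `byte[]` stands in for one serialized `Mutation`; real code would use the mutation's own serialization. `MutationListCodec` is a hypothetical name.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of a toValue()/fromValue()-style round trip. Each byte[] stands in
// for one serialized Mutation; real code would use the Mutation's own serialization.
final class MutationListCodec {
  private MutationListCodec() {}

  // Layout: [int count] then, per record, [int length][length bytes].
  static byte[] toBytes(List<byte[]> mutations) {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(baos)) {
      out.writeInt(mutations.size());
      for (byte[] m : mutations) {
        out.writeInt(m.length);
        out.write(m);
      }
    } catch (IOException e) {
      // not expected for an in-memory stream
      throw new UncheckedIOException(e);
    }
    return baos.toByteArray();
  }

  static List<byte[]> fromBytes(byte[] data) {
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
      int count = in.readInt();
      List<byte[]> mutations = new ArrayList<>(count);
      for (int i = 0; i < count; i++) {
        byte[] m = new byte[in.readInt()];
        in.readFully(m);
        mutations.add(m);
      }
      return mutations;
    } catch (IOException e) {
      // e.g. EOFException if the data is truncated
      throw new UncheckedIOException(e);
    }
  }
}
```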







[GitHub] [accumulo] milleruntime commented on pull request #2117: Change sorted recovery to write to RFiles

Posted by GitBox <gi...@apache.org>.
milleruntime commented on pull request #2117:
URL: https://github.com/apache/accumulo/pull/2117#issuecomment-861540936


   @keith-turner Did you want to take another look at these changes? I think this is ready to merge.





[GitHub] [accumulo] milleruntime commented on a change in pull request #2117: Change sorted recovery to write to RFiles

Posted by GitBox <gi...@apache.org>.
milleruntime commented on a change in pull request #2117:
URL: https://github.com/apache/accumulo/pull/2117#discussion_r647556319



##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java
##########
@@ -61,56 +66,58 @@
 
   private static final Logger log = LoggerFactory.getLogger(SortedLogRecovery.class);
 
-  private VolumeManager fs;
+  private final ServerContext context;
 
-  public SortedLogRecovery(VolumeManager fs) {
-    this.fs = fs;
+  public SortedLogRecovery(ServerContext context) {
+    this.context = context;
   }
 
-  static LogFileKey maxKey(LogEvents event) {
+  static LogFileKey maxKey(LogEvents event, KeyExtent extent) {
     LogFileKey key = new LogFileKey();
     key.event = event;
     key.tabletId = Integer.MAX_VALUE;
     key.seq = Long.MAX_VALUE;
+    key.tablet = extent;

Review comment:
       I set a dummy value for filename to avoid the NPE.







[GitHub] [accumulo] milleruntime commented on a change in pull request #2117: Change sorted recovery to write to RFiles

Posted by GitBox <gi...@apache.org>.
milleruntime commented on a change in pull request #2117:
URL: https://github.com/apache/accumulo/pull/2117#discussion_r646639714



##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogFileKey.java
##########
@@ -189,4 +195,90 @@ public String toString() {
     }
     throw new RuntimeException("Unknown type of entry: " + event);
   }
+
+  /**
+   * Converts LogFileKey to Key. Creates a Key containing all of the LogFileKey fields. The fields
+   * are stored so the Key sorts maintaining the legacy sort order. The row of the Key is composed
+   * of 3 fields: EventNum + tabletID + seq (separated by underscore). The EventNum is the integer
+   * returned by eventType(). The column family is always the event. The column qualifier is
+   * dependent of the type of event and could be empty.
+   *
+   * <pre>
+   *     Key Schema:
+   *     Row = EventNum_tabletID_seq
+   *     Family = event
+   *     Qualifier = tserverSession OR filename OR KeyExtent
+   * </pre>
+   */
+  public Key toKey() throws IOException {
+    String row = "";
+    int eventNum = eventType(event);
+    String family = event.name();
+    String qual = "";
+    switch (event) {
+      case OPEN:
+        row = formatRow(eventNum, 0, 0);
+        qual = tserverSession;
+        break;
+      case COMPACTION_START:
+        row = formatRow(eventNum, tabletId, seq);
+        if (filename != null)
+          qual = filename;
+        break;
+      case MUTATION:
+      case MANY_MUTATIONS:
+      case COMPACTION_FINISH:
+        row = formatRow(eventNum, tabletId, seq);
+        break;
+      case DEFINE_TABLET:
+        row = formatRow(eventNum, tabletId, seq);
+        // Base64 encode KeyExtent
+        DataOutputBuffer buffer = new DataOutputBuffer();
+        tablet.writeTo(buffer);
+        qual = Base64.getEncoder().encodeToString(copyOf(buffer.getData(), buffer.getLength()));
+        buffer.close();
+        break;
+    }
+    return new Key(new Text(row), new Text(family), new Text(qual));
+  }
+
+  // format row = 1_000001_0000000001
+  private String formatRow(int eventNum, int tabletId, long seq) {
+    return String.format("%d_%06d_%010d", eventNum, tabletId, seq);

Review comment:
       Oh yeah I forgot about this. I will make it use `FastFormat`.







[GitHub] [accumulo] milleruntime commented on a change in pull request #2117: Change sorted recovery to write to RFiles

Posted by GitBox <gi...@apache.org>.
milleruntime commented on a change in pull request #2117:
URL: https://github.com/apache/accumulo/pull/2117#discussion_r648511140



##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogsIterator.java
##########
@@ -20,60 +20,85 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map.Entry;
 
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.ScannerBase;
+import org.apache.accumulo.core.client.rfile.RFile;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.log.SortedLogState;
+import org.apache.accumulo.tserver.logger.LogEvents;
 import org.apache.accumulo.tserver.logger.LogFileKey;
-import org.apache.accumulo.tserver.logger.LogFileValue;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Iterators;
-import com.google.common.collect.UnmodifiableIterator;
 
 /**
  * Iterates over multiple sorted recovery logs merging them into a single sorted stream.
  */
-public class RecoveryLogsIterator implements CloseableIterator<Entry<LogFileKey,LogFileValue>> {
+public class RecoveryLogsIterator implements Iterator<Entry<Key,Value>>, AutoCloseable {

Review comment:
       I made this change in 6a94527457491cd0ff165122b93bfead6b9de85b. I put a try/catch in the method where the IOException was being thrown because I didn't want to have one for all cases. I am not sure the code is shorter, but having the conversion happen in only one spot is nice.
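Here is a generic sketch of keeping the conversion in one place. The iterator converts each raw entry inside `next()` and wraps any `IOException` once, because `Iterator.next()` cannot throw checked exceptions. `ConvertingIterator` and `IOConverter` are hypothetical names. In this PR's setting, the converters would presumably be the LogFileKey/LogFileValue conversion methods.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.AbstractMap;
import java.util.Iterator;
import java.util.Map.Entry;

// Hypothetical sketch: converts each raw entry in next(), the single place where
// conversion happens. IOConverter is an illustrative helper interface.
final class ConvertingIterator<K1, V1, K2, V2> implements Iterator<Entry<K2,V2>> {

  @FunctionalInterface
  interface IOConverter<A, B> {
    B convert(A in) throws IOException;
  }

  private final Iterator<Entry<K1,V1>> source;
  private final IOConverter<K1,K2> keyConverter;
  private final IOConverter<V1,V2> valueConverter;

  ConvertingIterator(Iterator<Entry<K1,V1>> source, IOConverter<K1,K2> keyConverter,
      IOConverter<V1,V2> valueConverter) {
    this.source = source;
    this.keyConverter = keyConverter;
    this.valueConverter = valueConverter;
  }

  @Override
  public boolean hasNext() {
    return source.hasNext();
  }

  @Override
  public Entry<K2,V2> next() {
    Entry<K1,V1> e = source.next();
    try {
      return new AbstractMap.SimpleImmutableEntry<>(keyConverter.convert(e.getKey()),
          valueConverter.convert(e.getValue()));
    } catch (IOException ex) {
      // Iterator.next() cannot throw checked exceptions, so wrap once here.
      throw new UncheckedIOException(ex);
    }
  }
}
```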







[GitHub] [accumulo] milleruntime commented on a change in pull request #2117: Change sorted recovery to write to RFiles

Posted by GitBox <gi...@apache.org>.
milleruntime commented on a change in pull request #2117:
URL: https://github.com/apache/accumulo/pull/2117#discussion_r640779241



##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogsIterator.java
##########
@@ -93,11 +107,48 @@ public void remove() {
 
   @Override
   public void close() {
-    for (CloseableIterator<?> reader : iterators) {
-      try {
-        reader.close();
-      } catch (IOException e) {
-        LOG.debug("Failed to close reader", e);
+    scanners.forEach(ScannerBase::close);
+  }
+
+  /**
+   * Check for sorting signal files (finished/failed) and get the logs in the provided directory.
+   */
+  private List<Path> getFiles(VolumeManager fs, Path directory) throws IOException {
+    boolean foundFinish = false;
+    List<Path> logFiles = new ArrayList<>();
+    for (FileStatus child : fs.listStatus(directory)) {
+      if (child.getPath().getName().startsWith("_"))
+        continue;
+      if (SortedLogState.isFinished(child.getPath().getName())) {
+        foundFinish = true;
+        continue;
+      }
+      if (SortedLogState.FAILED.getMarker().equals(child.getPath().getName())) {
+        continue;
+      }
+      FileSystem ns = fs.getFileSystemByPath(child.getPath());

Review comment:
       After looking at this some more, I don't think it is a good idea to close the FileSystem that is used here. It is coming from `ServerContext` so more than likely will be shared across the server. This could lead to issues in completely unrelated code using the same object that would be really difficult to track down.
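Following that reasoning, `close()` should release only what the iterator itself opened. Below is a minimal sketch that uses plain `AutoCloseable` to stand in for the scanners, with a hypothetical `OwnedResources` class. It attempts every close even if one fails and reports the failures together (the first as the cause, later ones as suppressed). It never touches the shared FileSystem.

```java
import java.util.List;

// Hypothetical sketch: close only resources this object opened, attempting every
// close even if an earlier one fails. Shared, context-owned resources (such as a
// FileSystem obtained from ServerContext) are deliberately not closed here.
final class OwnedResources implements AutoCloseable {
  private final List<? extends AutoCloseable> owned;

  OwnedResources(List<? extends AutoCloseable> owned) {
    this.owned = owned;
  }

  @Override
  public void close() {
    IllegalStateException failure = null;
    for (AutoCloseable resource : owned) {
      try {
        resource.close();
      } catch (Exception e) {
        // remember the first failure, attach later ones as suppressed, keep going
        if (failure == null) {
          failure = new IllegalStateException("Failed to close resource", e);
        } else {
          failure.addSuppressed(e);
        }
      }
    }
    if (failure != null) {
      throw failure;
    }
  }
}
```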







[GitHub] [accumulo] milleruntime commented on a change in pull request #2117: Change sorted recovery to write to RFiles

Posted by GitBox <gi...@apache.org>.
milleruntime commented on a change in pull request #2117:
URL: https://github.com/apache/accumulo/pull/2117#discussion_r647522831



##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogFileKey.java
##########
@@ -189,4 +195,90 @@ public String toString() {
     }
     throw new RuntimeException("Unknown type of entry: " + event);
   }
+
+  /**
+   * Converts LogFileKey to Key. Creates a Key containing all of the LogFileKey fields. The fields
+   * are stored so the Key sorts maintaining the legacy sort order. The row of the Key is composed
+   * of 3 fields: EventNum + tabletID + seq (separated by underscore). The EventNum is the integer
+   * returned by eventType(). The column family is always the event. The column qualifier is
+   * dependent of the type of event and could be empty.
+   *
+   * <pre>
+   *     Key Schema:
+   *     Row = EventNum_tabletID_seq
+   *     Family = event
+   *     Qualifier = tserverSession OR filename OR KeyExtent
+   * </pre>
+   */
+  public Key toKey() throws IOException {
+    String row = "";
+    int eventNum = eventType(event);
+    String family = event.name();
+    String qual = "";
+    switch (event) {
+      case OPEN:
+        row = formatRow(eventNum, 0, 0);
+        qual = tserverSession;
+        break;
+      case COMPACTION_START:
+        row = formatRow(eventNum, tabletId, seq);
+        if (filename != null)
+          qual = filename;
+        break;
+      case MUTATION:
+      case MANY_MUTATIONS:
+      case COMPACTION_FINISH:
+        row = formatRow(eventNum, tabletId, seq);
+        break;
+      case DEFINE_TABLET:
+        row = formatRow(eventNum, tabletId, seq);
+        // Base64 encode KeyExtent
+        DataOutputBuffer buffer = new DataOutputBuffer();
+        tablet.writeTo(buffer);
+        qual = Base64.getEncoder().encodeToString(copyOf(buffer.getData(), buffer.getLength()));
+        buffer.close();
+        break;
+    }
+    return new Key(new Text(row), new Text(family), new Text(qual));
+  }
+
+  // format row = 1_000001_0000000001
+  private String formatRow(int eventNum, int tabletId, long seq) {
+    return String.format("%d_%06d_%010d", eventNum, tabletId, seq);
+  }

Review comment:
       FYI, it seems to work if I change this value to 0. I am not sure whether we use 0 elsewhere to indicate an initialized state or something similar, but if not, maybe 0 will work to check for this bug.







[GitHub] [accumulo] milleruntime commented on a change in pull request #2117: Change sorted recovery to write to RFiles

Posted by GitBox <gi...@apache.org>.
milleruntime commented on a change in pull request #2117:
URL: https://github.com/apache/accumulo/pull/2117#discussion_r648513383



##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogsIterator.java
##########
@@ -20,60 +20,85 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map.Entry;
 
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.ScannerBase;
+import org.apache.accumulo.core.client.rfile.RFile;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.log.SortedLogState;
+import org.apache.accumulo.tserver.logger.LogEvents;
 import org.apache.accumulo.tserver.logger.LogFileKey;
-import org.apache.accumulo.tserver.logger.LogFileValue;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Iterators;
-import com.google.common.collect.UnmodifiableIterator;
 
 /**
  * Iterates over multiple sorted recovery logs merging them into a single sorted stream.
  */
-public class RecoveryLogsIterator implements CloseableIterator<Entry<LogFileKey,LogFileValue>> {
+public class RecoveryLogsIterator implements Iterator<Entry<Key,Value>>, AutoCloseable {
 
   private static final Logger LOG = LoggerFactory.getLogger(RecoveryLogsIterator.class);
 
-  List<CloseableIterator<Entry<LogFileKey,LogFileValue>>> iterators;
-  private UnmodifiableIterator<Entry<LogFileKey,LogFileValue>> iter;
+  private final List<Scanner> scanners;
+  private final Iterator<Entry<Key,Value>> iter;
 
   /**
-   * Iterates only over keys in the range [start,end].
+   * Scans the files in each recoveryLogDir over the range [start,end].
    */
-  RecoveryLogsIterator(VolumeManager fs, List<Path> recoveryLogPaths, LogFileKey start,
-      LogFileKey end) throws IOException {
+  RecoveryLogsIterator(ServerContext context, List<Path> recoveryLogDirs, LogFileKey start,
+      LogFileKey end, boolean checkFirstKey, LogEvents... colFamToFetch) throws IOException {
 
-    iterators = new ArrayList<>(recoveryLogPaths.size());
+    List<Iterator<Entry<Key,Value>>> iterators = new ArrayList<>(recoveryLogDirs.size());
+    scanners = new ArrayList<>();
+    Key startKey = start.toKey();
+    Key endKey = end.toKey();
+    Range range = new Range(startKey, endKey);

Review comment:
       I did some refactoring in 6a94527457491cd0ff165122b93bfead6b9de85b so I could call formatRow for the range.







[GitHub] [accumulo] milleruntime commented on a change in pull request #2117: Change sorted recovery to write to RFiles

Posted by GitBox <gi...@apache.org>.
milleruntime commented on a change in pull request #2117:
URL: https://github.com/apache/accumulo/pull/2117#discussion_r647334836



##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogFileKey.java
##########
@@ -189,4 +195,90 @@ public String toString() {
     }
     throw new RuntimeException("Unknown type of entry: " + event);
   }
+
+  /**
+   * Converts LogFileKey to Key. Creates a Key containing all of the LogFileKey fields. The fields
+   * are stored so the Key sorts maintaining the legacy sort order. The row of the Key is composed
+   * of 3 fields: EventNum + tabletID + seq (separated by underscore). The EventNum is the integer
+   * returned by eventType(). The column family is always the event. The column qualifier is
+   * dependent of the type of event and could be empty.
+   *
+   * <pre>
+   *     Key Schema:
+   *     Row = EventNum_tabletID_seq
+   *     Family = event
+   *     Qualifier = tserverSession OR filename OR KeyExtent
+   * </pre>
+   */
+  public Key toKey() throws IOException {
+    String row = "";
+    int eventNum = eventType(event);
+    String family = event.name();
+    String qual = "";
+    switch (event) {
+      case OPEN:
+        row = formatRow(eventNum, 0, 0);
+        qual = tserverSession;
+        break;
+      case COMPACTION_START:
+        row = formatRow(eventNum, tabletId, seq);
+        if (filename != null)
+          qual = filename;
+        break;
+      case MUTATION:
+      case MANY_MUTATIONS:
+      case COMPACTION_FINISH:
+        row = formatRow(eventNum, tabletId, seq);
+        break;
+      case DEFINE_TABLET:
+        row = formatRow(eventNum, tabletId, seq);
+        // Base64 encode KeyExtent
+        DataOutputBuffer buffer = new DataOutputBuffer();
+        tablet.writeTo(buffer);
+        qual = Base64.getEncoder().encodeToString(copyOf(buffer.getData(), buffer.getLength()));
+        buffer.close();
+        break;
+    }
+    return new Key(new Text(row), new Text(family), new Text(qual));
+  }
+
+  // format row = 1_000001_0000000001
+  private String formatRow(int eventNum, int tabletId, long seq) {
+    return String.format("%d_%06d_%010d", eventNum, tabletId, seq);
+  }

Review comment:
       I am working on this change first, and once it's working I will get to your other feedback.
   
   There is a `-1` set for the tabletId to check for a bug, but it converts to `%ff;%ff;%ff;%ff`, which breaks the sorting. Maybe we don't need the `-1` since it's being changed?
   https://github.com/apache/accumulo/blob/7457fe5e0f753f6c9a1289963075c7caac89b45c/server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java#L93-L96
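To illustrate why a `-1` tabletId breaks the ordering once the row is byte-encoded, here is a minimal sketch (the class and method names are hypothetical, not from the PR). Accumulo compares row bytes as unsigned values, so the big-endian two's-complement form of `-1` (`0xFF 0xFF 0xFF 0xFF`) sorts after every non-negative id. Flipping the sign bit (`tabletId ^ 0x80000000`), as the later revision of `formatRow` in this thread does, restores numeric order.

```java
import java.util.Arrays;

public class TabletIdOrderDemo {

  // Big-endian encoding of an int, the same byte order DataOutputStream.writeInt uses.
  static byte[] rawBytes(int v) {
    return new byte[] {(byte) (v >>> 24), (byte) (v >>> 16), (byte) (v >>> 8), (byte) v};
  }

  // Same encoding after flipping the sign bit, so negative values sort first.
  static byte[] signFlippedBytes(int v) {
    return rawBytes(v ^ 0x80000000);
  }

  // Lexicographic comparison treating each byte as unsigned (0x00..0xFF).
  static int compareUnsigned(byte[] a, byte[] b) {
    return Arrays.compareUnsigned(a, b);
  }

  public static void main(String[] args) {
    // Raw encoding: -1 sorts AFTER 5.
    System.out.println(compareUnsigned(rawBytes(-1), rawBytes(5)) > 0);
    // Sign-flipped encoding: -1 sorts BEFORE 5.
    System.out.println(compareUnsigned(signFlippedBytes(-1), signFlippedBytes(5)) < 0);
  }
}
```

Setting the sentinel to 0 (as suggested in the earlier comment) avoids the problem without any encoding change, as long as 0 is not otherwise meaningful.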







[GitHub] [accumulo] milleruntime commented on a change in pull request #2117: Change sorted recovery to write to RFiles

Posted by GitBox <gi...@apache.org>.
milleruntime commented on a change in pull request #2117:
URL: https://github.com/apache/accumulo/pull/2117#discussion_r646830168



##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogsIterator.java
##########
@@ -20,60 +20,85 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map.Entry;
 
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.ScannerBase;
+import org.apache.accumulo.core.client.rfile.RFile;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.log.SortedLogState;
+import org.apache.accumulo.tserver.logger.LogEvents;
 import org.apache.accumulo.tserver.logger.LogFileKey;
-import org.apache.accumulo.tserver.logger.LogFileValue;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Iterators;
-import com.google.common.collect.UnmodifiableIterator;
 
 /**
  * Iterates over multiple sorted recovery logs merging them into a single sorted stream.
  */
-public class RecoveryLogsIterator implements CloseableIterator<Entry<LogFileKey,LogFileValue>> {
+public class RecoveryLogsIterator implements Iterator<Entry<Key,Value>>, AutoCloseable {

Review comment:
       I was using what is returned when reading an RFile using `scanner.iterator()`. How would I iterate over the RFile using LogFileKey and LogFileValue?







[GitHub] [accumulo] ctubbsii commented on a change in pull request #2117: Make sorted recovery write to RFiles

Posted by GitBox <gi...@apache.org>.
ctubbsii commented on a change in pull request #2117:
URL: https://github.com/apache/accumulo/pull/2117#discussion_r656625243



##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogFileKey.java
##########
@@ -189,4 +198,162 @@ public String toString() {
     }
     throw new RuntimeException("Unknown type of entry: " + event);
   }
+
+  /**
+   * Converts LogFileKey to Key. Creates a Key containing all of the LogFileKey fields. The fields
+   * are stored so the Key sorts maintaining the legacy sort order. The row of the Key is composed
+   * of 3 fields: EventNum + tabletID + seq. The EventNum is the byte returned by eventType(). The
+   * column family is always the event. The column qualifier is dependent of the type of event and
+   * could be empty.
+   *
+   * <pre>
+   *     Key Schema:
+   *     Row = EventNum + tabletID + seq
+   *     Family = event
+   *     Qualifier = tserverSession OR filename OR KeyExtent
+   * </pre>
+   */
+  public Key toKey() throws IOException {
+    byte[] formattedRow;
+    String family = event.name();
+    var kb = Key.builder();
+    switch (event) {
+      case OPEN:
+        formattedRow = formatRow(0, 0);
+        return kb.row(formattedRow).family(family).qualifier(tserverSession).build();
+      case COMPACTION_START:
+        formattedRow = formatRow(tabletId, seq);
+        return kb.row(formattedRow).family(family).qualifier(filename).build();
+      case MUTATION:
+      case MANY_MUTATIONS:
+      case COMPACTION_FINISH:
+        return kb.row(formatRow(tabletId, seq)).family(family).build();
+      case DEFINE_TABLET:
+        formattedRow = formatRow(tabletId, seq);
+        DataOutputBuffer buffer = new DataOutputBuffer();
+        tablet.writeTo(buffer);
+        var q = copyOf(buffer.getData(), buffer.getLength());
+        buffer.close();
+        return kb.row(formattedRow).family(family).qualifier(q).build();
+      default:
+        throw new AssertionError("Invalid event type in LogFileKey: " + event);
+    }
+  }
+
+  /**
+   * Get the first byte for the event. The only possible values are 0-4. This is used as the highest
+   * byte in the row.
+   */
+  private byte getEventByte() {
+    int evenTypeInteger = eventType(event);
+    return (byte) (evenTypeInteger & 0xff);
+  }
+
+  /**
+   * Get the byte encoded row for this LogFileKey as a Text object.
+   */
+  private Text formatRow() {
+    return new Text(formatRow(tabletId, seq));
+  }
+
+  /**
+   * Format the row using 13 bytes encoded to allow proper sorting of the RFile Key. The highest
+   * byte is for the event number, 4 bytes for the tabletId and 8 bytes for the sequence long.
+   */
+  private byte[] formatRow(int tabletId, long seq) {
+    byte eventNum = getEventByte();
+    // These will not sort properly when encoded if negative. Negative is not expected currently,
+    // defending against future changes and/or bugs.
+    Preconditions.checkArgument(eventNum >= 0 && seq >= 0);
+    byte[] row = new byte[13];
+    // encode the signed integer so negatives will sort properly for tabletId
+    int encodedTabletId = tabletId ^ 0x80000000;
+
+    row[0] = eventNum;
+    row[1] = (byte) ((encodedTabletId >>> 24) & 0xff);
+    row[2] = (byte) ((encodedTabletId >>> 16) & 0xff);
+    row[3] = (byte) ((encodedTabletId >>> 8) & 0xff);
+    row[4] = (byte) (encodedTabletId & 0xff);
+    row[5] = (byte) (seq >>> 56);
+    row[6] = (byte) (seq >>> 48);
+    row[7] = (byte) (seq >>> 40);
+    row[8] = (byte) (seq >>> 32);
+    row[9] = (byte) (seq >>> 24);
+    row[10] = (byte) (seq >>> 16);
+    row[11] = (byte) (seq >>> 8);
+    row[12] = (byte) (seq); // >>> 0
+    return row;
+  }
+
+  /**
+   * Extract the tabletId integer from the byte encoded Row.
+   */
+  private static int getTabletId(byte[] row) {
+    int encoded = ((row[1] << 24) + (row[2] << 16) + (row[3] << 8) + row[4]);

Review comment:
       @milleruntime `(row[1] << 24)` puts the bits of `row[1]` in the leftmost byte of a 4-byte int, possibly making it a negative number, which will give an incorrect result when combined with `+`. You should use bitwise `|` operators instead of `+` to ensure that combining these doesn't get messed up when one is negative.
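A minimal decoding sketch, assuming the 13-byte layout from the hunk above (1 event byte, 4-byte sign-flipped tabletId, 8-byte seq). Besides switching `+` to `|`, each byte also needs `& 0xff`: Java sign-extends a `byte` when promoting it to `int`, so `(byte) 0x80` becomes `0xFFFFFF80` and would OR ones into the higher bytes. `RowCodecSketch` and its methods are hypothetical names for illustration.

```java
public class RowCodecSketch {

  // Encode eventNum, tabletId and seq into 13 big-endian bytes, flipping the
  // tabletId sign bit so negative ids sort before non-negative ones.
  static byte[] encode(byte eventNum, int tabletId, long seq) {
    byte[] row = new byte[13];
    int encodedTabletId = tabletId ^ 0x80000000;
    row[0] = eventNum;
    for (int i = 0; i < 4; i++) {
      row[1 + i] = (byte) (encodedTabletId >>> (24 - 8 * i));
    }
    for (int i = 0; i < 8; i++) {
      row[5 + i] = (byte) (seq >>> (56 - 8 * i));
    }
    return row;
  }

  // Mask each byte with 0xff to undo sign extension, combine with |,
  // then flip the sign bit back.
  static int decodeTabletId(byte[] row) {
    int encoded = ((row[1] & 0xff) << 24) | ((row[2] & 0xff) << 16)
        | ((row[3] & 0xff) << 8) | (row[4] & 0xff);
    return encoded ^ 0x80000000;
  }

  // Rebuild the long one byte at a time, masking with 0xffL for the same reason.
  static long decodeSeq(byte[] row) {
    long seq = 0;
    for (int i = 5; i < 13; i++) {
      seq = (seq << 8) | (row[i] & 0xffL);
    }
    return seq;
  }
}
```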







[GitHub] [accumulo] milleruntime commented on a change in pull request #2117: Change sorted recovery to write to RFiles

Posted by GitBox <gi...@apache.org>.
milleruntime commented on a change in pull request #2117:
URL: https://github.com/apache/accumulo/pull/2117#discussion_r647557667



##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogFileKey.java
##########
@@ -189,4 +195,90 @@ public String toString() {
     }
     throw new RuntimeException("Unknown type of entry: " + event);
   }
+
+  /**
+   * Converts LogFileKey to Key. Creates a Key containing all of the LogFileKey fields. The fields
+   * are stored so the Key sorts maintaining the legacy sort order. The row of the Key is composed
+   * of 3 fields: EventNum + tabletID + seq (separated by underscore). The EventNum is the integer
+   * returned by eventType(). The column family is always the event. The column qualifier is
+   * dependent of the type of event and could be empty.
+   *
+   * <pre>
+   *     Key Schema:
+   *     Row = EventNum_tabletID_seq
+   *     Family = event
+   *     Qualifier = tserverSession OR filename OR KeyExtent
+   * </pre>
+   */
+  public Key toKey() throws IOException {
+    String row = "";
+    int eventNum = eventType(event);
+    String family = event.name();
+    String qual = "";
+    switch (event) {
+      case OPEN:
+        row = formatRow(eventNum, 0, 0);
+        qual = tserverSession;
+        break;
+      case COMPACTION_START:
+        row = formatRow(eventNum, tabletId, seq);
+        if (filename != null)

Review comment:
       I set a dummy value for the filename for now.







[GitHub] [accumulo] keith-turner commented on a change in pull request #2117: Change sorted recovery to write to RFiles

Posted by GitBox <gi...@apache.org>.
keith-turner commented on a change in pull request #2117:
URL: https://github.com/apache/accumulo/pull/2117#discussion_r645846717



##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogFileKey.java
##########
@@ -189,4 +195,90 @@ public String toString() {
     }
     throw new RuntimeException("Unknown type of entry: " + event);
   }
+
+  /**
+   * Converts LogFileKey to Key. Creates a Key containing all of the LogFileKey fields. The fields
+   * are stored so the Key sorts maintaining the legacy sort order. The row of the Key is composed
+   * of 3 fields: EventNum + tabletID + seq (separated by underscore). The EventNum is the integer
+   * returned by eventType(). The column family is always the event. The column qualifier is
+   * dependent of the type of event and could be empty.
+   *
+   * <pre>
+   *     Key Schema:
+   *     Row = EventNum_tabletID_seq
+   *     Family = event
+   *     Qualifier = tserverSession OR filename OR KeyExtent
+   * </pre>
+   */
+  public Key toKey() throws IOException {
+    String row = "";
+    int eventNum = eventType(event);
+    String family = event.name();
+    String qual = "";
+    switch (event) {
+      case OPEN:
+        row = formatRow(eventNum, 0, 0);
+        qual = tserverSession;
+        break;
+      case COMPACTION_START:
+        row = formatRow(eventNum, tabletId, seq);
+        if (filename != null)
+          qual = filename;
+        break;
+      case MUTATION:
+      case MANY_MUTATIONS:
+      case COMPACTION_FINISH:
+        row = formatRow(eventNum, tabletId, seq);
+        break;
+      case DEFINE_TABLET:
+        row = formatRow(eventNum, tabletId, seq);
+        // Base64 encode KeyExtent
+        DataOutputBuffer buffer = new DataOutputBuffer();
+        tablet.writeTo(buffer);
+        qual = Base64.getEncoder().encodeToString(copyOf(buffer.getData(), buffer.getLength()));
+        buffer.close();
+        break;
+    }
+    return new Key(new Text(row), new Text(family), new Text(qual));
+  }
+
+  // format row = 1_000001_0000000001
+  private String formatRow(int eventNum, int tabletId, long seq) {
+    return String.format("%d_%06d_%010d", eventNum, tabletId, seq);
+  }

Review comment:
       Could do something like the following, which will be much faster and more compact. It would also make decoding the row really fast. I think it should sort the same. Also, I'm not sure how `%06d` will handle tablet ids of 1 million or more, which need more than six digits. I looked at the source of DataOutputStream's writeLong and writeInt to see what they do.
   
   ```java
   private byte[] formatRow(byte eventNum, int tabletId, long seq) {
      byte[] row = new byte[13];

      row[0] = eventNum;
      // int expressions must be cast back to byte before assignment
      row[1] = (byte) ((tabletId >>> 24) & 0xff);
      row[2] = (byte) ((tabletId >>> 16) & 0xff);
      row[3] = (byte) ((tabletId >>> 8) & 0xff);
      row[4] = (byte) (tabletId & 0xff);
      row[5] = (byte) (seq >>> 56);
      row[6] = (byte) (seq >>> 48);
      row[7] = (byte) (seq >>> 40);
      row[8] = (byte) (seq >>> 32);
      row[9] = (byte) (seq >>> 24);
      row[10] = (byte) (seq >>> 16);
      row[11] = (byte) (seq >>> 8);
      row[12] = (byte) (seq >>> 0);
      return row;
   }
   ```
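To make the `%06d` concern concrete: the format pads to *at least* six digits, so an id of 1,000,000 produces a seven-character field and string order no longer matches numeric order. A fixed-width big-endian encoding avoids this for non-negative values. This sketch (class name hypothetical) uses `java.nio.ByteBuffer`, which is big-endian by default, to build the same 13-byte layout as the suggestion.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

public class RowFormatComparison {

  // String layout from the diff: eventNum_tabletId_seq, zero-padded.
  static String stringRow(int eventNum, int tabletId, long seq) {
    return String.format("%d_%06d_%010d", eventNum, tabletId, seq);
  }

  // Fixed-width big-endian layout: 1 byte event, 4 bytes tabletId, 8 bytes seq.
  static byte[] byteRow(byte eventNum, int tabletId, long seq) {
    return ByteBuffer.allocate(13).put(eventNum).putInt(tabletId).putLong(seq).array();
  }

  public static void main(String[] args) {
    // "1_1000000_..." vs "1_999999_...": '1' < '9', so the larger id sorts first.
    boolean stringMisorders =
        stringRow(1, 1_000_000, 1).compareTo(stringRow(1, 999_999, 1)) < 0;
    // Unsigned byte comparison keeps numeric order for non-negative ids.
    boolean bytesOrdered = Arrays.compareUnsigned(byteRow((byte) 1, 1_000_000, 1),
        byteRow((byte) 1, 999_999, 1)) > 0;
    System.out.println(stringMisorders + " " + bytesOrdered);
  }
}
```

Negative ids still sort after positive ones in this plain encoding; the sign-bit flip adopted in the later revision of `formatRow` addresses that.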

##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogFileKey.java
##########
@@ -189,4 +195,90 @@ public String toString() {
     }
     throw new RuntimeException("Unknown type of entry: " + event);
   }
+
+  /**
+   * Converts LogFileKey to Key. Creates a Key containing all of the LogFileKey fields. The fields
+   * are stored so the Key sorts maintaining the legacy sort order. The row of the Key is composed
+   * of 3 fields: EventNum + tabletID + seq (separated by underscore). The EventNum is the integer
+   * returned by eventType(). The column family is always the event. The column qualifier is
+   * dependent of the type of event and could be empty.
+   *
+   * <pre>
+   *     Key Schema:
+   *     Row = EventNum_tabletID_seq
+   *     Family = event
+   *     Qualifier = tserverSession OR filename OR KeyExtent
+   * </pre>
+   */
+  public Key toKey() throws IOException {
+    String row = "";
+    int eventNum = eventType(event);
+    String family = event.name();
+    String qual = "";
+    switch (event) {
+      case OPEN:
+        row = formatRow(eventNum, 0, 0);
+        qual = tserverSession;
+        break;
+      case COMPACTION_START:
+        row = formatRow(eventNum, tabletId, seq);
+        if (filename != null)

Review comment:
       I think this being null is unexpected for this event type, so if it is null I would let an NPE happen. The code that serializes to a DataOutput would throw an NPE if it were null. Should it happen to be null, it would be better to fail before writing than to mask the issue.
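A sketch of the fail-fast approach, using `java.util.Objects.requireNonNull` so a missing filename surfaces when the key is built instead of being replaced by an empty qualifier. `CompactionStartKey` is a hypothetical stand-in for `LogFileKey`, not a class from the PR.

```java
import java.util.Objects;

public class CompactionStartKey {
  private final String filename;

  CompactionStartKey(String filename) {
    this.filename = filename;
  }

  // Throws NullPointerException immediately rather than writing an empty
  // qualifier that would hide the missing filename.
  String qualifier() {
    return Objects.requireNonNull(filename, "filename must be set for COMPACTION_START");
  }
}
```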

##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogsIterator.java
##########
@@ -20,60 +20,85 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map.Entry;
 
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.ScannerBase;
+import org.apache.accumulo.core.client.rfile.RFile;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.log.SortedLogState;
+import org.apache.accumulo.tserver.logger.LogEvents;
 import org.apache.accumulo.tserver.logger.LogFileKey;
-import org.apache.accumulo.tserver.logger.LogFileValue;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Iterators;
-import com.google.common.collect.UnmodifiableIterator;
 
 /**
  * Iterates over multiple sorted recovery logs merging them into a single sorted stream.
  */
-public class RecoveryLogsIterator implements CloseableIterator<Entry<LogFileKey,LogFileValue>> {
+public class RecoveryLogsIterator implements Iterator<Entry<Key,Value>>, AutoCloseable {

Review comment:
       Could this implement `Iterator<Entry<LogFileKey,LogFileValue>>`?  Seems like the code that uses it always converts Key to LogFileKey.  If that conversion happened in this class maybe it would simplify the code a bit.
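One way to do the conversion inside the iterator, sketched generically: wrap the scanner's `Iterator<Entry<Key,Value>>` and map each entry through conversion functions. In the PR those functions would presumably be `LogFileKey.fromKey` and a matching value conversion; here plain `Function` parameters stand in for them, and `ConvertingIterator` is a hypothetical name. Guava's `Iterators.transform` (already imported in this class) offers similar behavior. Since `fromKey` declares `IOException`, a real version would need to wrap it, for example in `UncheckedIOException`.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.function.Function;

// Adapts an iterator of raw entries into an iterator of converted entries.
public class ConvertingIterator<K1, V1, K2, V2> implements Iterator<Entry<K2, V2>> {
  private final Iterator<Entry<K1, V1>> source;
  private final Function<K1, K2> keyConverter;
  private final Function<V1, V2> valueConverter;

  public ConvertingIterator(Iterator<Entry<K1, V1>> source, Function<K1, K2> keyConverter,
      Function<V1, V2> valueConverter) {
    this.source = source;
    this.keyConverter = keyConverter;
    this.valueConverter = valueConverter;
  }

  @Override
  public boolean hasNext() {
    return source.hasNext();
  }

  // Converts lazily, one entry at a time, so large recovery logs are not buffered.
  @Override
  public Entry<K2, V2> next() {
    Entry<K1, V1> e = source.next();
    return new SimpleImmutableEntry<>(keyConverter.apply(e.getKey()),
        valueConverter.apply(e.getValue()));
  }
}
```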

##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogFileKey.java
##########
@@ -189,4 +195,90 @@ public String toString() {
     }
     throw new RuntimeException("Unknown type of entry: " + event);
   }
+
+  /**
+   * Converts LogFileKey to Key. Creates a Key containing all of the LogFileKey fields. The fields
+   * are stored so the Key sorts maintaining the legacy sort order. The row of the Key is composed
+   * of 3 fields: EventNum + tabletID + seq (separated by underscore). The EventNum is the integer
+   * returned by eventType(). The column family is always the event. The column qualifier is
+   * dependent of the type of event and could be empty.
+   *
+   * <pre>
+   *     Key Schema:
+   *     Row = EventNum_tabletID_seq
+   *     Family = event
+   *     Qualifier = tserverSession OR filename OR KeyExtent
+   * </pre>
+   */
+  public Key toKey() throws IOException {
+    String row = "";
+    int eventNum = eventType(event);
+    String family = event.name();
+    String qual = "";
+    switch (event) {
+      case OPEN:
+        row = formatRow(eventNum, 0, 0);
+        qual = tserverSession;
+        break;
+      case COMPACTION_START:
+        row = formatRow(eventNum, tabletId, seq);
+        if (filename != null)
+          qual = filename;
+        break;
+      case MUTATION:
+      case MANY_MUTATIONS:
+      case COMPACTION_FINISH:
+        row = formatRow(eventNum, tabletId, seq);
+        break;
+      case DEFINE_TABLET:
+        row = formatRow(eventNum, tabletId, seq);
+        // Base64 encode KeyExtent
+        DataOutputBuffer buffer = new DataOutputBuffer();
+        tablet.writeTo(buffer);
+        qual = Base64.getEncoder().encodeToString(copyOf(buffer.getData(), buffer.getLength()));
+        buffer.close();
+        break;
+    }
+    return new Key(new Text(row), new Text(family), new Text(qual));
+  }
+
+  // format row = 1_000001_0000000001
+  private String formatRow(int eventNum, int tabletId, long seq) {
+    return String.format("%d_%06d_%010d", eventNum, tabletId, seq);
+  }
+
+  /**
+   * Create LogFileKey from row. Follows schema defined by {@link #toKey()}
+   */
+  public static LogFileKey fromKey(Key key) throws IOException {
+    var logFileKey = new LogFileKey();
+    String[] rowParts = key.getRow().toString().split("_");
+    int tabletId = Integer.parseInt(rowParts[1]);
+    long seq = Long.parseLong(rowParts[2]);
+    String qualifier = key.getColumnQualifier().toString();
+
+    logFileKey.tabletId = tabletId;
+    logFileKey.seq = seq;
+    logFileKey.event = LogEvents.valueOf(key.getColumnFamily().toString());
+
+    // handle special cases of what is stored in the qualifier
+    switch (logFileKey.event) {
+      case OPEN:
+        logFileKey.tserverSession = qualifier;
+        break;
+      case COMPACTION_START:
+        logFileKey.filename = qualifier;
+        break;
+      case DEFINE_TABLET:
+        // decode Base64 KeyExtent
+        DataInputBuffer buffer = new DataInputBuffer();
+        byte[] bytes = Base64.getDecoder().decode(qualifier);
+        buffer.reset(bytes, bytes.length);
+        logFileKey.tablet = KeyExtent.readFrom(buffer);
+        buffer.close();
+        break;
+    }

Review comment:
       Could do something like the following to cover all cases.
   
   ```suggestion
         case COMPACTION_FINISH:
         case MANY_MUTATIONS:
         case MUTATION:
           // nothing to do
           break;
         default:
           throw new AssertionError();
       }
   ```
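A self-contained sketch of the same idea, using a local enum that mirrors the six event names that appear in this diff (the real `LogEvents` enum is not shown here, so its exact contents are an assumption). Listing the no-op cases explicitly and throwing in `default` means an event type added later fails loudly instead of being silently skipped.

```java
public class EventQualifierSketch {

  // Hypothetical mirror of the event types handled in toKey()/fromKey().
  enum Event {
    OPEN, DEFINE_TABLET, COMPACTION_START, COMPACTION_FINISH, MUTATION, MANY_MUTATIONS
  }

  // Describes what the qualifier holds for each event type.
  static String qualifierContents(Event event) {
    switch (event) {
      case OPEN:
        return "tserverSession";
      case COMPACTION_START:
        return "filename";
      case DEFINE_TABLET:
        return "KeyExtent";
      case COMPACTION_FINISH:
      case MANY_MUTATIONS:
      case MUTATION:
        // nothing stored in the qualifier
        return "";
      default:
        throw new AssertionError("Unhandled event type: " + event);
    }
  }
}
```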

##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java
##########
@@ -61,56 +66,58 @@
 
   private static final Logger log = LoggerFactory.getLogger(SortedLogRecovery.class);
 
-  private VolumeManager fs;
+  private final ServerContext context;
 
-  public SortedLogRecovery(VolumeManager fs) {
-    this.fs = fs;
+  public SortedLogRecovery(ServerContext context) {
+    this.context = context;
   }
 
-  static LogFileKey maxKey(LogEvents event) {
+  static LogFileKey maxKey(LogEvents event, KeyExtent extent) {
     LogFileKey key = new LogFileKey();
     key.event = event;
     key.tabletId = Integer.MAX_VALUE;
     key.seq = Long.MAX_VALUE;
+    key.tablet = extent;

Review comment:
       If the range is constructed using only the row, as I suggested elsewhere, then maybe this could be set to some dummy extent with a comment saying it's only needed for serialization and is not needed for constructing a range.

##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogFileKey.java
##########
@@ -189,4 +195,90 @@ public String toString() {
     }
     throw new RuntimeException("Unknown type of entry: " + event);
   }
+
+  /**
+   * Converts LogFileKey to Key. Creates a Key containing all of the LogFileKey fields. The fields
+   * are stored so the Key sorts maintaining the legacy sort order. The row of the Key is composed
+   * of 3 fields: EventNum + tabletID + seq (separated by underscore). The EventNum is the integer
+   * returned by eventType(). The column family is always the event. The column qualifier is
+   * dependent of the type of event and could be empty.
+   *
+   * <pre>
+   *     Key Schema:
+   *     Row = EventNum_tabletID_seq
+   *     Family = event
+   *     Qualifier = tserverSession OR filename OR KeyExtent
+   * </pre>
+   */
+  public Key toKey() throws IOException {
+    String row = "";
+    int eventNum = eventType(event);
+    String family = event.name();
+    String qual = "";
+    switch (event) {
+      case OPEN:
+        row = formatRow(eventNum, 0, 0);
+        qual = tserverSession;
+        break;
+      case COMPACTION_START:
+        row = formatRow(eventNum, tabletId, seq);
+        if (filename != null)
+          qual = filename;
+        break;
+      case MUTATION:
+      case MANY_MUTATIONS:
+      case COMPACTION_FINISH:
+        row = formatRow(eventNum, tabletId, seq);
+        break;
+      case DEFINE_TABLET:
+        row = formatRow(eventNum, tabletId, seq);
+        // Base64 encode KeyExtent
+        DataOutputBuffer buffer = new DataOutputBuffer();
+        tablet.writeTo(buffer);
+        qual = Base64.getEncoder().encodeToString(copyOf(buffer.getData(), buffer.getLength()));

Review comment:
       Could put the bytes from encoding the extent directly in the qual; binary data is OK. This would avoid Base64.
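To quantify the Base64 overhead: the encoder emits 4 characters for every 3 input bytes (rounded up, with padding), so storing the raw serialized extent bytes saves about a quarter of the qualifier size and skips an encode/decode step. A quick check with `java.util.Base64`; the byte array is a placeholder, not a real serialized `KeyExtent`.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class Base64Overhead {

  // Length of the Base64 string that would be stored for a given payload.
  static int base64Length(byte[] raw) {
    return Base64.getEncoder().encodeToString(raw).length();
  }

  public static void main(String[] args) {
    // Placeholder bytes standing in for a serialized extent.
    byte[] raw = "table1;endRow;prevEndRow".getBytes(StandardCharsets.UTF_8);
    System.out.println(raw.length + " raw bytes -> " + base64Length(raw) + " Base64 chars");
  }
}
```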

##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogFileValue.java
##########
@@ -93,4 +98,23 @@ public String toString() {
     return format(this, 5);
   }
 
+  /**
+   * Convert list of mutations to a byte array and use to create a Value
+   */
+  public Value toValue() throws IOException {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();

Review comment:
       There are no great alternatives, but these built-in Java types can kill performance because they are heavily synchronized. Serializing something can end up calling lots of synchronized methods. For cases like this, where only a single thread will ever use the stream, it's annoying. There may be something else to use; I will have to look around.
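For illustration, a minimal unsynchronized byte sink that could stand in for `ByteArrayOutputStream` when only one thread serializes. `UnsyncByteArrayOutputStream` is a hypothetical class written for this sketch, not an existing Accumulo or Hadoop API; it keeps the same growable-array idea but none of its methods are `synchronized`.

```java
import java.io.OutputStream;
import java.util.Arrays;

// Hypothetical single-threaded replacement for ByteArrayOutputStream.
public class UnsyncByteArrayOutputStream extends OutputStream {
  private byte[] buf = new byte[32];
  private int count = 0;

  // Grow by doubling (or to the exact size needed, if larger).
  private void ensureCapacity(int minCapacity) {
    if (minCapacity > buf.length) {
      buf = Arrays.copyOf(buf, Math.max(buf.length * 2, minCapacity));
    }
  }

  @Override
  public void write(int b) {
    ensureCapacity(count + 1);
    buf[count++] = (byte) b; // keeps the low 8 bits, as OutputStream specifies
  }

  @Override
  public void write(byte[] b, int off, int len) {
    ensureCapacity(count + len);
    System.arraycopy(b, off, buf, count, len);
    count += len;
  }

  // Big-endian int, the same byte order as DataOutput.writeInt.
  public void writeInt(int v) {
    write(v >>> 24);
    write(v >>> 16);
    write(v >>> 8);
    write(v);
  }

  public int size() {
    return count;
  }

  public byte[] toByteArray() {
    return Arrays.copyOf(buf, count);
  }
}
```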

##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogsIterator.java
##########
@@ -20,60 +20,85 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map.Entry;
 
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.ScannerBase;
+import org.apache.accumulo.core.client.rfile.RFile;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.log.SortedLogState;
+import org.apache.accumulo.tserver.logger.LogEvents;
 import org.apache.accumulo.tserver.logger.LogFileKey;
-import org.apache.accumulo.tserver.logger.LogFileValue;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Iterators;
-import com.google.common.collect.UnmodifiableIterator;
 
 /**
  * Iterates over multiple sorted recovery logs merging them into a single sorted stream.
  */
-public class RecoveryLogsIterator implements CloseableIterator<Entry<LogFileKey,LogFileValue>> {
+public class RecoveryLogsIterator implements Iterator<Entry<Key,Value>>, AutoCloseable {
 
   private static final Logger LOG = LoggerFactory.getLogger(RecoveryLogsIterator.class);
 
-  List<CloseableIterator<Entry<LogFileKey,LogFileValue>>> iterators;
-  private UnmodifiableIterator<Entry<LogFileKey,LogFileValue>> iter;
+  private final List<Scanner> scanners;
+  private final Iterator<Entry<Key,Value>> iter;
 
   /**
-   * Iterates only over keys in the range [start,end].
+   * Scans the files in each recoveryLogDir over the range [start,end].
    */
-  RecoveryLogsIterator(VolumeManager fs, List<Path> recoveryLogPaths, LogFileKey start,
-      LogFileKey end) throws IOException {
+  RecoveryLogsIterator(ServerContext context, List<Path> recoveryLogDirs, LogFileKey start,
+      LogFileKey end, boolean checkFirstKey, LogEvents... colFamToFetch) throws IOException {
 
-    iterators = new ArrayList<>(recoveryLogPaths.size());
+    List<Iterator<Entry<Key,Value>>> iterators = new ArrayList<>(recoveryLogDirs.size());
+    scanners = new ArrayList<>();
+    Key startKey = start.toKey();
+    Key endKey = end.toKey();
+    Range range = new Range(startKey, endKey);

Review comment:
       It seems like all of the important information for sorting is encoded in the row, and the information in the row matches the LogFileKey.compareTo() function. The other fields of the key hold extra information that is not considered in LogFileKey.compareTo(). Therefore I am thinking the range should be constructed using only the row, to get entire rows and not stop at some arbitrary point in a row based on what's in the key. Not sure, but I think this will make these changes behave exactly the same way as the old code. Without the change below, we may need to make the maxKey function set an extent that is greater than all extents so that the qual is large enough.
   
   ```suggestion
       Range range = new Range(startKey.getRow(), endKey.getRow());
   ```
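A minimal sketch of why a row-only range matters, using a simplified model rather than Accumulo's Key and Range classes: keys are assumed to be just a row plus a qualifier, ordered by row and then qualifier. A range bounded by full keys stops partway through the end row, while a range bounded by rows (as Accumulo's `Range(Text startRow, Text endRow)` constructor does, inclusive of every column in both rows) returns the whole end row. The qualifier `"extentM"` is a hypothetical stand-in for whatever extent maxKey would put in the end key.

```java
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

public class RowRangeDemo {
  // Simplified stand-in for an Accumulo Key: only a row and a qualifier.
  record SimpleKey(String row, String qual) {}

  // Rows compare first, then qualifiers, mirroring the Key field order.
  static final Comparator<SimpleKey> ORDER =
      Comparator.comparing(SimpleKey::row).thenComparing(SimpleKey::qual);

  // Inclusive range bounded by full keys: may stop partway through the end row.
  static List<SimpleKey> byFullKey(List<SimpleKey> keys, SimpleKey start, SimpleKey end) {
    return keys.stream()
        .filter(k -> ORDER.compare(k, start) >= 0 && ORDER.compare(k, end) <= 0)
        .collect(Collectors.toList());
  }

  // Inclusive range bounded by rows only: returns every column of each row in range.
  static List<SimpleKey> byRow(List<SimpleKey> keys, String startRow, String endRow) {
    return keys.stream()
        .filter(k -> k.row().compareTo(startRow) >= 0 && k.row().compareTo(endRow) <= 0)
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<SimpleKey> keys = List.of(new SimpleKey("1_a", ""),
        new SimpleKey("1_b", "extentA"), new SimpleKey("1_b", "extentZ"));
    // The end key's qualifier sorts in the middle of row "1_b".
    SimpleKey end = new SimpleKey("1_b", "extentM");
    System.out.println(byFullKey(keys, new SimpleKey("1_a", ""), end).size()); // 2
    System.out.println(byRow(keys, "1_a", "1_b").size()); // 3
  }
}
```

With full-key bounds, the `("1_b", "extentZ")` entry is silently dropped; with row bounds, the result no longer depends on what the qualifier of the end key happens to be.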

##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogFileKey.java
##########
@@ -189,4 +195,90 @@ public String toString() {
     }
     throw new RuntimeException("Unknown type of entry: " + event);
   }
+
+  /**
+   * Converts LogFileKey to Key. Creates a Key containing all of the LogFileKey fields. The fields
+   * are stored so the Key sorts maintaining the legacy sort order. The row of the Key is composed
+   * of 3 fields: EventNum + tabletID + seq (separated by underscore). The EventNum is the integer
+   * returned by eventType(). The column family is always the event. The column qualifier is
+   * dependent of the type of event and could be empty.
+   *
+   * <pre>
+   *     Key Schema:
+   *     Row = EventNum_tabletID_seq
+   *     Family = event
+   *     Qualifier = tserverSession OR filename OR KeyExtent
+   * </pre>
+   */
+  public Key toKey() throws IOException {
+    String row = "";
+    int eventNum = eventType(event);
+    String family = event.name();
+    String qual = "";
+    switch (event) {
+      case OPEN:
+        row = formatRow(eventNum, 0, 0);
+        qual = tserverSession;
+        break;
+      case COMPACTION_START:
+        row = formatRow(eventNum, tabletId, seq);
+        if (filename != null)
+          qual = filename;
+        break;
+      case MUTATION:
+      case MANY_MUTATIONS:
+      case COMPACTION_FINISH:
+        row = formatRow(eventNum, tabletId, seq);
+        break;
+      case DEFINE_TABLET:
+        row = formatRow(eventNum, tabletId, seq);
+        // Base64 encode KeyExtent
+        DataOutputBuffer buffer = new DataOutputBuffer();
+        tablet.writeTo(buffer);
+        qual = Base64.getEncoder().encodeToString(copyOf(buffer.getData(), buffer.getLength()));
+        buffer.close();
+        break;
+    }
+    return new Key(new Text(row), new Text(family), new Text(qual));
+  }
+
+  // format row = 1_000001_0000000001
+  private String formatRow(int eventNum, int tabletId, long seq) {
+    return String.format("%d_%06d_%010d", eventNum, tabletId, seq);
+  }
+
+  /**
+   * Create LogFileKey from row. Follows schema defined by {@link #toKey()}
+   */
+  public static LogFileKey fromKey(Key key) throws IOException {
+    var logFileKey = new LogFileKey();
+    String[] rowParts = key.getRow().toString().split("_");
+    int tabletId = Integer.parseInt(rowParts[1]);
+    long seq = Long.parseLong(rowParts[2]);
+    String qualifier = key.getColumnQualifier().toString();
+
+    logFileKey.tabletId = tabletId;
+    logFileKey.seq = seq;
+    logFileKey.event = LogEvents.valueOf(key.getColumnFamily().toString());

Review comment:
      This avoids converting to a Text, which makes a copy.
   ```suggestion
       logFileKey.event = LogEvents.valueOf(key.getColumnFamilyData().toString());
   ```







[GitHub] [accumulo] keith-turner commented on a change in pull request #2117: Change sorted recovery to write to RFiles

Posted by GitBox <gi...@apache.org>.
keith-turner commented on a change in pull request #2117:
URL: https://github.com/apache/accumulo/pull/2117#discussion_r652001884



##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogsIterator.java
##########
@@ -19,61 +19,87 @@
 package org.apache.accumulo.tserver.log;
 
 import java.io.IOException;
+import java.util.AbstractMap;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map.Entry;
 
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.ScannerBase;
+import org.apache.accumulo.core.client.rfile.RFile;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.log.SortedLogState;
+import org.apache.accumulo.tserver.logger.LogEvents;
 import org.apache.accumulo.tserver.logger.LogFileKey;
 import org.apache.accumulo.tserver.logger.LogFileValue;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Iterators;
-import com.google.common.collect.UnmodifiableIterator;
 
 /**
  * Iterates over multiple sorted recovery logs merging them into a single sorted stream.
  */
-public class RecoveryLogsIterator implements CloseableIterator<Entry<LogFileKey,LogFileValue>> {
+public class RecoveryLogsIterator
+    implements Iterator<Entry<LogFileKey,LogFileValue>>, AutoCloseable {
 
   private static final Logger LOG = LoggerFactory.getLogger(RecoveryLogsIterator.class);
 
-  List<CloseableIterator<Entry<LogFileKey,LogFileValue>>> iterators;
-  private UnmodifiableIterator<Entry<LogFileKey,LogFileValue>> iter;
+  private final List<Scanner> scanners;
+  private final Iterator<Entry<Key,Value>> iter;
 
   /**
-   * Iterates only over keys in the range [start,end].
+   * Scans the files in each recoveryLogDir over the range [start,end].
    */
-  RecoveryLogsIterator(VolumeManager fs, List<Path> recoveryLogPaths, LogFileKey start,
-      LogFileKey end) throws IOException {
+  RecoveryLogsIterator(ServerContext context, List<Path> recoveryLogDirs, LogFileKey start,
+      LogFileKey end, boolean checkFirstKey, LogEvents... colFamToFetch) throws IOException {
 
-    iterators = new ArrayList<>(recoveryLogPaths.size());
+    List<Iterator<Entry<Key,Value>>> iterators = new ArrayList<>(recoveryLogDirs.size());
+    scanners = new ArrayList<>();
+    Range range = new Range(start.formatRow(), end.formatRow());
+    var vm = context.getVolumeManager();
 
-    try {
-      for (Path log : recoveryLogPaths) {
-        LOG.debug("Opening recovery log {}", log.getName());
-        RecoveryLogReader rlr = new RecoveryLogReader(fs, log, start, end);
-        if (rlr.hasNext()) {
+    for (Path logDir : recoveryLogDirs) {
+      LOG.debug("Opening recovery log dir {}", logDir.getName());
+      List<Path> logFiles = getFiles(vm, logDir);
+      var fs = vm.getFileSystemByPath(logDir);
+
+      // only check the first key once to prevent extra iterator creation and seeking
+      if (checkFirstKey) {
+        validateFirstKey(
+            RFile.newScanner().from(logFiles.stream().map(Path::toString).toArray(String[]::new))

Review comment:
      Not sure if this scanner is ever closed.
   
   This would be a follow-on issue: now that we are using RFile, we could possibly have a small dedicated block cache for RFile recovery. That could make opening the same RFiles multiple times during recovery much faster. These RFiles are opened here and again later, and they may also be opened again for other tablets.
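One way to guarantee the first-key scanner is released is try-with-resources. The sketch below is a self-contained model, not the PR's code: `FakeScanner` is a hypothetical stand-in for the RFile scanner (the real Scanner already has a `close()`, since the diff calls `ScannerBase::close`), and the check that the first entry is an OPEN event is assumed for illustration.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

public class FirstKeyCheckDemo {
  // Records close() calls so the demo can show the scanner was released.
  static final List<String> CLOSED = new ArrayList<>();

  // Hypothetical stand-in for the scanner built from the recovery RFiles.
  static class FakeScanner implements AutoCloseable, Iterable<Map.Entry<String,String>> {
    private final List<Map.Entry<String,String>> entries;

    FakeScanner(List<Map.Entry<String,String>> entries) {
      this.entries = entries;
    }

    @Override
    public Iterator<Map.Entry<String,String>> iterator() {
      return entries.iterator();
    }

    @Override
    public void close() {
      CLOSED.add("scanner");
    }
  }

  // Assumed validation rule for this sketch: the first entry must be an OPEN event.
  static void validateFirstKey(Iterator<Map.Entry<String,String>> it) {
    if (!it.hasNext() || !it.next().getKey().equals("OPEN")) {
      throw new IllegalStateException("Sorted log does not start with OPEN");
    }
  }

  static void checkFirstKey(List<Map.Entry<String,String>> entries) {
    // try-with-resources closes the scanner whether validation passes or throws.
    try (FakeScanner scanner = new FakeScanner(entries)) {
      validateFirstKey(scanner.iterator());
    }
  }
}
```

Because the scanner is only needed for the one validation call, scoping it to a try block keeps it from leaking even when validation fails.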

##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogsIterator.java
##########
@@ -19,61 +19,87 @@
 package org.apache.accumulo.tserver.log;
 
 import java.io.IOException;
+import java.util.AbstractMap;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map.Entry;
 
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.ScannerBase;
+import org.apache.accumulo.core.client.rfile.RFile;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.log.SortedLogState;
+import org.apache.accumulo.tserver.logger.LogEvents;
 import org.apache.accumulo.tserver.logger.LogFileKey;
 import org.apache.accumulo.tserver.logger.LogFileValue;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Iterators;
-import com.google.common.collect.UnmodifiableIterator;
 
 /**
  * Iterates over multiple sorted recovery logs merging them into a single sorted stream.
  */
-public class RecoveryLogsIterator implements CloseableIterator<Entry<LogFileKey,LogFileValue>> {
+public class RecoveryLogsIterator
+    implements Iterator<Entry<LogFileKey,LogFileValue>>, AutoCloseable {
 
   private static final Logger LOG = LoggerFactory.getLogger(RecoveryLogsIterator.class);
 
-  List<CloseableIterator<Entry<LogFileKey,LogFileValue>>> iterators;
-  private UnmodifiableIterator<Entry<LogFileKey,LogFileValue>> iter;
+  private final List<Scanner> scanners;
+  private final Iterator<Entry<Key,Value>> iter;
 
   /**
-   * Iterates only over keys in the range [start,end].
+   * Scans the files in each recoveryLogDir over the range [start,end].
    */
-  RecoveryLogsIterator(VolumeManager fs, List<Path> recoveryLogPaths, LogFileKey start,
-      LogFileKey end) throws IOException {
+  RecoveryLogsIterator(ServerContext context, List<Path> recoveryLogDirs, LogFileKey start,
+      LogFileKey end, boolean checkFirstKey, LogEvents... colFamToFetch) throws IOException {
 
-    iterators = new ArrayList<>(recoveryLogPaths.size());
+    List<Iterator<Entry<Key,Value>>> iterators = new ArrayList<>(recoveryLogDirs.size());
+    scanners = new ArrayList<>();
+    Range range = new Range(start.formatRow(), end.formatRow());

Review comment:
       Could make formatRow private and add a static method `Range LogFileKey.toRange(LogFileKey start, LogFileKey end)`.  My thinking is that this brings all the code for translating into one place in the LogFileKey class.
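A minimal sketch of the suggested shape, under stated assumptions: `LogKeySketch` is a hypothetical reduction of LogFileKey to an event byte, tabletId and seq, and `RowBounds` is a hypothetical stand-in for Accumulo's Range. The row layout follows the 13-byte encoding in the diff (event byte, sign-flipped tabletId, seq, big-endian). In the real class the static method would presumably return `new Range(new Text(start.formatRow()), new Text(end.formatRow()))`.

```java
import java.nio.ByteBuffer;

public class LogKeySketch {
  final byte eventNum;
  final int tabletId;
  final long seq;

  LogKeySketch(byte eventNum, int tabletId, long seq) {
    this.eventNum = eventNum;
    this.tabletId = tabletId;
    this.seq = seq;
  }

  // Hypothetical inclusive row bounds, standing in for Accumulo's Range.
  record RowBounds(byte[] startRow, byte[] endRow) {}

  // The only public way to build a range; the row encoding stays private.
  static RowBounds toRange(LogKeySketch start, LogKeySketch end) {
    return new RowBounds(start.formatRow(), end.formatRow());
  }

  // 13-byte row: event byte, tabletId with its sign bit flipped, then seq.
  // ByteBuffer writes big-endian by default, so byte order matches sort order.
  private byte[] formatRow() {
    return ByteBuffer.allocate(13)
        .put(eventNum)
        .putInt(tabletId ^ 0x80000000)
        .putLong(seq)
        .array();
  }
}
```

Keeping `formatRow` private means callers such as RecoveryLogsIterator cannot build rows that bypass the encoding rules.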

##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogFileKey.java
##########
@@ -189,4 +196,158 @@ public String toString() {
     }
     throw new RuntimeException("Unknown type of entry: " + event);
   }
+
+  /**
+   * Converts LogFileKey to Key. Creates a Key containing all of the LogFileKey fields. The fields
+   * are stored so the Key sorts maintaining the legacy sort order. The row of the Key is composed
+   * of 3 fields: EventNum + tabletID + seq. The EventNum is the byte returned by eventType(). The
+   * column family is always the event. The column qualifier is dependent of the type of event and
+   * could be empty.
+   *
+   * <pre>
+   *     Key Schema:
+   *     Row = EventNum + tabletID + seq
+   *     Family = event
+   *     Qualifier = tserverSession OR filename OR KeyExtent
+   * </pre>
+   */
+  public Key toKey() throws IOException {
+    byte[] formattedRow;
+    Text family = new Text(event.name());
+    var kb = Key.builder();
+    switch (event) {
+      case OPEN:
+        formattedRow = formatRow(0, 0);
+        return kb.row(formattedRow).family(family).qualifier(new Text(tserverSession)).build();
+      case COMPACTION_START:
+        formattedRow = formatRow(tabletId, seq);
+        return kb.row(formattedRow).family(family).qualifier(new Text(filename)).build();

Review comment:
       ```suggestion
           return kb.row(formattedRow).family(family).qualifier(filename).build();
   ```

##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogFileKey.java
##########
@@ -189,4 +196,158 @@ public String toString() {
     }
     throw new RuntimeException("Unknown type of entry: " + event);
   }
+
+  /**
+   * Converts LogFileKey to Key. Creates a Key containing all of the LogFileKey fields. The fields
+   * are stored so the Key sorts maintaining the legacy sort order. The row of the Key is composed
+   * of 3 fields: EventNum + tabletID + seq. The EventNum is the byte returned by eventType(). The
+   * column family is always the event. The column qualifier is dependent of the type of event and
+   * could be empty.
+   *
+   * <pre>
+   *     Key Schema:
+   *     Row = EventNum + tabletID + seq
+   *     Family = event
+   *     Qualifier = tserverSession OR filename OR KeyExtent
+   * </pre>
+   */
+  public Key toKey() throws IOException {
+    byte[] formattedRow;
+    Text family = new Text(event.name());
+    var kb = Key.builder();
+    switch (event) {
+      case OPEN:
+        formattedRow = formatRow(0, 0);
+        return kb.row(formattedRow).family(family).qualifier(new Text(tserverSession)).build();
+      case COMPACTION_START:
+        formattedRow = formatRow(tabletId, seq);
+        return kb.row(formattedRow).family(family).qualifier(new Text(filename)).build();
+      case MUTATION:
+      case MANY_MUTATIONS:
+      case COMPACTION_FINISH:
+        return kb.row(formatRow(tabletId, seq)).family(family).build();
+      case DEFINE_TABLET:
+        formattedRow = formatRow(tabletId, seq);
+        DataOutputBuffer buffer = new DataOutputBuffer();
+        tablet.writeTo(buffer);
+        var q = copyOf(buffer.getData(), buffer.getLength());
+        buffer.close();
+        return kb.row(formattedRow).family(family).qualifier(q).build();
+      default:
+        throw new AssertionError("Invalid event type in LogFileKey: " + event);
+    }
+  }
+
+  /**
+   * Get the first byte for the event. The only possible values are 0-4. This is used as the highest
+   * byte in the row.
+   */
+  private byte getEventByte() {
+    int evenTypeInteger = eventType(event);
+    return (byte) (evenTypeInteger & 0xff);
+  }
+
+  /**
+   * Get the byte encoded row for this LogFileKey as a Text object.
+   */
+  public Text formatRow() {
+    return new Text(formatRow(tabletId, seq));
+  }
+
+  /**
+   * Format the row using 13 bytes encoded to allow proper sorting of the RFile Key. The highest
+   * byte is for the event number, 4 bytes for the tabletId and 8 bytes for the sequence long.
+   */
+  private byte[] formatRow(int tabletId, long seq) {
+    byte eventNum = getEventByte();
+    // These will not sort properly when encoded if negative. Negative is not expected currently,
+    // defending against future changes and/or bugs.
+    Preconditions.checkArgument(eventNum >= 0 && seq >= 0);
+    byte[] row = new byte[13];
+    // encode the signed integer so negatives will sort properly for tabletId
+    int encodedTabletId = tabletId ^ 0x80000000;
+
+    row[0] = eventNum;
+    row[1] = (byte) ((encodedTabletId >>> 24) & 0xff);
+    row[2] = (byte) ((encodedTabletId >>> 16) & 0xff);
+    row[3] = (byte) ((encodedTabletId >>> 8) & 0xff);
+    row[4] = (byte) (encodedTabletId & 0xff);
+    row[5] = (byte) (seq >>> 56);
+    row[6] = (byte) (seq >>> 48);
+    row[7] = (byte) (seq >>> 40);
+    row[8] = (byte) (seq >>> 32);
+    row[9] = (byte) (seq >>> 24);
+    row[10] = (byte) (seq >>> 16);
+    row[11] = (byte) (seq >>> 8);
+    row[12] = (byte) (seq); // >>> 0
+    return row;
+  }
+
+  /**
+   * Extract the tabletId integer from the byte encoded Row.
+   */
+  private static int getTabletId(byte[] row) {
+    int encoded = ((row[1] << 24) + (row[2] << 16) + (row[3] << 8) + row[4]);
+    return encoded ^ 0x80000000;
+  }
+
+  /**
+   * Extract the sequence long from the byte encoded Row.
+   */
+  private static long getSequence(byte[] row) {
+    // @formatter:off
+    return (((long) row[5] << 56) +
+            ((long) (row[6] & 255) << 48) +
+            ((long) (row[7] & 255) << 40) +
+            ((long) (row[8] & 255) << 32) +
+            ((long) (row[9] & 255) << 24) +
+            ((row[10] & 255) << 16) +
+            ((row[11] & 255) << 8) +
+            ((row[12] & 255)));
+    // @formatter:on
+  }
+
+  /**
+   * Create LogFileKey from row. Follows schema defined by {@link #toKey()}
+   */
+  public static LogFileKey fromKey(Key key) {
+    var logFileKey = new LogFileKey();
+    byte[] rowParts = key.getRow().getBytes();
+
+    logFileKey.tabletId = getTabletId(rowParts);
+    logFileKey.seq = getSequence(rowParts);
+    logFileKey.event = LogEvents.valueOf(key.getColumnFamilyData().toString());
+    // verify event number in row matches column family
+    if (eventType(logFileKey.event) != rowParts[0]) {
+      throw new AssertionError("Event in row differs from column family. Key: " + key);
+    }
+
+    // handle special cases of what is stored in the qualifier
+    switch (logFileKey.event) {
+      case OPEN:
+        logFileKey.tserverSession = key.getColumnQualifier().toString();
+        break;
+      case COMPACTION_START:
+        logFileKey.filename = key.getColumnQualifier().toString();

Review comment:
       ```suggestion
           logFileKey.filename = key.getColumnQualifierData().toString();
   ```
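A sketch of decoding the 13-byte row back into its fields, written as a standalone model rather than the PR's methods. Because Java's `byte` is signed, each byte must be masked with `0xff` before it is shifted and combined; `getSequence` in the hunk above already masks its lower bytes, but the quoted `getTabletId` does not. In this sketch, `tabletIdUnmasked(encode((byte) 3, 128, 42L))` returns -128 instead of 128, while the masked version round-trips.

```java
import java.nio.ByteBuffer;

public class RowDecodeDemo {
  // Encode: event byte, then tabletId with its sign bit flipped, then seq (big-endian).
  static byte[] encode(byte event, int tabletId, long seq) {
    return ByteBuffer.allocate(13).put(event).putInt(tabletId ^ 0x80000000).putLong(seq).array();
  }

  // Same arithmetic as the quoted getTabletId: bytes sign-extend when promoted to int.
  static int tabletIdUnmasked(byte[] row) {
    int encoded = ((row[1] << 24) + (row[2] << 16) + (row[3] << 8) + row[4]);
    return encoded ^ 0x80000000;
  }

  // Masking with 0xff keeps each byte in 0..255 before it is shifted into place.
  static int tabletIdMasked(byte[] row) {
    int encoded = ((row[1] & 0xff) << 24) | ((row[2] & 0xff) << 16)
        | ((row[3] & 0xff) << 8) | (row[4] & 0xff);
    return encoded ^ 0x80000000;
  }

  // ByteBuffer.getLong(index) reads the 8 big-endian sequence bytes starting at offset 5.
  static long sequence(byte[] row) {
    return ByteBuffer.wrap(row).getLong(5);
  }
}
```

An alternative with the same effect is `ByteBuffer.wrap(row).getInt(1) ^ 0x80000000`, which avoids manual shifting entirely.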

##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogsIterator.java
##########
@@ -19,61 +19,87 @@
 package org.apache.accumulo.tserver.log;
 
 import java.io.IOException;
+import java.util.AbstractMap;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map.Entry;
 
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.ScannerBase;
+import org.apache.accumulo.core.client.rfile.RFile;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.log.SortedLogState;
+import org.apache.accumulo.tserver.logger.LogEvents;
 import org.apache.accumulo.tserver.logger.LogFileKey;
 import org.apache.accumulo.tserver.logger.LogFileValue;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Iterators;
-import com.google.common.collect.UnmodifiableIterator;
 
 /**
  * Iterates over multiple sorted recovery logs merging them into a single sorted stream.
  */
-public class RecoveryLogsIterator implements CloseableIterator<Entry<LogFileKey,LogFileValue>> {
+public class RecoveryLogsIterator
+    implements Iterator<Entry<LogFileKey,LogFileValue>>, AutoCloseable {
 
   private static final Logger LOG = LoggerFactory.getLogger(RecoveryLogsIterator.class);
 
-  List<CloseableIterator<Entry<LogFileKey,LogFileValue>>> iterators;
-  private UnmodifiableIterator<Entry<LogFileKey,LogFileValue>> iter;
+  private final List<Scanner> scanners;
+  private final Iterator<Entry<Key,Value>> iter;
 
   /**
-   * Iterates only over keys in the range [start,end].
+   * Scans the files in each recoveryLogDir over the range [start,end].
    */
-  RecoveryLogsIterator(VolumeManager fs, List<Path> recoveryLogPaths, LogFileKey start,
-      LogFileKey end) throws IOException {
+  RecoveryLogsIterator(ServerContext context, List<Path> recoveryLogDirs, LogFileKey start,
+      LogFileKey end, boolean checkFirstKey, LogEvents... colFamToFetch) throws IOException {

Review comment:
      Could colFamToFetch be removed? The first byte in the row is derived from the event type. Not sure, but it seems like the columns fetched always correspond to the ones that map to this first byte of the row. If so, then the request to fetch columns never filters anything. If it's not needed, removing it would simplify the code.
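The question can be stated as a check: fetching columns is a no-op exactly when every event whose number falls inside the row range's first-byte span is among the fetched families. The sketch below encodes that check. The enum and the event-number mapping are illustrative assumptions, not taken from LogFileKey; to answer the question for real, substitute the actual `eventType()` values and the ranges SortedLogRecovery builds.

```java
import java.util.Map;
import java.util.Set;

public class FetchRedundancyCheck {
  // Hypothetical event enum; mirrors the LogEvents names seen in the diff.
  enum Event { OPEN, DEFINE_TABLET, COMPACTION_START, COMPACTION_FINISH, MUTATION, MANY_MUTATIONS }

  // Assumed event-number mapping for illustration only; note two events share 3.
  static final Map<Event,Integer> EVENT_NUM = Map.of(
      Event.OPEN, 0, Event.DEFINE_TABLET, 1, Event.COMPACTION_START, 2,
      Event.COMPACTION_FINISH, 2, Event.MUTATION, 3, Event.MANY_MUTATIONS, 3);

  // True when fetching the given families removes nothing from a row range whose
  // first byte spans [lowNum, highNum], i.e. every event that can appear is fetched.
  static boolean fetchIsRedundant(int lowNum, int highNum, Set<Event> fetched) {
    for (Map.Entry<Event,Integer> e : EVENT_NUM.entrySet()) {
      int num = e.getValue();
      if (num >= lowNum && num <= highNum && !fetched.contains(e.getKey())) {
        return false; // rows for this event are in range but would be filtered out
      }
    }
    return true;
  }
}
```

Under this assumed mapping, a range over event number 3 that fetches only MUTATION would still filter out MANY_MUTATIONS, so the fetch is redundant only if both are requested.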

##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogFileKey.java
##########
@@ -189,4 +196,158 @@ public String toString() {
     }
     throw new RuntimeException("Unknown type of entry: " + event);
   }
+
+  /**
+   * Converts LogFileKey to Key. Creates a Key containing all of the LogFileKey fields. The fields
+   * are stored so the Key sorts maintaining the legacy sort order. The row of the Key is composed
+   * of 3 fields: EventNum + tabletID + seq. The EventNum is the byte returned by eventType(). The
+   * column family is always the event. The column qualifier is dependent of the type of event and
+   * could be empty.
+   *
+   * <pre>
+   *     Key Schema:
+   *     Row = EventNum + tabletID + seq
+   *     Family = event
+   *     Qualifier = tserverSession OR filename OR KeyExtent
+   * </pre>
+   */
+  public Key toKey() throws IOException {
+    byte[] formattedRow;
+    Text family = new Text(event.name());

Review comment:
      I think the Key builder can handle a String directly.
   
   ```suggestion
       String family = event.name();
   ```

##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogFileKey.java
##########
@@ -189,4 +196,158 @@ public String toString() {
     }
     throw new RuntimeException("Unknown type of entry: " + event);
   }
+
+  /**
+   * Converts LogFileKey to Key. Creates a Key containing all of the LogFileKey fields. The fields
+   * are stored so the Key sorts maintaining the legacy sort order. The row of the Key is composed
+   * of 3 fields: EventNum + tabletID + seq. The EventNum is the byte returned by eventType(). The
+   * column family is always the event. The column qualifier is dependent of the type of event and
+   * could be empty.
+   *
+   * <pre>
+   *     Key Schema:
+   *     Row = EventNum + tabletID + seq
+   *     Family = event
+   *     Qualifier = tserverSession OR filename OR KeyExtent
+   * </pre>
+   */
+  public Key toKey() throws IOException {
+    byte[] formattedRow;
+    Text family = new Text(event.name());
+    var kb = Key.builder();
+    switch (event) {
+      case OPEN:
+        formattedRow = formatRow(0, 0);
+        return kb.row(formattedRow).family(family).qualifier(new Text(tserverSession)).build();

Review comment:
       ```suggestion
           return kb.row(formattedRow).family(family).qualifier(tserverSession).build();
   ```

##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogsIterator.java
##########
@@ -93,11 +121,45 @@ public void remove() {
 
   @Override
   public void close() {
-    for (CloseableIterator<?> reader : iterators) {
-      try {
-        reader.close();
-      } catch (IOException e) {
-        LOG.debug("Failed to close reader", e);
+    scanners.forEach(ScannerBase::close);
+  }
+
+  /**
+   * Check for sorting signal files (finished/failed) and get the logs in the provided directory.
+   */
+  private List<Path> getFiles(VolumeManager fs, Path directory) throws IOException {
+    boolean foundFinish = false;
+    List<Path> logFiles = new ArrayList<>();
+    for (FileStatus child : fs.listStatus(directory)) {
+      if (child.getPath().getName().startsWith("_"))
+        continue;
+      if (SortedLogState.isFinished(child.getPath().getName())) {
+        foundFinish = true;
+        continue;
+      }
+      if (SortedLogState.FAILED.getMarker().equals(child.getPath().getName())) {

Review comment:
      This seems to just ignore failed markers. Is that what the old code did? I have a vague memory of an issue around this where we switched to ignoring them, but I'm not sure.
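One option is for the directory scan to report what it found and leave the decision to the caller. The sketch below follows the checks visible in the getFiles hunk (skip names starting with `_`, note the finished marker, note the failed marker, collect everything else) but works on a list of names instead of a VolumeManager. The marker strings `"finished"` and `"failed"` and the `Listing` record are assumptions for illustration; the real values come from SortedLogState.

```java
import java.util.ArrayList;
import java.util.List;

public class SortedDirListing {
  // Marker names assumed for illustration; the real ones come from SortedLogState.
  static final String FINISHED = "finished";
  static final String FAILED = "failed";

  // Hypothetical result type: the log files plus which markers were present.
  record Listing(List<String> logFiles, boolean finished, boolean failed) {}

  // Classifies directory entries, reporting a failed marker instead of skipping it silently.
  static Listing classify(List<String> names) {
    List<String> logFiles = new ArrayList<>();
    boolean finished = false;
    boolean failed = false;
    for (String name : names) {
      if (name.startsWith("_")) {
        continue; // hidden or temporary entries are never log files
      } else if (name.equals(FINISHED)) {
        finished = true;
      } else if (name.equals(FAILED)) {
        failed = true;
      } else {
        logFiles.add(name);
      }
    }
    return new Listing(logFiles, finished, failed);
  }
}
```

A caller could then log, throw, or proceed when `failed()` is true, which makes the chosen behavior explicit instead of implicit in a `continue`.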

##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogFileKey.java
##########
@@ -189,4 +196,158 @@ public String toString() {
     }
     throw new RuntimeException("Unknown type of entry: " + event);
   }
+
+  /**
+   * Converts LogFileKey to Key. Creates a Key containing all of the LogFileKey fields. The fields
+   * are stored so the Key sorts maintaining the legacy sort order. The row of the Key is composed
+   * of 3 fields: EventNum + tabletID + seq. The EventNum is the byte returned by eventType(). The
+   * column family is always the event. The column qualifier is dependent of the type of event and
+   * could be empty.
+   *
+   * <pre>
+   *     Key Schema:
+   *     Row = EventNum + tabletID + seq
+   *     Family = event
+   *     Qualifier = tserverSession OR filename OR KeyExtent
+   * </pre>
+   */
+  public Key toKey() throws IOException {
+    byte[] formattedRow;
+    Text family = new Text(event.name());
+    var kb = Key.builder();
+    switch (event) {
+      case OPEN:
+        formattedRow = formatRow(0, 0);
+        return kb.row(formattedRow).family(family).qualifier(new Text(tserverSession)).build();
+      case COMPACTION_START:
+        formattedRow = formatRow(tabletId, seq);
+        return kb.row(formattedRow).family(family).qualifier(new Text(filename)).build();
+      case MUTATION:
+      case MANY_MUTATIONS:
+      case COMPACTION_FINISH:
+        return kb.row(formatRow(tabletId, seq)).family(family).build();
+      case DEFINE_TABLET:
+        formattedRow = formatRow(tabletId, seq);
+        DataOutputBuffer buffer = new DataOutputBuffer();
+        tablet.writeTo(buffer);
+        var q = copyOf(buffer.getData(), buffer.getLength());
+        buffer.close();
+        return kb.row(formattedRow).family(family).qualifier(q).build();
+      default:
+        throw new AssertionError("Invalid event type in LogFileKey: " + event);
+    }
+  }
+
+  /**
+   * Get the first byte for the event. The only possible values are 0-4. This is used as the highest
+   * byte in the row.
+   */
+  private byte getEventByte() {
+    int evenTypeInteger = eventType(event);
+    return (byte) (evenTypeInteger & 0xff);
+  }
+
+  /**
+   * Get the byte encoded row for this LogFileKey as a Text object.
+   */
+  public Text formatRow() {
+    return new Text(formatRow(tabletId, seq));
+  }
+
+  /**
+   * Format the row using 13 bytes encoded to allow proper sorting of the RFile Key. The highest
+   * byte is for the event number, 4 bytes for the tabletId and 8 bytes for the sequence long.
+   */
+  private byte[] formatRow(int tabletId, long seq) {
+    byte eventNum = getEventByte();
+    // These will not sort properly when encoded if negative. Negative is not expected currently,
+    // defending against future changes and/or bugs.
+    Preconditions.checkArgument(eventNum >= 0 && seq >= 0);
+    byte[] row = new byte[13];
+    // encode the signed integer so negatives will sort properly for tabletId
+    int encodedTabletId = tabletId ^ 0x80000000;
+
+    row[0] = eventNum;
+    row[1] = (byte) ((encodedTabletId >>> 24) & 0xff);
+    row[2] = (byte) ((encodedTabletId >>> 16) & 0xff);
+    row[3] = (byte) ((encodedTabletId >>> 8) & 0xff);
+    row[4] = (byte) (encodedTabletId & 0xff);
+    row[5] = (byte) (seq >>> 56);
+    row[6] = (byte) (seq >>> 48);
+    row[7] = (byte) (seq >>> 40);
+    row[8] = (byte) (seq >>> 32);
+    row[9] = (byte) (seq >>> 24);
+    row[10] = (byte) (seq >>> 16);
+    row[11] = (byte) (seq >>> 8);
+    row[12] = (byte) (seq); // >>> 0
+    return row;
+  }
+
+  /**
+   * Extract the tabletId integer from the byte encoded Row.
+   */
+  private static int getTabletId(byte[] row) {
+    int encoded = ((row[1] << 24) + (row[2] << 16) + (row[3] << 8) + row[4]);
+    return encoded ^ 0x80000000;
+  }
+
+  /**
+   * Extract the sequence long from the byte encoded Row.
+   */
+  private static long getSequence(byte[] row) {
+    // @formatter:off
+    return (((long) row[5] << 56) +
+            ((long) (row[6] & 255) << 48) +
+            ((long) (row[7] & 255) << 40) +
+            ((long) (row[8] & 255) << 32) +
+            ((long) (row[9] & 255) << 24) +
+            ((row[10] & 255) << 16) +
+            ((row[11] & 255) << 8) +
+            ((row[12] & 255)));
+    // @formatter:on
+  }
+
+  /**
+   * Create LogFileKey from row. Follows schema defined by {@link #toKey()}
+   */
+  public static LogFileKey fromKey(Key key) {
+    var logFileKey = new LogFileKey();
+    byte[] rowParts = key.getRow().getBytes();
+
+    logFileKey.tabletId = getTabletId(rowParts);
+    logFileKey.seq = getSequence(rowParts);
+    logFileKey.event = LogEvents.valueOf(key.getColumnFamilyData().toString());
+    // verify event number in row matches column family
+    if (eventType(logFileKey.event) != rowParts[0]) {
+      throw new AssertionError("Event in row differs from column family. Key: " + key);
+    }
+
+    // handle special cases of what is stored in the qualifier
+    switch (logFileKey.event) {
+      case OPEN:
+        logFileKey.tserverSession = key.getColumnQualifier().toString();

Review comment:
       ```suggestion
           logFileKey.tserverSession = key.getColumnQualifierData().toString();
   ```







[GitHub] [accumulo] milleruntime commented on a change in pull request #2117: Change sorted recovery to write to RFiles

Posted by GitBox <gi...@apache.org>.
milleruntime commented on a change in pull request #2117:
URL: https://github.com/apache/accumulo/pull/2117#discussion_r646824439



##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogFileKey.java
##########
@@ -189,4 +195,90 @@ public String toString() {
     }
     throw new RuntimeException("Unknown type of entry: " + event);
   }
+
+  /**
+   * Converts LogFileKey to Key. Creates a Key containing all of the LogFileKey fields. The fields
+   * are stored so the Key sorts maintaining the legacy sort order. The row of the Key is composed
+   * of 3 fields: EventNum + tabletID + seq (separated by underscore). The EventNum is the integer
+   * returned by eventType(). The column family is always the event. The column qualifier is
+   * dependent of the type of event and could be empty.
+   *
+   * <pre>
+   *     Key Schema:
+   *     Row = EventNum_tabletID_seq
+   *     Family = event
+   *     Qualifier = tserverSession OR filename OR KeyExtent
+   * </pre>
+   */
+  public Key toKey() throws IOException {
+    String row = "";
+    int eventNum = eventType(event);
+    String family = event.name();
+    String qual = "";
+    switch (event) {
+      case OPEN:
+        row = formatRow(eventNum, 0, 0);
+        qual = tserverSession;
+        break;
+      case COMPACTION_START:
+        row = formatRow(eventNum, tabletId, seq);
+        if (filename != null)
+          qual = filename;
+        break;
+      case MUTATION:
+      case MANY_MUTATIONS:
+      case COMPACTION_FINISH:
+        row = formatRow(eventNum, tabletId, seq);
+        break;
+      case DEFINE_TABLET:
+        row = formatRow(eventNum, tabletId, seq);
+        // Base64 encode KeyExtent
+        DataOutputBuffer buffer = new DataOutputBuffer();
+        tablet.writeTo(buffer);
+        qual = Base64.getEncoder().encodeToString(copyOf(buffer.getData(), buffer.getLength()));
+        buffer.close();
+        break;
+    }
+    return new Key(new Text(row), new Text(family), new Text(qual));
+  }
+
+  // format row = 1_000001_0000000001
+  private String formatRow(int eventNum, int tabletId, long seq) {
+    return String.format("%d_%06d_%010d", eventNum, tabletId, seq);

Review comment:
       See newer comments.
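A short demonstration of why the earlier zero-padded string row (`%d_%06d_%010d`) could not preserve numeric order in general: padding never truncates, so a value wider than its field changes the string length, and negative values put a `-` in front of the digits. maxKey uses Integer.MAX_VALUE and Long.MAX_VALUE, which already exceed the 6- and 10-digit widths. The class name is hypothetical; `formatRow` reproduces the format string from the diff.

```java
public class PaddedRowSortDemo {
  // The earlier string row format from the diff: event_tabletId_seq.
  static String formatRow(int eventNum, int tabletId, long seq) {
    return String.format("%d_%06d_%010d", eventNum, tabletId, seq);
  }

  public static void main(String[] args) {
    // 1000000 overflows the 6-digit width, so it sorts before 999999 as a string.
    System.out.println(formatRow(1, 999_999, 0).compareTo(formatRow(1, 1_000_000, 0)) > 0); // true
    // Zero padding goes after the sign ("-00001"), so -2 sorts after -1.
    System.out.println(formatRow(1, -2, 0).compareTo(formatRow(1, -1, 0)) > 0); // true
    // The maxKey values are printed in full, ignoring the field widths.
    System.out.println(formatRow(1, Integer.MAX_VALUE, Long.MAX_VALUE)); // 1_2147483647_9223372036854775807
  }
}
```

The fixed-width binary encoding in the newer revision avoids both problems, since every row is exactly 13 bytes and the tabletId sign bit is flipped before encoding.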







[GitHub] [accumulo] milleruntime commented on a change in pull request #2117: Change sorted recovery to write to RFiles

Posted by GitBox <gi...@apache.org>.
milleruntime commented on a change in pull request #2117:
URL: https://github.com/apache/accumulo/pull/2117#discussion_r648512335



##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java
##########
@@ -61,56 +66,58 @@
 
   private static final Logger log = LoggerFactory.getLogger(SortedLogRecovery.class);
 
-  private VolumeManager fs;
+  private final ServerContext context;
 
-  public SortedLogRecovery(VolumeManager fs) {
-    this.fs = fs;
+  public SortedLogRecovery(ServerContext context) {
+    this.context = context;
   }
 
-  static LogFileKey maxKey(LogEvents event) {
+  static LogFileKey maxKey(LogEvents event, KeyExtent extent) {
     LogFileKey key = new LogFileKey();
     key.event = event;
     key.tabletId = Integer.MAX_VALUE;
     key.seq = Long.MAX_VALUE;
+    key.tablet = extent;

Review comment:
       I did some refactoring in 6a94527457491cd0ff165122b93bfead6b9de85b so I could call formatRow for the range as well. Then I was able to drop the dummy value.

##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogFileKey.java
##########
@@ -189,4 +195,90 @@ public String toString() {
     }
     throw new RuntimeException("Unknown type of entry: " + event);
   }
+
+  /**
+   * Converts LogFileKey to Key. Creates a Key containing all of the LogFileKey fields. The fields
+   * are stored so the Key sorts maintaining the legacy sort order. The row of the Key is composed
+   * of 3 fields: EventNum + tabletID + seq (separated by underscore). The EventNum is the integer
+   * returned by eventType(). The column family is always the event. The column qualifier is
+   * dependent of the type of event and could be empty.
+   *
+   * <pre>
+   *     Key Schema:
+   *     Row = EventNum_tabletID_seq
+   *     Family = event
+   *     Qualifier = tserverSession OR filename OR KeyExtent
+   * </pre>
+   */
+  public Key toKey() throws IOException {
+    String row = "";
+    int eventNum = eventType(event);
+    String family = event.name();
+    String qual = "";
+    switch (event) {
+      case OPEN:
+        row = formatRow(eventNum, 0, 0);
+        qual = tserverSession;
+        break;
+      case COMPACTION_START:
+        row = formatRow(eventNum, tabletId, seq);
+        if (filename != null)

Review comment:
       I did some refactoring in 6a94527457491cd0ff165122b93bfead6b9de85b so I could call formatRow for the range as well. Then I was able to drop the dummy value.







[GitHub] [accumulo] milleruntime commented on a change in pull request #2117: Make sorted recovery write to RFiles

Posted by GitBox <gi...@apache.org>.
milleruntime commented on a change in pull request #2117:
URL: https://github.com/apache/accumulo/pull/2117#discussion_r657005813



##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogFileKey.java
##########
@@ -189,4 +198,162 @@ public String toString() {
     }
     throw new RuntimeException("Unknown type of entry: " + event);
   }
+
+  /**
+   * Converts LogFileKey to Key. Creates a Key containing all of the LogFileKey fields. The fields
+   * are stored so the Key sorts maintaining the legacy sort order. The row of the Key is composed
+   * of 3 fields: EventNum + tabletID + seq. The EventNum is the byte returned by eventType(). The
+   * column family is always the event. The column qualifier is dependent of the type of event and
+   * could be empty.
+   *
+   * <pre>
+   *     Key Schema:
+   *     Row = EventNum + tabletID + seq
+   *     Family = event
+   *     Qualifier = tserverSession OR filename OR KeyExtent
+   * </pre>
+   */
+  public Key toKey() throws IOException {
+    byte[] formattedRow;
+    String family = event.name();
+    var kb = Key.builder();
+    switch (event) {
+      case OPEN:
+        formattedRow = formatRow(0, 0);
+        return kb.row(formattedRow).family(family).qualifier(tserverSession).build();
+      case COMPACTION_START:
+        formattedRow = formatRow(tabletId, seq);
+        return kb.row(formattedRow).family(family).qualifier(filename).build();
+      case MUTATION:
+      case MANY_MUTATIONS:
+      case COMPACTION_FINISH:
+        return kb.row(formatRow(tabletId, seq)).family(family).build();
+      case DEFINE_TABLET:
+        formattedRow = formatRow(tabletId, seq);
+        DataOutputBuffer buffer = new DataOutputBuffer();
+        tablet.writeTo(buffer);
+        var q = copyOf(buffer.getData(), buffer.getLength());
+        buffer.close();
+        return kb.row(formattedRow).family(family).qualifier(q).build();
+      default:
+        throw new AssertionError("Invalid event type in LogFileKey: " + event);
+    }
+  }
+
+  /**
+   * Get the first byte for the event. The only possible values are 0-4. This is used as the highest
+   * byte in the row.
+   */
+  private byte getEventByte() {
+    int evenTypeInteger = eventType(event);
+    return (byte) (evenTypeInteger & 0xff);
+  }
+
+  /**
+   * Get the byte encoded row for this LogFileKey as a Text object.
+   */
+  private Text formatRow() {
+    return new Text(formatRow(tabletId, seq));
+  }
+
+  /**
+   * Format the row using 13 bytes encoded to allow proper sorting of the RFile Key. The highest
+   * byte is for the event number, 4 bytes for the tabletId and 8 bytes for the sequence long.
+   */
+  private byte[] formatRow(int tabletId, long seq) {
+    byte eventNum = getEventByte();
+    // These will not sort properly when encoded if negative. Negative is not expected currently,
+    // defending against future changes and/or bugs.
+    Preconditions.checkArgument(eventNum >= 0 && seq >= 0);
+    byte[] row = new byte[13];
+    // encode the signed integer so negatives will sort properly for tabletId
+    int encodedTabletId = tabletId ^ 0x80000000;
+
+    row[0] = eventNum;
+    row[1] = (byte) ((encodedTabletId >>> 24) & 0xff);
+    row[2] = (byte) ((encodedTabletId >>> 16) & 0xff);
+    row[3] = (byte) ((encodedTabletId >>> 8) & 0xff);
+    row[4] = (byte) (encodedTabletId & 0xff);
+    row[5] = (byte) (seq >>> 56);
+    row[6] = (byte) (seq >>> 48);
+    row[7] = (byte) (seq >>> 40);
+    row[8] = (byte) (seq >>> 32);
+    row[9] = (byte) (seq >>> 24);
+    row[10] = (byte) (seq >>> 16);
+    row[11] = (byte) (seq >>> 8);
+    row[12] = (byte) (seq); // >>> 0
+    return row;
+  }
+
+  /**
+   * Extract the tabletId integer from the byte encoded Row.
+   */
+  private static int getTabletId(byte[] row) {
+    int encoded = ((row[1] << 24) + (row[2] << 16) + (row[3] << 8) + row[4]);

Review comment:
       Thanks for looking at this failure. There was definitely a problem with the bytes here. I have a fix I will push shortly, but I _think_ that, given the way it is done, we can use `+` rather than `|`. Keith and I based the encoding on what is done in DataInputStream. https://github.com/openjdk-mirror/jdk7u-jdk/blob/master/src/share/classes/java/io/DataInputStream.java#L386-L394
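A likely culprit in the quoted `getTabletId` is sign extension. Java's `byte` is signed, so a byte such as `0xC8` becomes the int `-56` before it is added, which spills ones into the higher bits. `DataInputStream.readInt` avoids this because `read()` returns unsigned values (0-255). A minimal, self-contained sketch (the class `TabletIdCodec` is hypothetical) masks each byte with `& 0xff` first. After masking, the shifted bytes occupy disjoint bits, so `+` and `|` produce the same result.

```java
/**
 * Hypothetical codec for the 4 tabletId bytes (row[1..4]) of the 13-byte row.
 * The sign bit is flipped on encode and flipped back on decode.
 */
final class TabletIdCodec {

  /** Writes tabletId (sign bit flipped) big-endian into row[1..4]. */
  static void encode(int tabletId, byte[] row) {
    int encoded = tabletId ^ 0x80000000;
    row[1] = (byte) (encoded >>> 24);
    row[2] = (byte) (encoded >>> 16);
    row[3] = (byte) (encoded >>> 8);
    row[4] = (byte) encoded;
  }

  /** Unmasked decode, as in the quoted diff: wrong whenever row[2..4] has its high bit set. */
  static int decodeUnmasked(byte[] row) {
    int encoded = (row[1] << 24) + (row[2] << 16) + (row[3] << 8) + row[4];
    return encoded ^ 0x80000000;
  }

  /** Masked decode combining with +, in the style of DataInputStream.readInt. */
  static int decodePlus(byte[] row) {
    int encoded = ((row[1] & 0xff) << 24) + ((row[2] & 0xff) << 16)
        + ((row[3] & 0xff) << 8) + (row[4] & 0xff);
    return encoded ^ 0x80000000;
  }

  /** Masked decode combining with |; same result because the bits never overlap. */
  static int decodeOr(byte[] row) {
    int encoded = ((row[1] & 0xff) << 24) | ((row[2] & 0xff) << 16)
        | ((row[3] & 0xff) << 8) | (row[4] & 0xff);
    return encoded ^ 0x80000000;
  }
}
```

For example, encoding tabletId `200` puts `0xC8` in `row[4]`, and the unmasked decode then returns a wrong value while both masked variants round-trip correctly.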







[GitHub] [accumulo] keith-turner commented on a change in pull request #2117: Change sorted recovery to write to RFiles

Posted by GitBox <gi...@apache.org>.
keith-turner commented on a change in pull request #2117:
URL: https://github.com/apache/accumulo/pull/2117#discussion_r646849389



##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogsIterator.java
##########
@@ -20,60 +20,85 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map.Entry;
 
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.ScannerBase;
+import org.apache.accumulo.core.client.rfile.RFile;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.log.SortedLogState;
+import org.apache.accumulo.tserver.logger.LogEvents;
 import org.apache.accumulo.tserver.logger.LogFileKey;
-import org.apache.accumulo.tserver.logger.LogFileValue;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Iterators;
-import com.google.common.collect.UnmodifiableIterator;
 
 /**
  * Iterates over multiple sorted recovery logs merging them into a single sorted stream.
  */
-public class RecoveryLogsIterator implements CloseableIterator<Entry<LogFileKey,LogFileValue>> {
+public class RecoveryLogsIterator implements Iterator<Entry<Key,Value>>, AutoCloseable {

Review comment:
       > Can you point me to the code that does this?
   
   RecoveryLogsIterator.next() could convert from Key,Value, like the following.  Not sure if there is anything using RecoveryLogsIterator that requires a Key/Value though.
   
   ```java
   public Entry<LogFileKey,LogFileValue> next() {
     Entry<Key,Value> e = iter.next();
     return new AbstractMap.SimpleImmutableEntry<>(LogFileKey.fromKey(e.getKey()),
         LogFileValue.fromValue(e.getValue()));
   }
   ```
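`Iterator.next()` cannot throw a checked exception. So if the conversion can throw `IOException` (an assumption here, though a later reply mentions adding a try/catch for exactly that), it has to be handled inside `next()`. A generic sketch of that pattern wraps the exception in `UncheckedIOException`. The names `ConvertingIterator` and `Converter` are hypothetical, and strings stand in for Accumulo entries.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Iterator;

/** Hypothetical iterator that converts each source element lazily in next(). */
final class ConvertingIterator<S, T> implements Iterator<T> {

  /** A conversion step that may throw IOException, like a fromValue-style decoder. */
  @FunctionalInterface
  interface Converter<A, B> {
    B convert(A source) throws IOException;
  }

  private final Iterator<S> source;
  private final Converter<S, T> converter;

  ConvertingIterator(Iterator<S> source, Converter<S, T> converter) {
    this.source = source;
    this.converter = converter;
  }

  @Override
  public boolean hasNext() {
    return source.hasNext();
  }

  @Override
  public T next() {
    S s = source.next();
    try {
      return converter.convert(s);
    } catch (IOException e) {
      // Iterator.next() cannot declare IOException, so rethrow it unchecked.
      throw new UncheckedIOException(e);
    }
  }
}
```

Keeping the conversion in one `next()` method means callers only ever see the converted type, at the cost of the checked exception becoming unchecked.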







[GitHub] [accumulo] keith-turner commented on a change in pull request #2117: Change sorted recovery to write to RFiles

Posted by GitBox <gi...@apache.org>.
keith-turner commented on a change in pull request #2117:
URL: https://github.com/apache/accumulo/pull/2117#discussion_r645880805



##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogsIterator.java
##########
@@ -20,60 +20,85 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map.Entry;
 
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.ScannerBase;
+import org.apache.accumulo.core.client.rfile.RFile;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.log.SortedLogState;
+import org.apache.accumulo.tserver.logger.LogEvents;
 import org.apache.accumulo.tserver.logger.LogFileKey;
-import org.apache.accumulo.tserver.logger.LogFileValue;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Iterators;
-import com.google.common.collect.UnmodifiableIterator;
 
 /**
  * Iterates over multiple sorted recovery logs merging them into a single sorted stream.
  */
-public class RecoveryLogsIterator implements CloseableIterator<Entry<LogFileKey,LogFileValue>> {
+public class RecoveryLogsIterator implements Iterator<Entry<Key,Value>>, AutoCloseable {
 
   private static final Logger LOG = LoggerFactory.getLogger(RecoveryLogsIterator.class);
 
-  List<CloseableIterator<Entry<LogFileKey,LogFileValue>>> iterators;
-  private UnmodifiableIterator<Entry<LogFileKey,LogFileValue>> iter;
+  private final List<Scanner> scanners;
+  private final Iterator<Entry<Key,Value>> iter;
 
   /**
-   * Iterates only over keys in the range [start,end].
+   * Scans the files in each recoveryLogDir over the range [start,end].
    */
-  RecoveryLogsIterator(VolumeManager fs, List<Path> recoveryLogPaths, LogFileKey start,
-      LogFileKey end) throws IOException {
+  RecoveryLogsIterator(ServerContext context, List<Path> recoveryLogDirs, LogFileKey start,
+      LogFileKey end, boolean checkFirstKey, LogEvents... colFamToFetch) throws IOException {
 
-    iterators = new ArrayList<>(recoveryLogPaths.size());
+    List<Iterator<Entry<Key,Value>>> iterators = new ArrayList<>(recoveryLogDirs.size());
+    scanners = new ArrayList<>();
+    Key startKey = start.toKey();
+    Key endKey = end.toKey();
+    Range range = new Range(startKey, endKey);

Review comment:
       Could possibly call LogFileKey.formatRow here; then we would not need to create the entire key. Then maybe an extent would never need to be set in the maxKey() function.








[GitHub] [accumulo] DomGarguilo commented on a change in pull request #2117: Change sorted recovery to write to RFiles

Posted by GitBox <gi...@apache.org>.
DomGarguilo commented on a change in pull request #2117:
URL: https://github.com/apache/accumulo/pull/2117#discussion_r639858638



##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
##########
@@ -224,6 +214,40 @@ public LogSorter(ServerContext context, AccumuloConfiguration conf) {
     this.walBlockSize = DfsLogger.getWalBlockSize(conf);
   }
 
+  @VisibleForTesting
+  public static void writeBuffer(ServerContext context, String destPath,
+      List<Pair<LogFileKey,LogFileValue>> buffer, int part) throws IOException {
+    String filename = String.format("part-r-%05d.rf", part);
+    Path path = new Path(destPath, filename);
+    FileSystem fs = context.getVolumeManager().getFileSystemByPath(path);

Review comment:
       This could be closed once it is no longer needed, e.g. `fs.close()`.







[GitHub] [accumulo] milleruntime commented on pull request #2117: Change sorted recovery to write to RFiles

Posted by GitBox <gi...@apache.org>.
milleruntime commented on pull request #2117:
URL: https://github.com/apache/accumulo/pull/2117#issuecomment-848812977


   > Ran full ITs and saw timeout failures in: CreateInitialSplitsIT, SuspendedTabletsIT
   
   Both ITs passed when run locally.





[GitHub] [accumulo] milleruntime commented on a change in pull request #2117: Change sorted recovery to write to RFiles

Posted by GitBox <gi...@apache.org>.
milleruntime commented on a change in pull request #2117:
URL: https://github.com/apache/accumulo/pull/2117#discussion_r648511140



##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogsIterator.java
##########
@@ -20,60 +20,85 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map.Entry;
 
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.ScannerBase;
+import org.apache.accumulo.core.client.rfile.RFile;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.log.SortedLogState;
+import org.apache.accumulo.tserver.logger.LogEvents;
 import org.apache.accumulo.tserver.logger.LogFileKey;
-import org.apache.accumulo.tserver.logger.LogFileValue;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Iterators;
-import com.google.common.collect.UnmodifiableIterator;
 
 /**
  * Iterates over multiple sorted recovery logs merging them into a single sorted stream.
  */
-public class RecoveryLogsIterator implements CloseableIterator<Entry<LogFileKey,LogFileValue>> {
+public class RecoveryLogsIterator implements Iterator<Entry<Key,Value>>, AutoCloseable {

Review comment:
       I made this change in 6a94527457491cd0ff165122b93bfead6b9de85b. I put a try/catch in the method where the IOException was being thrown because I didn't want to have one for all cases. I am not sure the code is shorter, but having the conversion happen in only one spot is nice.







[GitHub] [accumulo] EdColeman commented on a change in pull request #2117: Change sorted recovery to write to RFiles

Posted by GitBox <gi...@apache.org>.
EdColeman commented on a change in pull request #2117:
URL: https://github.com/apache/accumulo/pull/2117#discussion_r640065308



##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogsIterator.java
##########
@@ -93,11 +107,48 @@ public void remove() {
 
   @Override
   public void close() {
-    for (CloseableIterator<?> reader : iterators) {
-      try {
-        reader.close();
-      } catch (IOException e) {
-        LOG.debug("Failed to close reader", e);
+    scanners.forEach(ScannerBase::close);
+  }
+
+  /**
+   * Check for sorting signal files (finished/failed) and get the logs in the provided directory.
+   */
+  private List<Path> getFiles(VolumeManager fs, Path directory) throws IOException {
+    boolean foundFinish = false;
+    List<Path> logFiles = new ArrayList<>();
+    for (FileStatus child : fs.listStatus(directory)) {
+      if (child.getPath().getName().startsWith("_"))
+        continue;
+      if (SortedLogState.isFinished(child.getPath().getName())) {
+        foundFinish = true;
+        continue;
+      }
+      if (SortedLogState.FAILED.getMarker().equals(child.getPath().getName())) {
+        continue;
+      }
+      FileSystem ns = fs.getFileSystemByPath(child.getPath());

Review comment:
       I am not sure if this applies here, or if it is still an issue with Hadoop (maybe the behavior has changed), but if this close is acting on a Hadoop file system, you need to check that you are not closing the underlying Hadoop file system for all clients (see [stack-overflow](https://stackoverflow.com/questions/20057881/hadoop-filesystem-closed-exception-when-doing-bufferedreader-close)).
   
   `There is a little-known gotcha with the hadoop filesystem API: FileSystem.get returns the same object for every invocation with the same filesystem. So if one is closed anywhere, they are all closed. You could debate the merits of this decision, but that's the way it is.`
   
    Whether a close is appropriate here, and in the other case mentioned, depends on whether `fs.get()` returns a shared HDFS resource or a unique one.
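The gotcha quoted above comes from Hadoop's `FileSystem` cache. `FileSystem.get` hands back a shared cached instance for the same filesystem URI and user, while `FileSystem.newInstance` returns a private instance that the caller may close. The pure-JDK analogy below is not Hadoop code; `CachedHandles` and `Handle` are hypothetical. It shows why closing a cached instance breaks every other holder of it.

```java
import java.io.Closeable;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical factory illustrating shared cached instances vs. private ones. */
final class CachedHandles {

  static final class Handle implements Closeable {
    private boolean closed;

    void use() {
      if (closed) {
        throw new IllegalStateException("handle closed");
      }
    }

    @Override
    public void close() {
      closed = true;
    }
  }

  private final Map<String, Handle> cache = new HashMap<>();

  /** Cached lookup, analogous to FileSystem.get(): same key, same instance. */
  Handle get(String uri) {
    return cache.computeIfAbsent(uri, k -> new Handle());
  }

  /** Uncached lookup, analogous to FileSystem.newInstance(): a private instance. */
  Handle newInstance(String uri) {
    return new Handle();
  }
}
```

In this analogy, two callers of `get("hdfs://nn")` hold the same `Handle`, so one caller's `close()` makes the other's `use()` fail. Closing a handle from `newInstance` leaves the cached one untouched.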







[GitHub] [accumulo] milleruntime merged pull request #2117: Make sorted recovery write to RFiles

Posted by GitBox <gi...@apache.org>.
milleruntime merged pull request #2117:
URL: https://github.com/apache/accumulo/pull/2117


   





[GitHub] [accumulo] keith-turner commented on a change in pull request #2117: Change sorted recovery to write to RFiles

Posted by GitBox <gi...@apache.org>.
keith-turner commented on a change in pull request #2117:
URL: https://github.com/apache/accumulo/pull/2117#discussion_r648581020



##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogFileValue.java
##########
@@ -93,4 +98,23 @@ public String toString() {
     return format(this, 5);
   }
 
+  /**
+   * Convert list of mutations to a byte array and use to create a Value
+   */
+  public Value toValue() throws IOException {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();

Review comment:
       Oh yeah, I was looking for UnsynchronizedBuffer.java but could not find it. It does not seem to implement the DataOutput interface, though.
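For reference, the standard-library way to get a `DataOutput` over an in-memory buffer is to wrap a `ByteArrayOutputStream` in a `DataOutputStream`. That lets Writable-style `write(DataOutput)` methods be reused. Here is a minimal sketch. The class `ListBytes` is hypothetical, strings stand in for mutations, and the count-then-items layout is an assumption, not the actual `LogFileValue` format.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical round trip of a list through DataOutput/DataInput. */
final class ListBytes {

  static byte[] toBytes(List<String> items) throws IOException {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    // DataOutputStream implements DataOutput on top of the in-memory buffer.
    try (DataOutputStream out = new DataOutputStream(baos)) {
      out.writeInt(items.size());
      for (String item : items) {
        out.writeUTF(item); // 2-byte length prefix + modified UTF-8
      }
    }
    return baos.toByteArray();
  }

  static List<String> fromBytes(byte[] bytes) throws IOException {
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
      int count = in.readInt();
      List<String> items = new ArrayList<>(count);
      for (int i = 0; i < count; i++) {
        items.add(in.readUTF());
      }
      return items;
    }
  }
}
```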







[GitHub] [accumulo] ctubbsii commented on a change in pull request #2117: Make sorted recovery write to RFiles

Posted by GitBox <gi...@apache.org>.
ctubbsii commented on a change in pull request #2117:
URL: https://github.com/apache/accumulo/pull/2117#discussion_r657317839



##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogFileKey.java
##########
@@ -189,4 +198,162 @@ public String toString() {
     }
     throw new RuntimeException("Unknown type of entry: " + event);
   }
+
+  /**
+   * Converts LogFileKey to Key. Creates a Key containing all of the LogFileKey fields. The fields
+   * are stored so the Key sorts maintaining the legacy sort order. The row of the Key is composed
+   * of 3 fields: EventNum + tabletID + seq. The EventNum is the byte returned by eventType(). The
+   * column family is always the event. The column qualifier is dependent of the type of event and
+   * could be empty.
+   *
+   * <pre>
+   *     Key Schema:
+   *     Row = EventNum + tabletID + seq
+   *     Family = event
+   *     Qualifier = tserverSession OR filename OR KeyExtent
+   * </pre>
+   */
+  public Key toKey() throws IOException {
+    byte[] formattedRow;
+    String family = event.name();
+    var kb = Key.builder();
+    switch (event) {
+      case OPEN:
+        formattedRow = formatRow(0, 0);
+        return kb.row(formattedRow).family(family).qualifier(tserverSession).build();
+      case COMPACTION_START:
+        formattedRow = formatRow(tabletId, seq);
+        return kb.row(formattedRow).family(family).qualifier(filename).build();
+      case MUTATION:
+      case MANY_MUTATIONS:
+      case COMPACTION_FINISH:
+        return kb.row(formatRow(tabletId, seq)).family(family).build();
+      case DEFINE_TABLET:
+        formattedRow = formatRow(tabletId, seq);
+        DataOutputBuffer buffer = new DataOutputBuffer();
+        tablet.writeTo(buffer);
+        var q = copyOf(buffer.getData(), buffer.getLength());
+        buffer.close();
+        return kb.row(formattedRow).family(family).qualifier(q).build();
+      default:
+        throw new AssertionError("Invalid event type in LogFileKey: " + event);
+    }
+  }
+
+  /**
+   * Get the first byte for the event. The only possible values are 0-4. This is used as the highest
+   * byte in the row.
+   */
+  private byte getEventByte() {
+    int evenTypeInteger = eventType(event);
+    return (byte) (evenTypeInteger & 0xff);
+  }
+
+  /**
+   * Get the byte encoded row for this LogFileKey as a Text object.
+   */
+  private Text formatRow() {
+    return new Text(formatRow(tabletId, seq));
+  }
+
+  /**
+   * Format the row using 13 bytes encoded to allow proper sorting of the RFile Key. The highest
+   * byte is for the event number, 4 bytes for the tabletId and 8 bytes for the sequence long.
+   */
+  private byte[] formatRow(int tabletId, long seq) {
+    byte eventNum = getEventByte();
+    // These will not sort properly when encoded if negative. Negative is not expected currently,
+    // defending against future changes and/or bugs.
+    Preconditions.checkArgument(eventNum >= 0 && seq >= 0);
+    byte[] row = new byte[13];
+    // encode the signed integer so negatives will sort properly for tabletId
+    int encodedTabletId = tabletId ^ 0x80000000;
+
+    row[0] = eventNum;
+    row[1] = (byte) ((encodedTabletId >>> 24) & 0xff);
+    row[2] = (byte) ((encodedTabletId >>> 16) & 0xff);
+    row[3] = (byte) ((encodedTabletId >>> 8) & 0xff);
+    row[4] = (byte) (encodedTabletId & 0xff);
+    row[5] = (byte) (seq >>> 56);
+    row[6] = (byte) (seq >>> 48);
+    row[7] = (byte) (seq >>> 40);
+    row[8] = (byte) (seq >>> 32);
+    row[9] = (byte) (seq >>> 24);
+    row[10] = (byte) (seq >>> 16);
+    row[11] = (byte) (seq >>> 8);
+    row[12] = (byte) (seq); // >>> 0
+    return row;
+  }
+
+  /**
+   * Extract the tabletId integer from the byte encoded Row.
+   */
+  private static int getTabletId(byte[] row) {
+    int encoded = ((row[1] << 24) + (row[2] << 16) + (row[3] << 8) + row[4]);
+    return encoded ^ 0x80000000;
+  }
+
+  /**
+   * Extract the sequence long from the byte encoded Row.
+   */
+  private static long getSequence(byte[] row) {
+    // @formatter:off
+    return (((long) row[5] << 56) +
+            ((long) (row[6] & 255) << 48) +
+            ((long) (row[7] & 255) << 40) +
+            ((long) (row[8] & 255) << 32) +
+            ((long) (row[9] & 255) << 24) +
+            ((row[10] & 255) << 16) +
+            ((row[11] & 255) << 8) +
+            ((row[12] & 255)));

Review comment:
       Fixed in #2176 
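As a point of comparison only, and not necessarily what #2176 does, the 13-byte layout described in the javadoc can also be written and read with `java.nio.ByteBuffer`. That layout is one event byte, a 4-byte tabletId with its sign bit flipped, then an 8-byte seq. `ByteBuffer` is big-endian by default, which removes the manual shifting and masking. `RowBufferCodec` is a hypothetical name.

```java
import java.nio.ByteBuffer;

/** Hypothetical ByteBuffer-based codec for the 13-byte row layout. */
final class RowBufferCodec {

  static byte[] formatRow(byte eventNum, int tabletId, long seq) {
    // Mirrors the precondition in the quoted formatRow: negatives would not sort correctly.
    if (eventNum < 0 || seq < 0) {
      throw new IllegalArgumentException("eventNum and seq must be non-negative");
    }
    return ByteBuffer.allocate(13) // big-endian by default
        .put(eventNum)
        .putInt(tabletId ^ 0x80000000) // flip sign bit so negatives sort first
        .putLong(seq)
        .array();
  }

  static byte getEvent(byte[] row) {
    return row[0];
  }

  static int getTabletId(byte[] row) {
    return ByteBuffer.wrap(row).getInt(1) ^ 0x80000000;
  }

  static long getSequence(byte[] row) {
    return ByteBuffer.wrap(row).getLong(5);
  }
}
```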







[GitHub] [accumulo] milleruntime commented on a change in pull request #2117: Change sorted recovery to write to RFiles

Posted by GitBox <gi...@apache.org>.
milleruntime commented on a change in pull request #2117:
URL: https://github.com/apache/accumulo/pull/2117#discussion_r647661951



##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogFileKey.java
##########
@@ -189,4 +195,90 @@ public String toString() {
     }
     throw new RuntimeException("Unknown type of entry: " + event);
   }
+
+  /**
+   * Converts LogFileKey to Key. Creates a Key containing all of the LogFileKey fields. The fields
+   * are stored so the Key sorts maintaining the legacy sort order. The row of the Key is composed
+   * of 3 fields: EventNum + tabletID + seq (separated by underscore). The EventNum is the integer
+   * returned by eventType(). The column family is always the event. The column qualifier is
+   * dependent of the type of event and could be empty.
+   *
+   * <pre>
+   *     Key Schema:
+   *     Row = EventNum_tabletID_seq
+   *     Family = event
+   *     Qualifier = tserverSession OR filename OR KeyExtent
+   * </pre>
+   */
+  public Key toKey() throws IOException {
+    String row = "";
+    int eventNum = eventType(event);
+    String family = event.name();
+    String qual = "";
+    switch (event) {
+      case OPEN:
+        row = formatRow(eventNum, 0, 0);
+        qual = tserverSession;
+        break;
+      case COMPACTION_START:
+        row = formatRow(eventNum, tabletId, seq);
+        if (filename != null)
+          qual = filename;
+        break;
+      case MUTATION:
+      case MANY_MUTATIONS:
+      case COMPACTION_FINISH:
+        row = formatRow(eventNum, tabletId, seq);
+        break;
+      case DEFINE_TABLET:
+        row = formatRow(eventNum, tabletId, seq);
+        // Base64 encode KeyExtent
+        DataOutputBuffer buffer = new DataOutputBuffer();
+        tablet.writeTo(buffer);
+        qual = Base64.getEncoder().encodeToString(copyOf(buffer.getData(), buffer.getLength()));
+        buffer.close();
+        break;
+    }
+    return new Key(new Text(row), new Text(family), new Text(qual));
+  }
+
+  // format row = 1_000001_0000000001
+  private String formatRow(int eventNum, int tabletId, long seq) {
+    return String.format("%d_%06d_%010d", eventNum, tabletId, seq);
+  }

Review comment:
       It worked, thanks. Cool trick! I implemented your algorithm in 7b12e0e







[GitHub] [accumulo] milleruntime commented on a change in pull request #2117: Change sorted recovery to write to RFiles

Posted by GitBox <gi...@apache.org>.
milleruntime commented on a change in pull request #2117:
URL: https://github.com/apache/accumulo/pull/2117#discussion_r648604118



##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogFileValue.java
##########
@@ -93,4 +98,23 @@ public String toString() {
     return format(this, 5);
   }
 
+  /**
+   * Convert list of mutations to a byte array and use to create a Value
+   */
+  public Value toValue() throws IOException {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();

Review comment:
       I will open a follow-on issue to explore using `org.apache.commons.io.input.UnsynchronizedByteArrayInputStream`
   and `org.apache.commons.io.output.UnsynchronizedByteArrayOutputStream`.







[GitHub] [accumulo] keith-turner commented on a change in pull request #2117: Change sorted recovery to write to RFiles

Posted by GitBox <gi...@apache.org>.
keith-turner commented on a change in pull request #2117:
URL: https://github.com/apache/accumulo/pull/2117#discussion_r647691304



##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogFileKey.java
##########
@@ -189,4 +194,134 @@ public String toString() {
     }
     throw new RuntimeException("Unknown type of entry: " + event);
   }
+
+  /**
+   * Converts LogFileKey to Key. Creates a Key containing all of the LogFileKey fields. The fields
+   * are stored so the Key sorts maintaining the legacy sort order. The row of the Key is composed
+   * of 3 fields: EventNum + tabletID + seq. The EventNum is the byte returned by eventType(). The
+   * column family is always the event. The column qualifier is dependent of the type of event and
+   * could be empty.
+   *
+   * <pre>
+   *     Key Schema:
+   *     Row = EventNum + tabletID + seq
+   *     Family = event
+   *     Qualifier = tserverSession OR filename OR KeyExtent
+   * </pre>
+   */
+  public Key toKey() throws IOException {
+    byte[] formattedRow;
+    byte eventByte = getEventByte(eventType(event));
+    Text family = new Text(event.name());
+    var kb = Key.builder();
+    switch (event) {
+      case OPEN:
+        formattedRow = formatRow(eventByte, 0, 0);
+        return kb.row(formattedRow).family(family).qualifier(new Text(tserverSession)).build();
+      case COMPACTION_START:
+        formattedRow = formatRow(eventByte, tabletId, seq);
+        return kb.row(formattedRow).family(family).qualifier(new Text(filename)).build();
+      case MUTATION:
+      case MANY_MUTATIONS:
+      case COMPACTION_FINISH:
+        return kb.row(formatRow(eventByte, tabletId, seq)).family(family).build();
+      case DEFINE_TABLET:
+        formattedRow = formatRow(eventByte, tabletId, seq);
+        DataOutputBuffer buffer = new DataOutputBuffer();
+        tablet.writeTo(buffer);
+        var q = copyOf(buffer.getData(), buffer.getLength());
+        buffer.close();
+        return kb.row(formattedRow).family(family).qualifier(q).build();
+      default:
+        throw new AssertionError("Invalid event type in LogFileKey: " + event);
+    }
+  }
+
+  private byte getEventByte(int evenTypeInteger) {
+    return (byte) (evenTypeInteger & 0xff);
+  }
+
+  /**
+   * Format the row using 13 bytes. 1 for event number + 4 for tabletId + 8 for sequence
+   */
+  private byte[] formatRow(byte eventNum, int tabletId, long seq) {
+    byte[] row = new byte[13];

Review comment:
       ```suggestion
       // These will not sort properly when encoded if negative. Negative is not expected currently,
       // defending against future changes and/or bugs.
       Preconditions.checkArgument(eventNum >= 0 && seq >= 0);
       byte[] row = new byte[13];
   ```
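   A small sketch of why the precondition matters, assuming rows are compared lexicographically as unsigned bytes (which is how Accumulo orders `Key` rows). `NegativeSortDemo` and `rawLong` are hypothetical names; `rawLong` mirrors the plain big-endian shifts `formatRow` uses for `seq`, with no sign-bit flip.

```java
import java.util.Arrays;

/** Hypothetical demo: raw big-endian encoding of a negative long sorts after positives. */
public class NegativeSortDemo {

  /** Big-endian encoding of a long with no sign-bit flip, like the seq bytes in formatRow. */
  static byte[] rawLong(long v) {
    byte[] b = new byte[8];
    for (int i = 0; i < 8; i++) {
      b[i] = (byte) (v >>> (56 - 8 * i));
    }
    return b;
  }

  public static void main(String[] args) {
    // -1 encodes as 0xFF..FF, which is the largest possible value when compared as unsigned bytes
    int cmp = Arrays.compareUnsigned(rawLong(-1L), rawLong(1L));
    System.out.println(cmp > 0); // prints "true": -1 sorts after 1
  }
}
```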







[GitHub] [accumulo] milleruntime commented on a change in pull request #2117: Change sorted recovery to write to RFiles

Posted by GitBox <gi...@apache.org>.
milleruntime commented on a change in pull request #2117:
URL: https://github.com/apache/accumulo/pull/2117#discussion_r640780232



##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
##########
@@ -224,6 +214,40 @@ public LogSorter(ServerContext context, AccumuloConfiguration conf) {
     this.walBlockSize = DfsLogger.getWalBlockSize(conf);
   }
 
+  @VisibleForTesting
+  public static void writeBuffer(ServerContext context, String destPath,
+      List<Pair<LogFileKey,LogFileValue>> buffer, int part) throws IOException {
+    String filename = String.format("part-r-%05d.rf", part);
+    Path path = new Path(destPath, filename);
+    FileSystem fs = context.getVolumeManager().getFileSystemByPath(path);

Review comment:
       I don't think it would be a good idea to close the `FileSystem` here. See my other comment.







[GitHub] [accumulo] milleruntime commented on a change in pull request #2117: Change sorted recovery to write to RFiles

Posted by GitBox <gi...@apache.org>.
milleruntime commented on a change in pull request #2117:
URL: https://github.com/apache/accumulo/pull/2117#discussion_r648512335



##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java
##########
@@ -61,56 +66,58 @@
 
   private static final Logger log = LoggerFactory.getLogger(SortedLogRecovery.class);
 
-  private VolumeManager fs;
+  private final ServerContext context;
 
-  public SortedLogRecovery(VolumeManager fs) {
-    this.fs = fs;
+  public SortedLogRecovery(ServerContext context) {
+    this.context = context;
   }
 
-  static LogFileKey maxKey(LogEvents event) {
+  static LogFileKey maxKey(LogEvents event, KeyExtent extent) {
     LogFileKey key = new LogFileKey();
     key.event = event;
     key.tabletId = Integer.MAX_VALUE;
     key.seq = Long.MAX_VALUE;
+    key.tablet = extent;

Review comment:
       I did some refactoring in 6a94527457491cd0ff165122b93bfead6b9de85b so I could call formatRow for the range as well. Then I was able to drop the dummy value for filename.

##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java
##########
@@ -61,56 +66,58 @@
 
   private static final Logger log = LoggerFactory.getLogger(SortedLogRecovery.class);
 
-  private VolumeManager fs;
+  private final ServerContext context;
 
-  public SortedLogRecovery(VolumeManager fs) {
-    this.fs = fs;
+  public SortedLogRecovery(ServerContext context) {
+    this.context = context;
   }
 
-  static LogFileKey maxKey(LogEvents event) {
+  static LogFileKey maxKey(LogEvents event, KeyExtent extent) {
     LogFileKey key = new LogFileKey();
     key.event = event;
     key.tabletId = Integer.MAX_VALUE;
     key.seq = Long.MAX_VALUE;
+    key.tablet = extent;

Review comment:
       Sorry, I missed the extent change; removed it in 18a8e41.







[GitHub] [accumulo] keith-turner commented on pull request #2117: Change sorted recovery to write to RFiles

Posted by GitBox <gi...@apache.org>.
keith-turner commented on pull request #2117:
URL: https://github.com/apache/accumulo/pull/2117#issuecomment-861670888


   > @keith-turner Did you want to take another look at these changes? I think this is ready to merge
   
   currently looking at it





[GitHub] [accumulo] milleruntime commented on a change in pull request #2117: Change sorted recovery to write to RFiles

Posted by GitBox <gi...@apache.org>.
milleruntime commented on a change in pull request #2117:
URL: https://github.com/apache/accumulo/pull/2117#discussion_r648526149



##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogFileValue.java
##########
@@ -93,4 +98,23 @@ public String toString() {
     return format(this, 5);
   }
 
+  /**
+   * Convert list of mutations to a byte array and use to create a Value
+   */
+  public Value toValue() throws IOException {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();

Review comment:
       Maybe we can use `FixedByteArrayOutputStream`? 







[GitHub] [accumulo] milleruntime commented on a change in pull request #2117: Change sorted recovery to write to RFiles

Posted by GitBox <gi...@apache.org>.
milleruntime commented on a change in pull request #2117:
URL: https://github.com/apache/accumulo/pull/2117#discussion_r648555766



##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogFileValue.java
##########
@@ -93,4 +98,23 @@ public String toString() {
     return format(this, 5);
   }
 
+  /**
+   * Convert list of mutations to a byte array and use to create a Value
+   */
+  public Value toValue() throws IOException {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();

Review comment:
       Or this one, which I think you wrote and I have used before haha: https://github.com/apache/accumulo/blob/83b171d9c4f071a39ef6da1158c34dcf70f83f6a/core/src/main/java/org/apache/accumulo/core/util/UnsynchronizedBuffer.java#L30







[GitHub] [accumulo] milleruntime merged pull request #2117: Make sorted recovery write to RFiles

Posted by GitBox <gi...@apache.org>.
milleruntime merged pull request #2117:
URL: https://github.com/apache/accumulo/pull/2117


   





[GitHub] [accumulo] milleruntime commented on a change in pull request #2117: Change sorted recovery to write to RFiles

Posted by GitBox <gi...@apache.org>.
milleruntime commented on a change in pull request #2117:
URL: https://github.com/apache/accumulo/pull/2117#discussion_r646735312



##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogFileKey.java
##########
@@ -189,4 +195,90 @@ public String toString() {
     }
     throw new RuntimeException("Unknown type of entry: " + event);
   }
+
+  /**
+   * Converts LogFileKey to Key. Creates a Key containing all of the LogFileKey fields. The fields
+   * are stored so the Key sorts maintaining the legacy sort order. The row of the Key is composed
+   * of 3 fields: EventNum + tabletID + seq (separated by underscore). The EventNum is the integer
+   * returned by eventType(). The column family is always the event. The column qualifier is
+   * dependent of the type of event and could be empty.
+   *
+   * <pre>
+   *     Key Schema:
+   *     Row = EventNum_tabletID_seq
+   *     Family = event
+   *     Qualifier = tserverSession OR filename OR KeyExtent
+   * </pre>
+   */
+  public Key toKey() throws IOException {
+    String row = "";
+    int eventNum = eventType(event);
+    String family = event.name();
+    String qual = "";
+    switch (event) {
+      case OPEN:
+        row = formatRow(eventNum, 0, 0);
+        qual = tserverSession;
+        break;
+      case COMPACTION_START:
+        row = formatRow(eventNum, tabletId, seq);
+        if (filename != null)

Review comment:
       I agree, but I was using the `minKey` and `maxKey` methods in `SortedLogRecovery`. I guess I could set the filename to a dummy value to create the range; otherwise I would have to create the start and end keys for the range manually. I wanted to use the same `toKey()` method for consistency.







[GitHub] [accumulo] keith-turner commented on a change in pull request #2117: Change sorted recovery to write to RFiles

Posted by GitBox <gi...@apache.org>.
keith-turner commented on a change in pull request #2117:
URL: https://github.com/apache/accumulo/pull/2117#discussion_r647584326



##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogFileKey.java
##########
@@ -189,4 +195,90 @@ public String toString() {
     }
     throw new RuntimeException("Unknown type of entry: " + event);
   }
+
+  /**
+   * Converts LogFileKey to Key. Creates a Key containing all of the LogFileKey fields. The fields
+   * are stored so the Key sorts maintaining the legacy sort order. The row of the Key is composed
+   * of 3 fields: EventNum + tabletID + seq (separated by underscore). The EventNum is the integer
+   * returned by eventType(). The column family is always the event. The column qualifier is
+   * dependent of the type of event and could be empty.
+   *
+   * <pre>
+   *     Key Schema:
+   *     Row = EventNum_tabletID_seq
+   *     Family = event
+   *     Qualifier = tserverSession OR filename OR KeyExtent
+   * </pre>
+   */
+  public Key toKey() throws IOException {
+    String row = "";
+    int eventNum = eventType(event);
+    String family = event.name();
+    String qual = "";
+    switch (event) {
+      case OPEN:
+        row = formatRow(eventNum, 0, 0);
+        qual = tserverSession;
+        break;
+      case COMPACTION_START:
+        row = formatRow(eventNum, tabletId, seq);
+        if (filename != null)
+          qual = filename;
+        break;
+      case MUTATION:
+      case MANY_MUTATIONS:
+      case COMPACTION_FINISH:
+        row = formatRow(eventNum, tabletId, seq);
+        break;
+      case DEFINE_TABLET:
+        row = formatRow(eventNum, tabletId, seq);
+        // Base64 encode KeyExtent
+        DataOutputBuffer buffer = new DataOutputBuffer();
+        tablet.writeTo(buffer);
+        qual = Base64.getEncoder().encodeToString(copyOf(buffer.getData(), buffer.getLength()));
+        buffer.close();
+        break;
+    }
+    return new Key(new Text(row), new Text(family), new Text(qual));
+  }
+
+  // format row = 1_000001_0000000001
+  private String formatRow(int eventNum, int tabletId, long seq) {
+    return String.format("%d_%06d_%010d", eventNum, tabletId, seq);
+  }

Review comment:
       I am not sure what is going on with that -1. If you want negatives to sort correctly, you can use the following tricks: just apply that XOR on both encode and decode.
   
   https://github.com/apache/accumulo/blob/58a559fa6ac7c5137b5cd1418b0de457f7cad2a9/core/src/main/java/org/apache/accumulo/core/client/lexicoder/LongLexicoder.java#L30
   
   https://github.com/apache/accumulo/blob/58a559fa6ac7c5137b5cd1418b0de457f7cad2a9/core/src/main/java/org/apache/accumulo/core/client/lexicoder/IntegerLexicoder.java#L33
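   A minimal sketch of that trick for a fixed-width 4-byte int, assuming unsigned lexicographic row comparison. It only illustrates flipping the sign bit on encode and flipping it back on decode; the class `SignFlipDemo` is hypothetical and is not the linked lexicoder code.

```java
import java.util.Arrays;

/** Hypothetical sketch of the sign-bit flip that makes signed ints sort as unsigned bytes. */
public class SignFlipDemo {

  static byte[] encode(int v) {
    int flipped = v ^ 0x80000000; // flip the sign bit on encode
    return new byte[] {(byte) (flipped >>> 24), (byte) (flipped >>> 16),
        (byte) (flipped >>> 8), (byte) flipped};
  }

  static int decode(byte[] b) {
    int flipped = ((b[0] & 0xff) << 24) | ((b[1] & 0xff) << 16)
        | ((b[2] & 0xff) << 8) | (b[3] & 0xff);
    return flipped ^ 0x80000000; // flip it back on decode
  }

  public static void main(String[] args) {
    int[] values = {Integer.MIN_VALUE, -1, 0, 1, Integer.MAX_VALUE};
    for (int i = 0; i + 1 < values.length; i++) {
      // each encoded value should sort strictly before the next one as unsigned bytes
      boolean ordered = Arrays.compareUnsigned(encode(values[i]), encode(values[i + 1])) < 0;
      System.out.println(values[i] + " < " + values[i + 1] + ": " + ordered); // prints ": true"
    }
  }
}
```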
   







[GitHub] [accumulo] DomGarguilo commented on a change in pull request #2117: Change sorted recovery to write to RFiles

Posted by GitBox <gi...@apache.org>.
DomGarguilo commented on a change in pull request #2117:
URL: https://github.com/apache/accumulo/pull/2117#discussion_r639859548



##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogsIterator.java
##########
@@ -93,11 +107,48 @@ public void remove() {
 
   @Override
   public void close() {
-    for (CloseableIterator<?> reader : iterators) {
-      try {
-        reader.close();
-      } catch (IOException e) {
-        LOG.debug("Failed to close reader", e);
+    scanners.forEach(ScannerBase::close);
+  }
+
+  /**
+   * Check for sorting signal files (finished/failed) and get the logs in the provided directory.
+   */
+  private List<Path> getFiles(VolumeManager fs, Path directory) throws IOException {
+    boolean foundFinish = false;
+    List<Path> logFiles = new ArrayList<>();
+    for (FileStatus child : fs.listStatus(directory)) {
+      if (child.getPath().getName().startsWith("_"))
+        continue;
+      if (SortedLogState.isFinished(child.getPath().getName())) {
+        foundFinish = true;
+        continue;
+      }
+      if (SortedLogState.FAILED.getMarker().equals(child.getPath().getName())) {
+        continue;
+      }
+      FileSystem ns = fs.getFileSystemByPath(child.getPath());

Review comment:
       This may be able to be closed as well, with `ns.close()`.







[GitHub] [accumulo] keith-turner commented on a change in pull request #2117: Change sorted recovery to write to RFiles

Posted by GitBox <gi...@apache.org>.
keith-turner commented on a change in pull request #2117:
URL: https://github.com/apache/accumulo/pull/2117#discussion_r652154940



##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogsIterator.java
##########
@@ -19,61 +19,87 @@
 package org.apache.accumulo.tserver.log;
 
 import java.io.IOException;
+import java.util.AbstractMap;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map.Entry;
 
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.ScannerBase;
+import org.apache.accumulo.core.client.rfile.RFile;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.log.SortedLogState;
+import org.apache.accumulo.tserver.logger.LogEvents;
 import org.apache.accumulo.tserver.logger.LogFileKey;
 import org.apache.accumulo.tserver.logger.LogFileValue;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Iterators;
-import com.google.common.collect.UnmodifiableIterator;
 
 /**
  * Iterates over multiple sorted recovery logs merging them into a single sorted stream.
  */
-public class RecoveryLogsIterator implements CloseableIterator<Entry<LogFileKey,LogFileValue>> {
+public class RecoveryLogsIterator
+    implements Iterator<Entry<LogFileKey,LogFileValue>>, AutoCloseable {
 
   private static final Logger LOG = LoggerFactory.getLogger(RecoveryLogsIterator.class);
 
-  List<CloseableIterator<Entry<LogFileKey,LogFileValue>>> iterators;
-  private UnmodifiableIterator<Entry<LogFileKey,LogFileValue>> iter;
+  private final List<Scanner> scanners;
+  private final Iterator<Entry<Key,Value>> iter;
 
   /**
-   * Iterates only over keys in the range [start,end].
+   * Scans the files in each recoveryLogDir over the range [start,end].
    */
-  RecoveryLogsIterator(VolumeManager fs, List<Path> recoveryLogPaths, LogFileKey start,
-      LogFileKey end) throws IOException {
+  RecoveryLogsIterator(ServerContext context, List<Path> recoveryLogDirs, LogFileKey start,
+      LogFileKey end, boolean checkFirstKey, LogEvents... colFamToFetch) throws IOException {
 
-    iterators = new ArrayList<>(recoveryLogPaths.size());
+    List<Iterator<Entry<Key,Value>>> iterators = new ArrayList<>(recoveryLogDirs.size());
+    scanners = new ArrayList<>();
+    Range range = new Range(start.formatRow(), end.formatRow());
+    var vm = context.getVolumeManager();
 
-    try {
-      for (Path log : recoveryLogPaths) {
-        LOG.debug("Opening recovery log {}", log.getName());
-        RecoveryLogReader rlr = new RecoveryLogReader(fs, log, start, end);
-        if (rlr.hasNext()) {
+    for (Path logDir : recoveryLogDirs) {
+      LOG.debug("Opening recovery log dir {}", logDir.getName());
+      List<Path> logFiles = getFiles(vm, logDir);
+      var fs = vm.getFileSystemByPath(logDir);
+
+      // only check the first key once to prevent extra iterator creation and seeking
+      if (checkFirstKey) {
+        validateFirstKey(
+            RFile.newScanner().from(logFiles.stream().map(Path::toString).toArray(String[]::new))

Review comment:
       Using a dedicated recovery RFile block cache would solve what #882 was trying to address.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [accumulo] ctubbsii commented on a change in pull request #2117: Make sorted recovery write to RFiles

Posted by GitBox <gi...@apache.org>.
ctubbsii commented on a change in pull request #2117:
URL: https://github.com/apache/accumulo/pull/2117#discussion_r657317206



##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogFileKey.java
##########
@@ -189,4 +198,162 @@ public String toString() {
     }
     throw new RuntimeException("Unknown type of entry: " + event);
   }
+
+  /**
+   * Converts LogFileKey to Key. Creates a Key containing all of the LogFileKey fields. The fields
+   * are stored so the Key sorts maintaining the legacy sort order. The row of the Key is composed
+   * of 3 fields: EventNum + tabletID + seq. The EventNum is the byte returned by eventType(). The
+   * column family is always the event. The column qualifier is dependent of the type of event and
+   * could be empty.
+   *
+   * <pre>
+   *     Key Schema:
+   *     Row = EventNum + tabletID + seq
+   *     Family = event
+   *     Qualifier = tserverSession OR filename OR KeyExtent
+   * </pre>
+   */
+  public Key toKey() throws IOException {
+    byte[] formattedRow;
+    String family = event.name();
+    var kb = Key.builder();
+    switch (event) {
+      case OPEN:
+        formattedRow = formatRow(0, 0);
+        return kb.row(formattedRow).family(family).qualifier(tserverSession).build();
+      case COMPACTION_START:
+        formattedRow = formatRow(tabletId, seq);
+        return kb.row(formattedRow).family(family).qualifier(filename).build();
+      case MUTATION:
+      case MANY_MUTATIONS:
+      case COMPACTION_FINISH:
+        return kb.row(formatRow(tabletId, seq)).family(family).build();
+      case DEFINE_TABLET:
+        formattedRow = formatRow(tabletId, seq);
+        DataOutputBuffer buffer = new DataOutputBuffer();
+        tablet.writeTo(buffer);
+        var q = copyOf(buffer.getData(), buffer.getLength());
+        buffer.close();
+        return kb.row(formattedRow).family(family).qualifier(q).build();
+      default:
+        throw new AssertionError("Invalid event type in LogFileKey: " + event);
+    }
+  }
+
+  /**
+   * Get the first byte for the event. The only possible values are 0-4. This is used as the highest
+   * byte in the row.
+   */
+  private byte getEventByte() {
+    int evenTypeInteger = eventType(event);
+    return (byte) (evenTypeInteger & 0xff);
+  }
+
+  /**
+   * Get the byte encoded row for this LogFileKey as a Text object.
+   */
+  private Text formatRow() {
+    return new Text(formatRow(tabletId, seq));
+  }
+
+  /**
+   * Format the row using 13 bytes encoded to allow proper sorting of the RFile Key. The highest
+   * byte is for the event number, 4 bytes for the tabletId and 8 bytes for the sequence long.
+   */
+  private byte[] formatRow(int tabletId, long seq) {
+    byte eventNum = getEventByte();
+    // These will not sort properly when encoded if negative. Negative is not expected currently,
+    // defending against future changes and/or bugs.
+    Preconditions.checkArgument(eventNum >= 0 && seq >= 0);
+    byte[] row = new byte[13];
+    // encode the signed integer so negatives will sort properly for tabletId
+    int encodedTabletId = tabletId ^ 0x80000000;
+
+    row[0] = eventNum;
+    row[1] = (byte) ((encodedTabletId >>> 24) & 0xff);
+    row[2] = (byte) ((encodedTabletId >>> 16) & 0xff);
+    row[3] = (byte) ((encodedTabletId >>> 8) & 0xff);
+    row[4] = (byte) (encodedTabletId & 0xff);
+    row[5] = (byte) (seq >>> 56);
+    row[6] = (byte) (seq >>> 48);
+    row[7] = (byte) (seq >>> 40);
+    row[8] = (byte) (seq >>> 32);
+    row[9] = (byte) (seq >>> 24);
+    row[10] = (byte) (seq >>> 16);
+    row[11] = (byte) (seq >>> 8);
+    row[12] = (byte) (seq); // >>> 0
+    return row;
+  }
+
+  /**
+   * Extract the tabletId integer from the byte encoded Row.
+   */
+  private static int getTabletId(byte[] row) {
+    int encoded = ((row[1] << 24) + (row[2] << 16) + (row[3] << 8) + row[4]);

Review comment:
       You can get away with `+` in DataInputStream's implementation, because it uses `int` the whole time. So, there's no integer type promotion from `byte` to `int` going on before the `+`... each value comes from `read()`, which already returns it as a non-negative `int` in the range 0-255. That doesn't work if you are using `byte` types, which will be automatically sign-extended when they are promoted to `int` for the addition operation, thereby changing their value. This doesn't matter for the first byte that gets shifted to the far left, because shifting it all the way removes the effects of sign extension. If we properly truncate any sign extension from the promotion to `int` (for example with `& 0xff`), we can probably use `+`, but I think it's more intuitive to use `|`. I don't think it really matters, except to communicate to other developers that we're doing bit-wise stuff, not arithmetic stuff.
   
   In any case, we've fixed this in #2176 and #2177, but I just wanted to comment with the explanation for the archives.
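   A short, hypothetical demo of the promotion issue described above (`SignExtensionDemo` is illustrative only). It reassembles the big-endian bytes of 200 with an unmasked `+` and with a masked `|`; the first byte is deliberately left unmasked in both, since its sign-extended bits are shifted out of the 32-bit result anyway.

```java
/** Hypothetical demo of byte-to-int promotion when reassembling a big-endian int. */
public class SignExtensionDemo {

  /** Unmasked version: each byte is sign-extended when promoted to int before the addition. */
  static int unmaskedPlus(byte[] b) {
    return (b[0] << 24) + (b[1] << 16) + (b[2] << 8) + b[3];
  }

  /** Masked version: & 0xff keeps only the low 8 bits of each promoted byte. */
  static int maskedOr(byte[] b) {
    // b[0] needs no mask: << 24 pushes any sign-extended bits past bit 31
    return (b[0] << 24) | ((b[1] & 0xff) << 16) | ((b[2] & 0xff) << 8) | (b[3] & 0xff);
  }

  public static void main(String[] args) {
    byte[] bytes = {0x00, 0x00, 0x00, (byte) 0xC8}; // big-endian encoding of 200
    System.out.println((int) bytes[3]);      // prints -56: 0xC8 sign-extended
    System.out.println(unmaskedPlus(bytes)); // prints -56
    System.out.println(maskedOr(bytes));     // prints 200
  }
}
```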







[GitHub] [accumulo] milleruntime commented on a change in pull request #2117: Change sorted recovery to write to RFiles

Posted by GitBox <gi...@apache.org>.
milleruntime commented on a change in pull request #2117:
URL: https://github.com/apache/accumulo/pull/2117#discussion_r648577182



##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogFileValue.java
##########
@@ -93,4 +98,23 @@ public String toString() {
     return format(this, 5);
   }
 
+  /**
+   * Convert list of mutations to a byte array and use to create a Value
+   */
+  public Value toValue() throws IOException {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();

Review comment:
       I am not sure either of those will work. This one is promising:
   https://commons.apache.org/proper/commons-io/apidocs/org/apache/commons/io/output/UnsynchronizedByteArrayOutputStream.html
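   For context, a hypothetical sketch of the kind of round trip `toValue()` needs, with raw byte arrays standing in for serialized mutations (`ValueCodecSketch`, `toBytes` and `fromBytes` are illustrative names, not the PR's code). The `ByteArrayOutputStream` is the piece under discussion: its write methods are `synchronized`, so an unsynchronized buffer that is an `OutputStream` exposing `toByteArray()` should be able to take its place.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of a toValue/fromValue style round trip over raw byte payloads. */
public class ValueCodecSketch {

  /** Writes a count followed by length-prefixed payloads (stand-ins for serialized mutations). */
  static byte[] toBytes(List<byte[]> payloads) throws IOException {
    // the synchronized buffer that an unsynchronized alternative could replace
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(baos)) {
      out.writeInt(payloads.size());
      for (byte[] p : payloads) {
        out.writeInt(p.length);
        out.write(p);
      }
    }
    return baos.toByteArray(); // still valid: closing a ByteArrayOutputStream has no effect
  }

  /** Reads back the count and each length-prefixed payload. */
  static List<byte[]> fromBytes(byte[] data) throws IOException {
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
      int count = in.readInt();
      List<byte[]> payloads = new ArrayList<>(count);
      for (int i = 0; i < count; i++) {
        byte[] p = new byte[in.readInt()];
        in.readFully(p);
        payloads.add(p);
      }
      return payloads;
    }
  }

  public static void main(String[] args) throws IOException {
    List<byte[]> payloads = List.of(new byte[] {1, 2}, new byte[] {3});
    byte[] bytes = toBytes(payloads);
    // 4 (count) + 4 + 2 + 4 + 1 = 15 bytes
    System.out.println(bytes.length + " bytes, " + fromBytes(bytes).size() + " payloads");
  }
}
```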







[GitHub] [accumulo] keith-turner commented on a change in pull request #2117: Change sorted recovery to write to RFiles

Posted by GitBox <gi...@apache.org>.
keith-turner commented on a change in pull request #2117:
URL: https://github.com/apache/accumulo/pull/2117#discussion_r646830766



##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogsIterator.java
##########
@@ -20,60 +20,85 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map.Entry;
 
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.ScannerBase;
+import org.apache.accumulo.core.client.rfile.RFile;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.log.SortedLogState;
+import org.apache.accumulo.tserver.logger.LogEvents;
 import org.apache.accumulo.tserver.logger.LogFileKey;
-import org.apache.accumulo.tserver.logger.LogFileValue;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Iterators;
-import com.google.common.collect.UnmodifiableIterator;
 
 /**
  * Iterates over multiple sorted recovery logs merging them into a single sorted stream.
  */
-public class RecoveryLogsIterator implements CloseableIterator<Entry<LogFileKey,LogFileValue>> {
+public class RecoveryLogsIterator implements Iterator<Entry<Key,Value>>, AutoCloseable {

Review comment:
       > I was using what is returned when reading an RFile using scanner.iterator(). How would I iterate over the RFile using LogFileKey and LogFileValue?
   
   Can you point me to the code that does this?







[GitHub] [accumulo] milleruntime commented on a change in pull request #2117: Change sorted recovery to write to RFiles

Posted by GitBox <gi...@apache.org>.
milleruntime commented on a change in pull request #2117:
URL: https://github.com/apache/accumulo/pull/2117#discussion_r646637653



##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
##########
@@ -224,6 +214,40 @@ public LogSorter(ServerContext context, AccumuloConfiguration conf) {
     this.walBlockSize = DfsLogger.getWalBlockSize(conf);
   }
 
+  @VisibleForTesting
+  public static void writeBuffer(ServerContext context, String destPath,
+      List<Pair<LogFileKey,LogFileValue>> buffer, int part) throws IOException {
+    String filename = String.format("part-r-%05d.rf", part);
+    Path path = new Path(destPath, filename);
+    FileSystem fs = context.getVolumeManager().getFileSystemByPath(path);
+    Path fullPath = fs.makeQualified(path);
+
+    // convert the LogFileKeys to Keys, sort and collect the mutations
+    Map<Key,List<Mutation>> keyListMap = new TreeMap<>();
+    for (Pair<LogFileKey,LogFileValue> pair : buffer) {
+      var logFileKey = pair.getFirst();
+      var logFileValue = pair.getSecond();
+      Key k = logFileKey.toKey();
+      var list = keyListMap.putIfAbsent(k, logFileValue.mutations);
+      if (list != null) {
+        var muts = new ArrayList<>(list);
+        muts.addAll(logFileValue.mutations);
+        keyListMap.put(logFileKey.toKey(), muts);
+      }

Review comment:
       Looking at the constructor for `ArrayList`, if we can guarantee the list is an ArrayList then it won't copy. But I don't think we can. The List in `LogFileValue` is public.







[GitHub] [accumulo] milleruntime commented on a change in pull request #2117: Change sorted recovery to write to RFiles

Posted by GitBox <gi...@apache.org>.
milleruntime commented on a change in pull request #2117:
URL: https://github.com/apache/accumulo/pull/2117#discussion_r646712529



##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogFileKey.java
##########
@@ -189,4 +195,90 @@ public String toString() {
     }
     throw new RuntimeException("Unknown type of entry: " + event);
   }
+
+  /**
+   * Converts LogFileKey to Key. Creates a Key containing all of the LogFileKey fields. The fields
+   * are stored so the Key sorts maintaining the legacy sort order. The row of the Key is composed
+   * of 3 fields: EventNum + tabletID + seq (separated by underscore). The EventNum is the integer
+   * returned by eventType(). The column family is always the event. The column qualifier is
+   * dependent of the type of event and could be empty.
+   *
+   * <pre>
+   *     Key Schema:
+   *     Row = EventNum_tabletID_seq
+   *     Family = event
+   *     Qualifier = tserverSession OR filename OR KeyExtent
+   * </pre>
+   */
+  public Key toKey() throws IOException {
+    String row = "";
+    int eventNum = eventType(event);
+    String family = event.name();
+    String qual = "";
+    switch (event) {
+      case OPEN:
+        row = formatRow(eventNum, 0, 0);
+        qual = tserverSession;
+        break;
+      case COMPACTION_START:
+        row = formatRow(eventNum, tabletId, seq);
+        if (filename != null)
+          qual = filename;
+        break;
+      case MUTATION:
+      case MANY_MUTATIONS:
+      case COMPACTION_FINISH:
+        row = formatRow(eventNum, tabletId, seq);
+        break;
+      case DEFINE_TABLET:
+        row = formatRow(eventNum, tabletId, seq);
+        // Base64 encode KeyExtent
+        DataOutputBuffer buffer = new DataOutputBuffer();
+        tablet.writeTo(buffer);
+        qual = Base64.getEncoder().encodeToString(copyOf(buffer.getData(), buffer.getLength()));
+        buffer.close();
+        break;
+    }
+    return new Key(new Text(row), new Text(family), new Text(qual));
+  }
+
+  // format row = 1_000001_0000000001
+  private String formatRow(int eventNum, int tabletId, long seq) {
+    return String.format("%d_%06d_%010d", eventNum, tabletId, seq);
+  }

Review comment:
       If we are using a byte array for the row now, I wonder if `new Text(row)` is the best way to create the row for the `Key`, since that `Text` constructor copies the bytes.







[GitHub] [accumulo] keith-turner commented on a change in pull request #2117: Change sorted recovery to write to RFiles

Posted by GitBox <gi...@apache.org>.
keith-turner commented on a change in pull request #2117:
URL: https://github.com/apache/accumulo/pull/2117#discussion_r646849389



##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogsIterator.java
##########
@@ -20,60 +20,85 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map.Entry;
 
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.ScannerBase;
+import org.apache.accumulo.core.client.rfile.RFile;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.log.SortedLogState;
+import org.apache.accumulo.tserver.logger.LogEvents;
 import org.apache.accumulo.tserver.logger.LogFileKey;
-import org.apache.accumulo.tserver.logger.LogFileValue;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Iterators;
-import com.google.common.collect.UnmodifiableIterator;
 
 /**
  * Iterates over multiple sorted recovery logs merging them into a single sorted stream.
  */
-public class RecoveryLogsIterator implements CloseableIterator<Entry<LogFileKey,LogFileValue>> {
+public class RecoveryLogsIterator implements Iterator<Entry<Key,Value>>, AutoCloseable {

Review comment:
       
   RecoveryLogsIterator.next() could convert from Key/Value, like the following. I'm not sure whether anything using RecoveryLogsIterator requires a Key/Value, though.
   
   ```java
   @Override
   public Entry<LogFileKey,LogFileValue> next() {
     Entry<Key,Value> e = iter.next();
     // diamond avoids the raw-type (unchecked) SimpleImmutableEntry
     return new AbstractMap.SimpleImmutableEntry<>(LogFileKey.fromKey(e.getKey()),
         LogFileValue.fromValue(e.getValue()));
   }
   ```
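   The same conversion can also be written as a small generic adapter that converts each entry lazily, only when `next()` is called. This is a minimal sketch. It assumes the converters are passed in as `java.util.function.Function`s, which stand in for `LogFileKey.fromKey` and `LogFileValue.fromValue`. `ConvertingIterator` is a hypothetical name. Guava's `Iterators.transform`, from the `Iterators` class this file already imports, would be a one-line alternative.

   ```java
   import java.util.AbstractMap;
   import java.util.Iterator;
   import java.util.Map.Entry;
   import java.util.function.Function;

   /** Hypothetical adapter: converts each Entry<K1,V1> into an Entry<K2,V2> on demand. */
   class ConvertingIterator<K1, V1, K2, V2> implements Iterator<Entry<K2, V2>> {
     private final Iterator<Entry<K1, V1>> source;
     private final Function<K1, K2> keyConverter;
     private final Function<V1, V2> valueConverter;

     ConvertingIterator(Iterator<Entry<K1, V1>> source, Function<K1, K2> keyConverter,
         Function<V1, V2> valueConverter) {
       this.source = source;
       this.keyConverter = keyConverter;
       this.valueConverter = valueConverter;
     }

     @Override
     public boolean hasNext() {
       return source.hasNext();
     }

     @Override
     public Entry<K2, V2> next() {
       Entry<K1, V1> e = source.next();
       // conversion happens here, one entry at a time; nothing is buffered
       return new AbstractMap.SimpleImmutableEntry<>(keyConverter.apply(e.getKey()),
           valueConverter.apply(e.getValue()));
     }
   }
   ```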







[GitHub] [accumulo] milleruntime commented on a change in pull request #2117: Change sorted recovery to write to RFiles

Posted by GitBox <gi...@apache.org>.
milleruntime commented on a change in pull request #2117:
URL: https://github.com/apache/accumulo/pull/2117#discussion_r651837281



##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogFileValue.java
##########
@@ -93,4 +98,23 @@ public String toString() {
     return format(this, 5);
   }
 
+  /**
+   * Convert list of mutations to a byte array and use to create a Value
+   */
+  public Value toValue() throws IOException {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();

Review comment:
       I was waiting until after this is merged to create follow-on tasks, but this one is independent of this PR, so I just created https://github.com/apache/accumulo/issues/2165







[GitHub] [accumulo] keith-turner commented on a change in pull request #2117: Change sorted recovery to write to RFiles

Posted by GitBox <gi...@apache.org>.
keith-turner commented on a change in pull request #2117:
URL: https://github.com/apache/accumulo/pull/2117#discussion_r645785953



##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogFileKey.java
##########
@@ -189,4 +195,90 @@ public String toString() {
     }
     throw new RuntimeException("Unknown type of entry: " + event);
   }
+
+  /**
+   * Converts LogFileKey to Key. Creates a Key containing all of the LogFileKey fields. The fields
+   * are stored so the Key sorts maintaining the legacy sort order. The row of the Key is composed
+   * of 3 fields: EventNum + tabletID + seq (separated by underscore). The EventNum is the integer
+   * returned by eventType(). The column family is always the event. The column qualifier is
+   * dependent of the type of event and could be empty.
+   *
+   * <pre>
+   *     Key Schema:
+   *     Row = EventNum_tabletID_seq
+   *     Family = event
+   *     Qualifier = tserverSession OR filename OR KeyExtent
+   * </pre>
+   */
+  public Key toKey() throws IOException {
+    String row = "";
+    int eventNum = eventType(event);
+    String family = event.name();
+    String qual = "";
+    switch (event) {
+      case OPEN:
+        row = formatRow(eventNum, 0, 0);
+        qual = tserverSession;
+        break;
+      case COMPACTION_START:
+        row = formatRow(eventNum, tabletId, seq);
+        if (filename != null)
+          qual = filename;
+        break;
+      case MUTATION:
+      case MANY_MUTATIONS:
+      case COMPACTION_FINISH:
+        row = formatRow(eventNum, tabletId, seq);
+        break;
+      case DEFINE_TABLET:
+        row = formatRow(eventNum, tabletId, seq);
+        // Base64 encode KeyExtent
+        DataOutputBuffer buffer = new DataOutputBuffer();
+        tablet.writeTo(buffer);
+        qual = Base64.getEncoder().encodeToString(copyOf(buffer.getData(), buffer.getLength()));
+        buffer.close();
+        break;
+    }
+    return new Key(new Text(row), new Text(family), new Text(qual));
+  }
+
+  // format row = 1_000001_0000000001
+  private String formatRow(int eventNum, int tabletId, long seq) {
+    return String.format("%d_%06d_%010d", eventNum, tabletId, seq);

Review comment:
       A long time ago, when writing continuous ingest, I found that String.format was very slow (it was the slowest thing in the data generation). Not sure if that is still the case, but it may be something to look into, since a lot of data could go through this code.
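   One way to test this concern is to write a variant of `formatRow` that avoids `String.format` entirely. This is a minimal sketch. It assumes all three fields are non-negative, because `%06d` and `%010d` place the minus sign differently for negative numbers, and the sketch rejects negative input. `RowFormat` is a hypothetical name. Whether this version is actually faster should be confirmed with a benchmark such as JMH, not assumed.

   ```java
   /** Hypothetical String.format-free version of the "%d_%06d_%010d" row format. */
   final class RowFormat {
     private RowFormat() {}

     static String formatRow(int eventNum, int tabletId, long seq) {
       // assumes non-negative inputs; String.format pads negatives differently
       if (eventNum < 0 || tabletId < 0 || seq < 0) {
         throw new IllegalArgumentException("negative field");
       }
       StringBuilder sb = new StringBuilder(24);
       sb.append(eventNum).append('_');
       pad(sb, Integer.toString(tabletId), 6).append('_');
       pad(sb, Long.toString(seq), 10);
       return sb.toString();
     }

     // left-pad with '0' up to width; longer values are kept whole, as with %0Nd
     private static StringBuilder pad(StringBuilder sb, String digits, int width) {
       for (int i = digits.length(); i < width; i++) {
         sb.append('0');
       }
       return sb.append(digits);
     }
   }
   ```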

##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
##########
@@ -224,6 +214,40 @@ public LogSorter(ServerContext context, AccumuloConfiguration conf) {
     this.walBlockSize = DfsLogger.getWalBlockSize(conf);
   }
 
+  @VisibleForTesting
+  public static void writeBuffer(ServerContext context, String destPath,
+      List<Pair<LogFileKey,LogFileValue>> buffer, int part) throws IOException {
+    String filename = String.format("part-r-%05d.rf", part);
+    Path path = new Path(destPath, filename);
+    FileSystem fs = context.getVolumeManager().getFileSystemByPath(path);
+    Path fullPath = fs.makeQualified(path);
+
+    // convert the LogFileKeys to Keys, sort and collect the mutations
+    Map<Key,List<Mutation>> keyListMap = new TreeMap<>();
+    for (Pair<LogFileKey,LogFileValue> pair : buffer) {
+      var logFileKey = pair.getFirst();
+      var logFileValue = pair.getSecond();
+      Key k = logFileKey.toKey();
+      var list = keyListMap.putIfAbsent(k, logFileValue.mutations);
+      if (list != null) {
+        var muts = new ArrayList<>(list);
+        muts.addAll(logFileValue.mutations);
+        keyListMap.put(logFileKey.toKey(), muts);
+      }
+    }
+
+    try (var writer = FileOperations.getInstance().newWriterBuilder()
+        .forFile(fullPath.toString(), fs, fs.getConf(), context.getCryptoService())
+        .withTableConfiguration(DefaultConfiguration.getInstance()).build()) {

Review comment:
       The use of withTableConfiguration() is interesting. I suspect it will affect the index/data block sizes used for the RFile, and also the compression. This opens up the possibility of using snappy instead of gzip for sorted WALs in the future. The defaults are fine and I am not suggesting any changes; I just thought it was interesting.

##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
##########
@@ -224,6 +214,40 @@ public LogSorter(ServerContext context, AccumuloConfiguration conf) {
     this.walBlockSize = DfsLogger.getWalBlockSize(conf);
   }
 
+  @VisibleForTesting
+  public static void writeBuffer(ServerContext context, String destPath,
+      List<Pair<LogFileKey,LogFileValue>> buffer, int part) throws IOException {
+    String filename = String.format("part-r-%05d.rf", part);
+    Path path = new Path(destPath, filename);
+    FileSystem fs = context.getVolumeManager().getFileSystemByPath(path);
+    Path fullPath = fs.makeQualified(path);
+
+    // convert the LogFileKeys to Keys, sort and collect the mutations
+    Map<Key,List<Mutation>> keyListMap = new TreeMap<>();
+    for (Pair<LogFileKey,LogFileValue> pair : buffer) {
+      var logFileKey = pair.getFirst();
+      var logFileValue = pair.getSecond();
+      Key k = logFileKey.toKey();
+      var list = keyListMap.putIfAbsent(k, logFileValue.mutations);
+      if (list != null) {
+        var muts = new ArrayList<>(list);
+        muts.addAll(logFileValue.mutations);
+        keyListMap.put(logFileKey.toKey(), muts);
+      }

Review comment:
       Seems like this code could possibly be done in a single line using computeIfAbsent. However, if your intent was to avoid copying the list when a key has only one entry, then that may not be the case.
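   This is a minimal sketch of the `computeIfAbsent` version. It uses `String` in place of `Key` and `Mutation` so it runs without Accumulo. `GroupByKey` and `groupByKey` are hypothetical names. The sketch shows the trade-off described above. Every key gets a freshly allocated list, even when it has only one entry. In exchange, the caller's lists are never aliased or mutated, which matters because the `mutations` list in `LogFileValue` is public.

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.Map;
   import java.util.TreeMap;

   final class GroupByKey {
     private GroupByKey() {}

     /** Hypothetical: merges the value lists of entries that share a key into a sorted map. */
     static Map<String, List<String>> groupByKey(List<Map.Entry<String, List<String>>> buffer) {
       Map<String, List<String>> keyListMap = new TreeMap<>();
       for (Map.Entry<String, List<String>> pair : buffer) {
         // new list on first sight of a key, then append; input lists are only read
         keyListMap.computeIfAbsent(pair.getKey(), k -> new ArrayList<>()).addAll(pair.getValue());
       }
       return keyListMap;
     }
   }
   ```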







[GitHub] [accumulo] milleruntime commented on a change in pull request #2117: Change sorted recovery to write to RFiles

Posted by GitBox <gi...@apache.org>.
milleruntime commented on a change in pull request #2117:
URL: https://github.com/apache/accumulo/pull/2117#discussion_r646858538



##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogsIterator.java
##########
@@ -20,60 +20,85 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map.Entry;
 
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.ScannerBase;
+import org.apache.accumulo.core.client.rfile.RFile;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.log.SortedLogState;
+import org.apache.accumulo.tserver.logger.LogEvents;
 import org.apache.accumulo.tserver.logger.LogFileKey;
-import org.apache.accumulo.tserver.logger.LogFileValue;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Iterators;
-import com.google.common.collect.UnmodifiableIterator;
 
 /**
  * Iterates over multiple sorted recovery logs merging them into a single sorted stream.
  */
-public class RecoveryLogsIterator implements CloseableIterator<Entry<LogFileKey,LogFileValue>> {
+public class RecoveryLogsIterator implements Iterator<Entry<Key,Value>>, AutoCloseable {

Review comment:
       I use the RFile API here to read each sorted part:
   https://github.com/milleruntime/accumulo/blob/7457fe5e0f753f6c9a1289963075c7caac89b45c/server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogsIterator.java#L84-L89
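   After each part is opened, the per-part iterators still have to be merged into the single sorted stream that the class Javadoc describes. Given the existing `com.google.common.collect.Iterators` import, that is presumably done with `Iterators.mergeSorted`. For readers unfamiliar with the technique, a stdlib-only k-way merge over a `PriorityQueue` looks roughly like the sketch below. `MergeSorted` is a hypothetical name. The sketch assumes every input iterator is already sorted by the same comparator.

   ```java
   import java.util.ArrayList;
   import java.util.Comparator;
   import java.util.Iterator;
   import java.util.List;
   import java.util.NoSuchElementException;
   import java.util.PriorityQueue;

   /** Hypothetical k-way merge of already-sorted iterators into one sorted iterator. */
   final class MergeSorted<T> implements Iterator<T> {
     // current head element of one source, plus the index of that source
     private static final class Head<E> {
       final E value;
       final int source;

       Head(E value, int source) {
         this.value = value;
         this.source = source;
       }
     }

     private final List<Iterator<T>> sources;
     private final PriorityQueue<Head<T>> heap;

     MergeSorted(List<Iterator<T>> sources, Comparator<? super T> cmp) {
       this.sources = new ArrayList<>(sources);
       Comparator<Head<T>> byValue = (a, b) -> cmp.compare(a.value, b.value);
       // ties are broken by source index, so equal elements keep part order
       this.heap = new PriorityQueue<>(byValue.thenComparingInt(h -> h.source));
       for (int i = 0; i < this.sources.size(); i++) {
         if (this.sources.get(i).hasNext()) {
           heap.add(new Head<>(this.sources.get(i).next(), i));
         }
       }
     }

     @Override
     public boolean hasNext() {
       return !heap.isEmpty();
     }

     @Override
     public T next() {
       Head<T> smallest = heap.poll();
       if (smallest == null) {
         throw new NoSuchElementException();
       }
       // refill the heap from the source the smallest element came from
       Iterator<T> src = sources.get(smallest.source);
       if (src.hasNext()) {
         heap.add(new Head<>(src.next(), smallest.source));
       }
       return smallest.value;
     }
   }
   ```

   The heap holds at most one element per part, so memory grows with the number of parts rather than the number of entries.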







[GitHub] [accumulo] ctubbsii commented on a change in pull request #2117: Make sorted recovery write to RFiles

Posted by GitBox <gi...@apache.org>.
ctubbsii commented on a change in pull request #2117:
URL: https://github.com/apache/accumulo/pull/2117#discussion_r656625900



##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogFileKey.java
##########
@@ -189,4 +198,162 @@ public String toString() {
     }
     throw new RuntimeException("Unknown type of entry: " + event);
   }
+
+  /**
+   * Converts LogFileKey to Key. Creates a Key containing all of the LogFileKey fields. The fields
+   * are stored so the Key sorts maintaining the legacy sort order. The row of the Key is composed
+   * of 3 fields: EventNum + tabletID + seq. The EventNum is the byte returned by eventType(). The
+   * column family is always the event. The column qualifier is dependent of the type of event and
+   * could be empty.
+   *
+   * <pre>
+   *     Key Schema:
+   *     Row = EventNum + tabletID + seq
+   *     Family = event
+   *     Qualifier = tserverSession OR filename OR KeyExtent
+   * </pre>
+   */
+  public Key toKey() throws IOException {
+    byte[] formattedRow;
+    String family = event.name();
+    var kb = Key.builder();
+    switch (event) {
+      case OPEN:
+        formattedRow = formatRow(0, 0);
+        return kb.row(formattedRow).family(family).qualifier(tserverSession).build();
+      case COMPACTION_START:
+        formattedRow = formatRow(tabletId, seq);
+        return kb.row(formattedRow).family(family).qualifier(filename).build();
+      case MUTATION:
+      case MANY_MUTATIONS:
+      case COMPACTION_FINISH:
+        return kb.row(formatRow(tabletId, seq)).family(family).build();
+      case DEFINE_TABLET:
+        formattedRow = formatRow(tabletId, seq);
+        DataOutputBuffer buffer = new DataOutputBuffer();
+        tablet.writeTo(buffer);
+        var q = copyOf(buffer.getData(), buffer.getLength());
+        buffer.close();
+        return kb.row(formattedRow).family(family).qualifier(q).build();
+      default:
+        throw new AssertionError("Invalid event type in LogFileKey: " + event);
+    }
+  }
+
+  /**
+   * Get the first byte for the event. The only possible values are 0-4. This is used as the highest
+   * byte in the row.
+   */
+  private byte getEventByte() {
+    int evenTypeInteger = eventType(event);
+    return (byte) (evenTypeInteger & 0xff);
+  }
+
+  /**
+   * Get the byte encoded row for this LogFileKey as a Text object.
+   */
+  private Text formatRow() {
+    return new Text(formatRow(tabletId, seq));
+  }
+
+  /**
+   * Format the row using 13 bytes encoded to allow proper sorting of the RFile Key. The highest
+   * byte is for the event number, 4 bytes for the tabletId and 8 bytes for the sequence long.
+   */
+  private byte[] formatRow(int tabletId, long seq) {
+    byte eventNum = getEventByte();
+    // These will not sort properly when encoded if negative. Negative is not expected currently,
+    // defending against future changes and/or bugs.
+    Preconditions.checkArgument(eventNum >= 0 && seq >= 0);
+    byte[] row = new byte[13];
+    // encode the signed integer so negatives will sort properly for tabletId
+    int encodedTabletId = tabletId ^ 0x80000000;
+
+    row[0] = eventNum;
+    row[1] = (byte) ((encodedTabletId >>> 24) & 0xff);
+    row[2] = (byte) ((encodedTabletId >>> 16) & 0xff);
+    row[3] = (byte) ((encodedTabletId >>> 8) & 0xff);
+    row[4] = (byte) (encodedTabletId & 0xff);
+    row[5] = (byte) (seq >>> 56);
+    row[6] = (byte) (seq >>> 48);
+    row[7] = (byte) (seq >>> 40);
+    row[8] = (byte) (seq >>> 32);
+    row[9] = (byte) (seq >>> 24);
+    row[10] = (byte) (seq >>> 16);
+    row[11] = (byte) (seq >>> 8);
+    row[12] = (byte) (seq); // >>> 0
+    return row;
+  }
+
+  /**
+   * Extract the tabletId integer from the byte encoded Row.
+   */
+  private static int getTabletId(byte[] row) {
+    int encoded = ((row[1] << 24) + (row[2] << 16) + (row[3] << 8) + row[4]);
+    return encoded ^ 0x80000000;
+  }
+
+  /**
+   * Extract the sequence long from the byte encoded Row.
+   */
+  private static long getSequence(byte[] row) {
+    // @formatter:off
+    return (((long) row[5] << 56) +
+            ((long) (row[6] & 255) << 48) +
+            ((long) (row[7] & 255) << 40) +
+            ((long) (row[8] & 255) << 32) +
+            ((long) (row[9] & 255) << 24) +
+            ((row[10] & 255) << 16) +
+            ((row[11] & 255) << 8) +
+            ((row[12] & 255)));

Review comment:
       Similar comment as before. Should use bitwise `|` instead of `+` to ensure no funny business happens if the shift results in a negative number.
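   To make the suggestion concrete, here is a minimal sketch of the same 13-byte row layout: an event byte, a sign-flipped 4-byte tabletId, and an 8-byte seq. Every byte is masked with `& 0xff` before it is combined with `|`. This is an illustration, not the code that was merged. `RowCodec` is a hypothetical name. The unsigned byte comparison is assumed to match how `Key` rows are ordered.

   In the quoted diff, `getTabletId` does not mask `row[2]`..`row[4]`. Any of those bytes at or above `0x80` is therefore sign-extended, which corrupts the result. Masking is what fixes that; `|` makes the intent explicit.

   ```java
   import java.util.Arrays;

   /** Hypothetical codec for a 13-byte row: event byte, 4-byte tabletId, 8-byte seq. */
   final class RowCodec {
     private RowCodec() {}

     static byte[] encode(byte eventNum, int tabletId, long seq) {
       if (eventNum < 0 || seq < 0) {
         throw new IllegalArgumentException("negative eventNum or seq");
       }
       byte[] row = new byte[13];
       row[0] = eventNum;
       // flip the sign bit so negative tabletIds sort first as unsigned bytes
       int t = tabletId ^ 0x80000000;
       for (int i = 0; i < 4; i++) {
         row[1 + i] = (byte) (t >>> (24 - 8 * i));
       }
       for (int i = 0; i < 8; i++) {
         row[5 + i] = (byte) (seq >>> (56 - 8 * i));
       }
       return row;
     }

     static int tabletId(byte[] row) {
       int t = 0;
       for (int i = 1; i <= 4; i++) {
         t = (t << 8) | (row[i] & 0xff); // mask first: no sign extension leaks in
       }
       return t ^ 0x80000000;
     }

     static long seq(byte[] row) {
       long s = 0;
       for (int i = 5; i <= 12; i++) {
         s = (s << 8) | (row[i] & 0xffL);
       }
       return s;
     }

     /** Unsigned lexicographic comparison, assumed to match Key row ordering. */
     static int compare(byte[] a, byte[] b) {
       return Arrays.compareUnsigned(a, b);
     }
   }
   ```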







[GitHub] [accumulo] milleruntime commented on pull request #2117: Change sorted recovery to write to RFiles

Posted by GitBox <gi...@apache.org>.
milleruntime commented on pull request #2117:
URL: https://github.com/apache/accumulo/pull/2117#issuecomment-849076263


   Need to rework my rework in 007d581





[GitHub] [accumulo] milleruntime commented on pull request #2117: Change sorted recovery to write to RFiles

Posted by GitBox <gi...@apache.org>.
milleruntime commented on pull request #2117:
URL: https://github.com/apache/accumulo/pull/2117#issuecomment-848895402


   I found a bug with the writeBuffer method while testing in Uno.
   <pre>
   2021-05-26T11:00:44,630 [tserver.AssignmentHandler] WARN : exception trying to assign tablet +r<< null
   java.lang.RuntimeException: Error recovering tablet +r<< from log files
           at org.apache.accumulo.tserver.tablet.Tablet.<init>(Tablet.java:398) ~[accumulo-tserver-2.1.0-SNAPSHOT.jar:2.1.0-SNAPSHOT]
           at org.apache.accumulo.tserver.AssignmentHandler.run(AssignmentHandler.java:160) ~[accumulo-tserver-2.1.0-SNAPSHOT.jar:2.1.0-SNAPSHOT]
           at java.lang.Thread.run(Thread.java:829) [?:?]
   Caused by: java.io.IOException: java.lang.IllegalStateException: First log entry value is not OPEN
           at org.apache.accumulo.tserver.log.TabletServerLogger.recover(TabletServerLogger.java:539) ~[accumulo-tserver-2.1.0-SNAPSHOT.jar:2.1.0-SNAPSHOT]
           at org.apache.accumulo.tserver.TabletServer.recover(TabletServer.java:1148) ~[accumulo-tserver-2.1.0-SNAPSHOT.jar:2.1.0-SNAPSHOT]
           at org.apache.accumulo.tserver.tablet.Tablet.<init>(Tablet.java:357) ~[accumulo-tserver-2.1.0-SNAPSHOT.jar:2.1.0-SNAPSHOT]
           ... 2 more
   Caused by: java.lang.IllegalStateException: First log entry value is not OPEN
           at org.apache.accumulo.tserver.log.RecoveryLogsIterator.validateFirstKey(RecoveryLogsIterator.java:151) ~[accumulo-tserver-2.1.0-SNAPSHOT.jar:2.1.0-SNAPSHOT]
           at org.apache.accumulo.tserver.log.RecoveryLogsIterator.getFiles(RecoveryLogsIterator.java:133) ~[accumulo-tserver-2.1.0-SNAPSHOT.jar:2.1.0-SNAPSHOT]
           at org.apache.accumulo.tserver.log.RecoveryLogsIterator.<init>(RecoveryLogsIterator.java:71) ~[accumulo-tserver-2.1.0-SNAPSHOT.jar:2.1.0-SNAPSHOT]
           at org.apache.accumulo.tserver.log.SortedLogRecovery.findMaxTabletId(SortedLogRecovery.java:113) ~[accumulo-tserver-2.1.0-SNAPSHOT.jar:2.1.0-SNAPSHOT]
           at org.apache.accumulo.tserver.log.SortedLogRecovery.findLogsThatDefineTablet(SortedLogRecovery.java:153) ~[accumulo-tserver-2.1.0-SNAPSHOT.jar:2.1.0-SNAPSHOT]
           at org.apache.accumulo.tserver.log.SortedLogRecovery.recover(SortedLogRecovery.java:299) ~[accumulo-tserver-2.1.0-SNAPSHOT.jar:2.1.0-SNAPSHOT]
           at org.apache.accumulo.tserver.log.TabletServerLogger.recover(TabletServerLogger.java:537) ~[accumulo-tserver-2.1.0-SNAPSHOT.jar:2.1.0-SNAPSHOT]
           at org.apache.accumulo.tserver.TabletServer.recover(TabletServer.java:1148) ~[accumulo-tserver-2.1.0-SNAPSHOT.jar:2.1.0-SNAPSHOT]
           at org.apache.accumulo.tserver.tablet.Tablet.<init>(Tablet.java:357) ~[accumulo-tserver-2.1.0-SNAPSHOT.jar:2.1.0-SNAPSHOT]
           ... 2 more
   </pre>
   
   This is because the LogSorter flushes entries to a new sorted part each time the buffer reaches `tserver.sort.buffer.size`, so only one part contains the OPEN event, but the validation is still being run for each sorted part. I will have to find a better place to call `validateFirstKey`.
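   The following is a toy model of this failure. It assumes each event is reduced to its event number, and that OPEN has the smallest number, so it sorts first. `FirstKeyCheck` and its methods are hypothetical names. Only the part that received the OPEN entry starts with it, so a per-part check fails. A single check on the first entry of the merged stream passes. For brevity, the merge here is a plain sort.

   ```java
   import java.util.List;
   import java.util.stream.Collectors;

   /** Toy model: each sorted part is a list of event numbers. */
   final class FirstKeyCheck {
     static final int OPEN = 0; // assumption for illustration: OPEN sorts first

     private FirstKeyCheck() {}

     /** Per-part validation: fails once the sort buffer has been flushed more than once. */
     static boolean everyPartStartsWithOpen(List<List<Integer>> parts) {
       return parts.stream().allMatch(p -> !p.isEmpty() && p.get(0) == OPEN);
     }

     /** Validate once, on the first entry of the merged (sorted) stream. */
     static boolean mergedStreamStartsWithOpen(List<List<Integer>> parts) {
       List<Integer> merged =
           parts.stream().flatMap(List::stream).sorted().collect(Collectors.toList());
       return !merged.isEmpty() && merged.get(0) == OPEN;
     }
   }
   ```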

