Posted to notifications@accumulo.apache.org by GitBox <gi...@apache.org> on 2021/06/04 18:56:23 UTC

[GitHub] [accumulo] keith-turner commented on a change in pull request #2117: Change sorted recovery to write to RFiles

keith-turner commented on a change in pull request #2117:
URL: https://github.com/apache/accumulo/pull/2117#discussion_r645785953



##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogFileKey.java
##########
@@ -189,4 +195,90 @@ public String toString() {
     }
     throw new RuntimeException("Unknown type of entry: " + event);
   }
+
+  /**
+   * Converts LogFileKey to Key. Creates a Key containing all of the LogFileKey fields. The fields
+   * are stored so the Key sorts maintaining the legacy sort order. The row of the Key is composed
+   * of 3 fields: EventNum + tabletID + seq (separated by underscore). The EventNum is the integer
+   * returned by eventType(). The column family is always the event. The column qualifier is
+   * dependent on the type of event and could be empty.
+   *
+   * <pre>
+   *     Key Schema:
+   *     Row = EventNum_tabletID_seq
+   *     Family = event
+   *     Qualifier = tserverSession OR filename OR KeyExtent
+   * </pre>
+   */
+  public Key toKey() throws IOException {
+    String row = "";
+    int eventNum = eventType(event);
+    String family = event.name();
+    String qual = "";
+    switch (event) {
+      case OPEN:
+        row = formatRow(eventNum, 0, 0);
+        qual = tserverSession;
+        break;
+      case COMPACTION_START:
+        row = formatRow(eventNum, tabletId, seq);
+        if (filename != null)
+          qual = filename;
+        break;
+      case MUTATION:
+      case MANY_MUTATIONS:
+      case COMPACTION_FINISH:
+        row = formatRow(eventNum, tabletId, seq);
+        break;
+      case DEFINE_TABLET:
+        row = formatRow(eventNum, tabletId, seq);
+        // Base64 encode KeyExtent
+        DataOutputBuffer buffer = new DataOutputBuffer();
+        tablet.writeTo(buffer);
+        qual = Base64.getEncoder().encodeToString(copyOf(buffer.getData(), buffer.getLength()));
+        buffer.close();
+        break;
+    }
+    return new Key(new Text(row), new Text(family), new Text(qual));
+  }
+
+  // format row = 1_000001_0000000001
+  private String formatRow(int eventNum, int tabletId, long seq) {
+    return String.format("%d_%06d_%010d", eventNum, tabletId, seq);

Review comment:
       A long time ago, when writing continuous ingest, I found that String.format was super slow (it was the slowest thing in the data generation). I'm not sure if that is still the case, but it may be worth looking into, since a lot of data could go through this method.
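One way to look into this would be to build the row with a StringBuilder and manual zero-padding instead of String.format. The following is a rough sketch only: the class and helper names are hypothetical, it assumes eventNum, tabletId and seq are never negative, and no claim is made here about how much faster it is.

```java
// Hypothetical alternative to formatRow() that avoids String.format.
// Assumes eventNum, tabletId and seq are non-negative, matching the
// "1_000001_0000000001" example in the diff.
class RowFormatSketch {

  // Appends value to sb, left-padded with zeros to at least `width` digits.
  // Like "%0Nd", a value that already has `width` or more digits is written in full.
  static void appendPadded(StringBuilder sb, long value, int width) {
    String digits = Long.toString(value);
    for (int i = digits.length(); i < width; i++) {
      sb.append('0');
    }
    sb.append(digits);
  }

  // Builds EventNum_tabletID_seq with tabletID padded to 6 digits and seq to 10.
  static String formatRow(int eventNum, int tabletId, long seq) {
    StringBuilder sb = new StringBuilder(20);
    sb.append(eventNum).append('_');
    appendPadded(sb, tabletId, 6);
    sb.append('_');
    appendPadded(sb, seq, 10);
    return sb.toString();
  }

  public static void main(String[] args) {
    System.out.println(formatRow(1, 1, 1L)); // prints 1_000001_0000000001
  }
}
```

For non-negative inputs this should yield the same string as `String.format("%d_%06d_%010d", ...)`. Whatever replaces String.format has to keep the fixed-width padding, since that is what makes the lexicographic order of the rows follow the numeric order of tabletId and seq (as long as the values fit within their pad widths).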

##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
##########
@@ -224,6 +214,40 @@ public LogSorter(ServerContext context, AccumuloConfiguration conf) {
     this.walBlockSize = DfsLogger.getWalBlockSize(conf);
   }
 
+  @VisibleForTesting
+  public static void writeBuffer(ServerContext context, String destPath,
+      List<Pair<LogFileKey,LogFileValue>> buffer, int part) throws IOException {
+    String filename = String.format("part-r-%05d.rf", part);
+    Path path = new Path(destPath, filename);
+    FileSystem fs = context.getVolumeManager().getFileSystemByPath(path);
+    Path fullPath = fs.makeQualified(path);
+
+    // convert the LogFileKeys to Keys, sort and collect the mutations
+    Map<Key,List<Mutation>> keyListMap = new TreeMap<>();
+    for (Pair<LogFileKey,LogFileValue> pair : buffer) {
+      var logFileKey = pair.getFirst();
+      var logFileValue = pair.getSecond();
+      Key k = logFileKey.toKey();
+      var list = keyListMap.putIfAbsent(k, logFileValue.mutations);
+      if (list != null) {
+        var muts = new ArrayList<>(list);
+        muts.addAll(logFileValue.mutations);
+        keyListMap.put(logFileKey.toKey(), muts);
+      }
+    }
+
+    try (var writer = FileOperations.getInstance().newWriterBuilder()
+        .forFile(fullPath.toString(), fs, fs.getConf(), context.getCryptoService())
+        .withTableConfiguration(DefaultConfiguration.getInstance()).build()) {

Review comment:
       The use of withTableConfiguration() is interesting. I suspect it will determine which index/data block sizes and which compression are used for the rfile. This opens up the possibility of using snappy instead of gzip for sorted WALs in the future. The defaults are fine and I am not suggesting any changes; I just thought it was interesting.

##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
##########
@@ -224,6 +214,40 @@ public LogSorter(ServerContext context, AccumuloConfiguration conf) {
     this.walBlockSize = DfsLogger.getWalBlockSize(conf);
   }
 
+  @VisibleForTesting
+  public static void writeBuffer(ServerContext context, String destPath,
+      List<Pair<LogFileKey,LogFileValue>> buffer, int part) throws IOException {
+    String filename = String.format("part-r-%05d.rf", part);
+    Path path = new Path(destPath, filename);
+    FileSystem fs = context.getVolumeManager().getFileSystemByPath(path);
+    Path fullPath = fs.makeQualified(path);
+
+    // convert the LogFileKeys to Keys, sort and collect the mutations
+    Map<Key,List<Mutation>> keyListMap = new TreeMap<>();
+    for (Pair<LogFileKey,LogFileValue> pair : buffer) {
+      var logFileKey = pair.getFirst();
+      var logFileValue = pair.getSecond();
+      Key k = logFileKey.toKey();
+      var list = keyListMap.putIfAbsent(k, logFileValue.mutations);
+      if (list != null) {
+        var muts = new ArrayList<>(list);
+        muts.addAll(logFileValue.mutations);
+        keyListMap.put(logFileKey.toKey(), muts);
+      }

Review comment:
       Seems like this code could possibly be done in a single line using computeIfAbsent. However, if your intent was to avoid copying the list when a key occurs only once, then that may not be the case.
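A minimal sketch of the computeIfAbsent version, assuming plain Strings stand in for Key and Mutation so it runs without Accumulo on the classpath (the class name and sample entries are hypothetical):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for the grouping loop in writeBuffer():
// String replaces both Key and Mutation.
class GroupingSketch {

  static Map<String,List<String>> group(List<Map.Entry<String,List<String>>> buffer) {
    Map<String,List<String>> keyListMap = new TreeMap<>(); // sorted by key, like the original
    for (Map.Entry<String,List<String>> e : buffer) {
      // Creates a new list the first time a key is seen, then appends to it.
      // Unlike the putIfAbsent() version, this copies the mutations even for
      // keys that occur only once.
      keyListMap.computeIfAbsent(e.getKey(), k -> new ArrayList<>()).addAll(e.getValue());
    }
    return keyListMap;
  }

  public static void main(String[] args) {
    List<Map.Entry<String,List<String>>> buffer = List.of(
        Map.entry("2_000001_0000000005", List.of("m1")),
        Map.entry("1_000000_0000000000", List.of("open")),
        Map.entry("2_000001_0000000005", List.of("m2", "m3")));
    // prints {1_000000_0000000000=[open], 2_000001_0000000005=[m1, m2, m3]}
    System.out.println(group(buffer));
  }
}
```

If skipping the copy for keys that occur once matters, `Map.merge()` keeps that property: it stores the given list directly when the key is absent and only calls the remapping function (which could concatenate into a new ArrayList) when the key is already present.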




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org