You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by dl...@apache.org on 2022/09/29 14:21:13 UTC

[accumulo] branch main updated: Use internal RFile scan code for RecoveryLogsIterator (#2982)

This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new b14df03bd7 Use internal RFile scan code for RecoveryLogsIterator (#2982)
b14df03bd7 is described below

commit b14df03bd7eee697fa9002832f03c2aaa7442695
Author: Dave Marion <dl...@apache.org>
AuthorDate: Thu Sep 29 10:21:08 2022 -0400

    Use internal RFile scan code for RecoveryLogsIterator (#2982)
    
    RecoveryLogsIterator was using the client-side RFile scanner,
    which does not allow the Recovery scope to be used. This
    change modifies RecoveryLogsIterator to use the server-side
    RFile reader code (FileOperations) with the Recovery
    crypto environment.
    
    Closes #2979
---
 .../accumulo/tserver/log/RecoveryLogsIterator.java | 57 +++++++++++++---------
 1 file changed, 35 insertions(+), 22 deletions(-)

diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogsIterator.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogsIterator.java
index 9403de4f0d..bc7ffc4eec 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogsIterator.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogsIterator.java
@@ -21,16 +21,20 @@ package org.apache.accumulo.tserver.log;
 import java.io.IOException;
 import java.util.AbstractMap;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map.Entry;
 
-import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.client.ScannerBase;
-import org.apache.accumulo.core.client.rfile.RFile;
+import org.apache.accumulo.core.crypto.CryptoEnvironmentImpl;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.FileOperations;
+import org.apache.accumulo.core.file.FileSKVIterator;
+import org.apache.accumulo.core.iterators.IteratorAdapter;
+import org.apache.accumulo.core.spi.crypto.CryptoEnvironment;
+import org.apache.accumulo.core.spi.crypto.CryptoService;
 import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.server.log.SortedLogState;
@@ -53,8 +57,9 @@ public class RecoveryLogsIterator
 
   private static final Logger LOG = LoggerFactory.getLogger(RecoveryLogsIterator.class);
 
-  private final List<Scanner> scanners;
+  private final List<FileSKVIterator> fileIters;
   private final Iterator<Entry<Key,Value>> iter;
+  private final CryptoEnvironment env = new CryptoEnvironmentImpl(CryptoEnvironment.Scope.RECOVERY);
 
   /**
    * Scans the files in each recoveryLogDir over the range [start,end].
@@ -63,34 +68,39 @@ public class RecoveryLogsIterator
       LogFileKey end, boolean checkFirstKey) throws IOException {
 
     List<Iterator<Entry<Key,Value>>> iterators = new ArrayList<>(recoveryLogDirs.size());
-    scanners = new ArrayList<>();
+    fileIters = new ArrayList<>();
     Range range = start == null ? null : LogFileKey.toRange(start, end);
     var vm = context.getVolumeManager();
 
+    final CryptoService cryptoService = context.getCryptoFactory().getService(env,
+        context.getConfiguration().getAllCryptoProperties());
+
     for (Path logDir : recoveryLogDirs) {
       LOG.debug("Opening recovery log dir {}", logDir.getName());
       List<Path> logFiles = getFiles(vm, logDir);
       var fs = vm.getFileSystemByPath(logDir);
 
       // only check the first key once to prevent extra iterator creation and seeking
-      if (checkFirstKey) {
-        validateFirstKey(context, fs, logFiles, logDir);
+      if (checkFirstKey && !logFiles.isEmpty()) {
+        validateFirstKey(context, cryptoService, fs, logFiles, logDir);
       }
 
       for (Path log : logFiles) {
-        var scanner = RFile.newScanner().from(log.toString()).withFileSystem(fs)
-            .withTableProperties(context.getConfiguration()).build();
-
-        scanner.setRange(range);
-        Iterator<Entry<Key,Value>> scanIter = scanner.iterator();
+        FileSKVIterator fileIter = FileOperations.getInstance().newReaderBuilder()
+            .forFile(log.toString(), fs, fs.getConf(), cryptoService)
+            .withTableConfiguration(context.getConfiguration()).seekToBeginning().build();
+        if (range != null) {
+          fileIter.seek(range, Collections.emptySet(), false);
+        }
+        Iterator<Entry<Key,Value>> scanIter = new IteratorAdapter(fileIter);
 
         if (scanIter.hasNext()) {
           LOG.debug("Write ahead log {} has data in range {} {}", log.getName(), start, end);
           iterators.add(scanIter);
-          scanners.add(scanner);
+          fileIters.add(fileIter);
         } else {
           LOG.debug("Write ahead log {} has no data in range {} {}", log.getName(), start, end);
-          scanner.close();
+          fileIter.close();
         }
       }
     }
@@ -115,8 +125,10 @@ public class RecoveryLogsIterator
   }
 
   @Override
-  public void close() {
-    scanners.forEach(ScannerBase::close);
+  public void close() throws IOException {
+    for (FileSKVIterator fskv : fileIters) {
+      fskv.close();
+    }
   }
 
   /**
@@ -148,12 +160,13 @@ public class RecoveryLogsIterator
   /**
    * Check that the first entry in the WAL is OPEN. Only need to do this once.
    */
-  private void validateFirstKey(ServerContext context, FileSystem fs, List<Path> logFiles,
-      Path fullLogPath) {
-    try (var scanner =
-        RFile.newScanner().from(logFiles.stream().map(Path::toString).toArray(String[]::new))
-            .withFileSystem(fs).withTableProperties(context.getConfiguration()).build()) {
-      Iterator<Entry<Key,Value>> iterator = scanner.iterator();
+  private void validateFirstKey(ServerContext context, CryptoService cs, FileSystem fs,
+      List<Path> logFiles, Path fullLogPath) throws IOException {
+    try (FileSKVIterator fileIter = FileOperations.getInstance().newReaderBuilder()
+        .forFile(logFiles.get(0).toString(), fs, fs.getConf(), cs)
+        .withTableConfiguration(context.getConfiguration()).seekToBeginning().build()) {
+      Iterator<Entry<Key,Value>> iterator = new IteratorAdapter(fileIter);
+
       if (iterator.hasNext()) {
         Key firstKey = iterator.next().getKey();
         LogFileKey key = LogFileKey.fromKey(firstKey);