Posted to commits@accumulo.apache.org by mm...@apache.org on 2022/09/22 15:02:05 UTC

[accumulo] branch main updated: Fix crypto in GenerateSplits (#2949)

This is an automated email from the ASF dual-hosted git repository.

mmiller pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new 679302adbf Fix crypto in GenerateSplits (#2949)
679302adbf is described below

commit 679302adbf7cec2ec1067ef600fcd8458f94409e
Author: Mike Miller <mm...@apache.org>
AuthorDate: Thu Sep 22 15:01:59 2022 +0000

    Fix crypto in GenerateSplits (#2949)
---
 .../accumulo/core/file/rfile/GenerateSplits.java   | 38 +++++++++++++---------
 1 file changed, 22 insertions(+), 16 deletions(-)
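
Previously the tool passed the hard-coded NoCryptoServiceFactory.NONE to every
reader builder, so it could not decrypt RFiles written with encryption enabled.
The fix resolves the configured CryptoService once and threads it through the
three reader-building helpers. A minimal sketch of the loading pattern follows;
SiteConfiguration.auto() is an assumption standing in for the
opts.getSiteConfiguration() call the tool actually makes, and presumes an
accumulo.properties is discoverable on the classpath:

    import org.apache.accumulo.core.conf.SiteConfiguration;
    import org.apache.accumulo.core.crypto.CryptoFactoryLoader;
    import org.apache.accumulo.core.spi.crypto.CryptoEnvironment;
    import org.apache.accumulo.core.spi.crypto.CryptoService;

    // Resolve the crypto service for the TABLE scope from the site's crypto
    // properties; with no encryption configured this is expected to behave
    // as a no-op, so unencrypted files keep working.
    SiteConfiguration siteConf = SiteConfiguration.auto(); // assumption: config on classpath
    CryptoService cryptoService = CryptoFactoryLoader
        .getServiceForClient(CryptoEnvironment.Scope.TABLE, siteConf.getAllCryptoProperties());

The cryptoService is then passed to each reader builder in place of the
previous NoCryptoServiceFactory.NONE constant, as the hunks below show.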

diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/GenerateSplits.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/GenerateSplits.java
index 1d33a2c8d9..df75905e43 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/GenerateSplits.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/GenerateSplits.java
@@ -38,6 +38,7 @@ import java.util.TreeSet;
 import org.apache.accumulo.core.cli.ConfigOpts;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.SiteConfiguration;
+import org.apache.accumulo.core.crypto.CryptoFactoryLoader;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
@@ -45,7 +46,8 @@ import org.apache.accumulo.core.file.FileOperations;
 import org.apache.accumulo.core.file.FileSKVIterator;
 import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 import org.apache.accumulo.core.iteratorsImpl.system.MultiIterator;
-import org.apache.accumulo.core.spi.crypto.NoCryptoServiceFactory;
+import org.apache.accumulo.core.spi.crypto.CryptoEnvironment;
+import org.apache.accumulo.core.spi.crypto.CryptoService;
 import org.apache.accumulo.core.util.TextUtil;
 import org.apache.accumulo.start.spi.KeywordExecutable;
 import org.apache.datasketches.quantiles.ItemsSketch;
@@ -83,7 +85,7 @@ public class GenerateSplits implements KeywordExecutable {
     @Parameter(names = {"-sf", "--splits-file"}, description = "Output the splits to a file")
     public String outputFile;
 
-    @Parameter(description = " <file|directory> { <file|directory> ... }")
+    @Parameter(description = "<file|directory>[ <file|directory>...] -n <num> | -ss <split_size>")
     public List<String> files = new ArrayList<>();
   }
 
@@ -111,6 +113,8 @@ public class GenerateSplits implements KeywordExecutable {
 
     Configuration hadoopConf = new Configuration();
     SiteConfiguration siteConf = opts.getSiteConfiguration();
+    CryptoService cryptoService = CryptoFactoryLoader
+        .getServiceForClient(CryptoEnvironment.Scope.TABLE, siteConf.getAllCryptoProperties());
     boolean encode = opts.base64encode;
 
     TreeSet<String> splits;
@@ -140,16 +144,18 @@ public class GenerateSplits implements KeywordExecutable {
 
     // if no size specified look at indexed keys first
     if (opts.splitSize == 0) {
-      splits = getIndexKeys(siteConf, hadoopConf, fs, filePaths, requestedNumSplits, encode);
+      splits = getIndexKeys(siteConf, hadoopConf, fs, filePaths, requestedNumSplits, encode,
+          cryptoService);
       // if there weren't enough splits indexed, try again with size = 0
       if (splits.size() < requestedNumSplits) {
         log.info("Only found {} indexed keys but need {}. Doing a full scan on files {}",
             splits.size(), requestedNumSplits, filePaths);
-        splits =
-            getSplitsFromFullScan(siteConf, hadoopConf, filePaths, fs, requestedNumSplits, encode);
+        splits = getSplitsFromFullScan(siteConf, hadoopConf, filePaths, fs, requestedNumSplits,
+            encode, cryptoService);
       }
     } else {
-      splits = getSplitsBySize(siteConf, hadoopConf, filePaths, fs, splitSize, encode);
+      splits =
+          getSplitsBySize(siteConf, hadoopConf, filePaths, fs, splitSize, encode, cryptoService);
     }
 
     TreeSet<String> desiredSplits;
@@ -258,16 +264,16 @@ public class GenerateSplits implements KeywordExecutable {
    * Scan the files for indexed keys first since it is more efficient than a full file scan.
    */
   private TreeSet<String> getIndexKeys(AccumuloConfiguration accumuloConf, Configuration hadoopConf,
-      FileSystem fs, List<Path> files, int requestedNumSplits, boolean base64encode)
-      throws IOException {
+      FileSystem fs, List<Path> files, int requestedNumSplits, boolean base64encode,
+      CryptoService cs) throws IOException {
     Text[] splitArray;
     List<SortedKeyValueIterator<Key,Value>> readers = new ArrayList<>(files.size());
     List<FileSKVIterator> fileReaders = new ArrayList<>(files.size());
     try {
       for (Path file : files) {
         FileSKVIterator reader = FileOperations.getInstance().newIndexReaderBuilder()
-            .forFile(file.toString(), fs, hadoopConf, NoCryptoServiceFactory.NONE)
-            .withTableConfiguration(accumuloConf).build();
+            .forFile(file.toString(), fs, hadoopConf, cs).withTableConfiguration(accumuloConf)
+            .build();
         readers.add(reader);
         fileReaders.add(reader);
       }
@@ -286,7 +292,7 @@ public class GenerateSplits implements KeywordExecutable {
 
   private TreeSet<String> getSplitsFromFullScan(SiteConfiguration accumuloConf,
       Configuration hadoopConf, List<Path> files, FileSystem fs, int numSplits,
-      boolean base64encode) throws IOException {
+      boolean base64encode, CryptoService cs) throws IOException {
     Text[] splitArray;
     List<FileSKVIterator> fileReaders = new ArrayList<>(files.size());
     List<SortedKeyValueIterator<Key,Value>> readers = new ArrayList<>(files.size());
@@ -295,8 +301,8 @@ public class GenerateSplits implements KeywordExecutable {
     try {
       for (Path file : files) {
         FileSKVIterator reader = FileOperations.getInstance().newScanReaderBuilder()
-            .forFile(file.toString(), fs, hadoopConf, NoCryptoServiceFactory.NONE)
-            .withTableConfiguration(accumuloConf).overRange(new Range(), Set.of(), false).build();
+            .forFile(file.toString(), fs, hadoopConf, cs).withTableConfiguration(accumuloConf)
+            .overRange(new Range(), Set.of(), false).build();
         readers.add(reader);
         fileReaders.add(reader);
       }
@@ -319,7 +325,7 @@ public class GenerateSplits implements KeywordExecutable {
    */
   private TreeSet<String> getSplitsBySize(AccumuloConfiguration accumuloConf,
       Configuration hadoopConf, List<Path> files, FileSystem fs, long splitSize,
-      boolean base64encode) throws IOException {
+      boolean base64encode, CryptoService cs) throws IOException {
     long currentSplitSize = 0;
     long totalSize = 0;
     TreeSet<String> splits = new TreeSet<>();
@@ -329,8 +335,8 @@ public class GenerateSplits implements KeywordExecutable {
     try {
       for (Path file : files) {
         FileSKVIterator reader = FileOperations.getInstance().newScanReaderBuilder()
-            .forFile(file.toString(), fs, hadoopConf, NoCryptoServiceFactory.NONE)
-            .withTableConfiguration(accumuloConf).overRange(new Range(), Set.of(), false).build();
+            .forFile(file.toString(), fs, hadoopConf, cs).withTableConfiguration(accumuloConf)
+            .overRange(new Range(), Set.of(), false).build();
         readers.add(reader);
         fileReaders.add(reader);
       }
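
With the crypto service threaded through all three helpers, the tool can
generate splits from encrypted RFiles the same way as from plain ones. A
hypothetical invocation, assuming the tool's KeywordExecutable keyword is
generate-splits and using the flags documented in the Parameter annotations
above (the file path is illustrative):

    accumulo generate-splits -n 100 -sf /tmp/splits.txt \
        hdfs://namenode/accumulo/tables/1/default_tablet/F0000070.rf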