Posted to notifications@accumulo.apache.org by GitBox <gi...@apache.org> on 2021/11/29 17:10:35 UTC

[GitHub] [accumulo] keith-turner commented on a change in pull request #2368: Create GenerateSplits utility

keith-turner commented on a change in pull request #2368:
URL: https://github.com/apache/accumulo/pull/2368#discussion_r758496857



##########
File path: core/src/main/java/org/apache/accumulo/core/file/rfile/GenerateSplits.java
##########
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.file.rfile;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.io.BufferedWriter;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.cli.ClientOpts;
+import org.apache.accumulo.core.cli.ConfigOpts;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.SiteConfiguration;
+import org.apache.accumulo.core.crypto.CryptoServiceFactory;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.FileOperations;
+import org.apache.accumulo.core.file.FileSKVIterator;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iteratorsImpl.system.MultiIterator;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.TextUtil;
+import org.apache.accumulo.start.spi.KeywordExecutable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.Parameter;
+import com.google.auto.service.AutoService;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
+@AutoService(KeywordExecutable.class)
+@SuppressFBWarnings(value = "PATH_TRAVERSAL_OUT",
+    justification = "app is run in same security context as user providing the filename")
+public class GenerateSplits implements KeywordExecutable {
+  private static final Logger log = LoggerFactory.getLogger(GenerateSplits.class);
+
+  static class Opts extends ConfigOpts {
+    @Parameter(names = {"-a", "--auths"}, converter = ClientOpts.AuthConverter.class,
+        description = "the authorizations to use when reading the files")
+    public Authorizations auths = Authorizations.EMPTY;
+
+    @Parameter(names = {"-n", "--num"}, description = "The number of splits to generate")

Review comment:
    ```suggestion
    @Parameter(names = {"-n", "--num"}, description = "The number of splits to generate. Cannot be used with the split size option.")
    ```
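
As an aside, a minimal sketch of what the mutually exclusive options look like in use, invoking the utility's `main` directly; the rfile path and byte count are made up for illustration:

```java
// Illustrative only: /tmp/test.rf is a hypothetical rfile path.
public class GenerateSplitsInvocation {
  public static void main(String[] args) throws Exception {
    // Either ask for a fixed number of splits ...
    org.apache.accumulo.core.file.rfile.GenerateSplits
        .main(new String[] {"-n", "100", "/tmp/test.rf"});
    // ... or for a target split size in bytes; passing both -n and -ss
    // is rejected by the utility.
    org.apache.accumulo.core.file.rfile.GenerateSplits
        .main(new String[] {"-ss", "67108864", "/tmp/test.rf"});
  }
}
```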

##########
File path: core/src/main/java/org/apache/accumulo/core/file/rfile/GenerateSplits.java
##########
@@ -0,0 +1,296 @@
+  static class Opts extends ConfigOpts {
+    @Parameter(names = {"-a", "--auths"}, converter = ClientOpts.AuthConverter.class,
+        description = "the authorizations to use when reading the files")
+    public Authorizations auths = Authorizations.EMPTY;
+
+    @Parameter(names = {"-n", "--num"}, description = "The number of splits to generate")
+    public long numSplits = 0;
+
+    @Parameter(names = {"-ss", "--split-size"}, description = "The split size desired in bytes")

Review comment:
    ```suggestion
    @Parameter(names = {"-ss", "--split-size"}, description = "The split size desired in bytes. Cannot be used with the num splits option.")
    ```
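
The mutual exclusion that both suggested descriptions document is enforced later in `execute()`; for reference, a standalone sketch of that fail-fast check (method name and message text are made up, and the PR's own wording differs slightly):

```java
// Sketch of the check GenerateSplits.execute() performs on its options:
// exactly one of the number of splits (-n) or split size (-ss) must be set.
static void requireExactlyOne(long numSplits, long splitSize) {
  if (numSplits > 0 && splitSize > 0) {
    throw new IllegalArgumentException(
        "Specify either a number of splits (-n) or a split size (-ss), not both");
  }
  if (numSplits == 0 && splitSize == 0) {
    throw new IllegalArgumentException(
        "One of a number of splits (-n) or a split size (-ss) is required");
  }
}
```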

##########
File path: core/src/main/java/org/apache/accumulo/core/file/rfile/GenerateSplits.java
##########
@@ -0,0 +1,296 @@
+  static class Opts extends ConfigOpts {
+    @Parameter(names = {"-a", "--auths"}, converter = ClientOpts.AuthConverter.class,
+        description = "the authorizations to use when reading the files")
+    public Authorizations auths = Authorizations.EMPTY;
+
+    @Parameter(names = {"-n", "--num"}, description = "The number of splits to generate")
+    public long numSplits = 0;
+
+    @Parameter(names = {"-ss", "--split-size"}, description = "The split size desired in bytes")
+    public long splitSize = 0;
+
+    @Parameter(names = {"-b64", "--base64encoded"}, description = "Base 64 encode the split points")
+    public boolean base64encode = false;
+
+    @Parameter(names = {"-sf", "--splits-file"}, description = "Output the splits to a file")
+    public String outputFile;
+
+    @Parameter(description = " <file|directory> { <file> ... }")
+    public List<String> files = new ArrayList<>();
+  }
+
+  @Override
+  public String keyword() {
+    return "generate-splits";
+  }
+
+  @Override
+  public String description() {
+    return "Generate split points from a set of 1 or more rfiles";
+  }
+
+  public static void main(String[] args) throws Exception {
+    new GenerateSplits().execute(args);
+  }
+
+  @Override
+  public void execute(String[] args) throws Exception {
+    Opts opts = new Opts();
+    opts.parseArgs(GenerateSplits.class.getName(), args);
+    if (opts.files.isEmpty()) {
+      throw new IllegalArgumentException("No files were given");
+    }
+
+    Configuration hadoopConf = new Configuration();
+    SiteConfiguration siteConf = opts.getSiteConfiguration();
+    boolean encode = opts.base64encode;
+
+    TreeSet<String> splits;
+    TreeSet<String> desiredSplits = new TreeSet<>();
+
+    if (opts.numSplits > 0 && opts.splitSize > 0) {
+      throw new IllegalArgumentException("Requested number of splits and split size.");
+    }
+    if (opts.numSplits == 0 && opts.splitSize == 0) {
+      throw new IllegalArgumentException("Required number of splits or split size.");
+    }
+    long numSplits = opts.numSplits;
+    long splitSize = opts.splitSize;
+
+    FileSystem fs = FileSystem.get(hadoopConf);
+    FileSystem localFs = FileSystem.getLocal(hadoopConf);
+    List<Path> filePaths = new ArrayList<>();
+    for (String file : opts.files) {
+      Path path = new Path(file);
+      if (file.contains(":")) {
+        fs = path.getFileSystem(hadoopConf);
+      } else {
+        log.warn("Attempting to find file across filesystems. Consider providing URI "
+            + "instead of path");
+        if (!fs.exists(path))
+          fs = localFs; // fall back to local
+      }
+      // get all the files in the directory
+      if (fs.getFileStatus(path).isDirectory()) {
+        // can only explode one directory
+        if (opts.files.size() > 1)
+          throw new IllegalArgumentException("Only one directory can be specified");
+        var iter = fs.listFiles(path, true);
+        while (iter.hasNext()) {
+          filePaths.add(iter.next().getPath());
+        }
+      } else {
+        filePaths.add(path);
+      }
+    }
+
+    // if no size specified look at indexed keys first
+    if (opts.splitSize == 0) {
+      splits = getIndexKeys(siteConf, hadoopConf, fs, filePaths, encode);
+      // if there weren't enough splits indexed, try again with size = 0
+      if (splits.size() < numSplits) {
+        log.info("Only found {} indexed keys but need {}. Doing a full scan on files {}",
+            splits.size(), numSplits, filePaths);
+        splits = getSplitsBySize(siteConf, hadoopConf, filePaths, fs, 0, encode);
+      }
+    } else {
+      splits = getSplitsBySize(siteConf, hadoopConf, filePaths, fs, splitSize, encode);
+    }
+
+    int numFound = splits.size();
+    // it's possible we found too many indexed keys, so take every (numFound / numSplits) split
+    if (opts.splitSize == 0 && numFound > numSplits) {
+      var iter = splits.iterator();
+      long factor = numFound / numSplits;
+      log.debug("Found {} splits but requested {} picking 1 every {}", numFound, opts.numSplits,
+          factor);
+      for (int i = 0; i < numFound; i++) {
+        String next = iter.next();
+        if (i % factor == 0 && desiredSplits.size() < numSplits) {
+          desiredSplits.add(next);
+        }
+      }

Review comment:
       For a case where numFound is 150 and numSplits is 100, integer division gives 150/100 = 1, so factor is 1. `i % 1` is always zero, so I think the first 100 of the 150 splits will be taken, which is probably not desired. I think an approach similar to the following will yield better results. Hopefully it would take splits `0,1,3,4,6,7,9,10, etc.`, but I am not sure it's correct... however, that is the intent.

    ```suggestion
          double targetFactor = (double) numFound / numSplits;
          log.debug("Found {} splits but requested {} picking 1 every {}", numFound, opts.numSplits,
              targetFactor);

          for (int i = 0; i < numFound; i++) {
            // ratio of candidates scanned to splits kept; when desiredSplits is empty
            // this divides by zero, giving Infinity, so the first split is always kept
            double currFactor = (double) (i + 1) / desiredSplits.size();
            String next = iter.next();
            // unsure if this should be currFactor >= targetFactor .. need to think through some edge cases
            if (currFactor > targetFactor && desiredSplits.size() < numSplits) {
              desiredSplits.add(next);
            }
          }
    ```
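
For what it's worth, a self-contained sketch (class and method names made up, not part of the PR) of one way to get that even spacing: select index `j * numFound / numSplits` for each of the `numSplits` picks. For 150 candidates and 100 requested splits it keeps indices `0,1,3,4,6,7,9,10,...`, matching the intent above:

```java
import java.util.ArrayList;
import java.util.List;

public class EvenSplitPickSketch {
  // Pick numSplits indices evenly from numFound candidates (assumes
  // numFound >= numSplits, which holds in the branch under review).
  static List<Integer> pickIndices(int numFound, int numSplits) {
    List<Integer> picked = new ArrayList<>();
    for (int j = 0; j < numSplits; j++) {
      // long arithmetic avoids overflow before the final narrowing cast
      picked.add((int) ((long) j * numFound / numSplits));
    }
    return picked;
  }

  public static void main(String[] args) {
    // prints [0, 1, 3, 4, 6, 7, 9, 10, 12, ...]
    System.out.println(pickIndices(150, 100));
  }
}
```

Since GenerateSplits iterates a TreeSet rather than indexing a list, the equivalent there would be to precompute the target indices (or track the next one) and keep element i of the iterator when it matches.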



