Posted to notifications@accumulo.apache.org by GitBox <gi...@apache.org> on 2021/11/23 21:41:21 UTC

[GitHub] [accumulo] milleruntime opened a new pull request #2368: Create GenerateSplits utility

milleruntime opened a new pull request #2368:
URL: https://github.com/apache/accumulo/pull/2368


   * New command-line utility to generate splits from one or more rfiles (see the example invocation below)
   * Create new GenerateSplitsTest
   * Add to KeywordStartIT
   * Closes #2361
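
   A hypothetical invocation, based on the keyword and options defined in this PR (the paths and
   file names here are made up for illustration, not taken from the PR):

   ```
   accumulo generate-splits --num 100 -sf /tmp/splits.txt /path/to/rfile-dir
   accumulo generate-splits --split-size 32000 -b64 file1.rf file2.rf
   ```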


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@accumulo.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [accumulo] milleruntime commented on a change in pull request #2368: Create GenerateSplits utility

Posted by GitBox <gi...@apache.org>.
milleruntime commented on a change in pull request #2368:
URL: https://github.com/apache/accumulo/pull/2368#discussion_r758683185



##########
File path: core/src/test/java/org/apache/accumulo/core/file/rfile/GenerateSplitsTest.java
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.file.rfile;
+
+import static org.apache.accumulo.core.file.rfile.GenerateSplits.main;
+import static org.apache.accumulo.core.file.rfile.RFileTest.newColFamByteSequence;
+import static org.apache.accumulo.core.file.rfile.RFileTest.newKey;
+import static org.apache.accumulo.core.file.rfile.RFileTest.newValue;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.List;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
+@SuppressFBWarnings(value = "PATH_TRAVERSAL_IN", justification = "paths provided by test")
+public class GenerateSplitsTest {
+  private static final Logger log = LoggerFactory.getLogger(GenerateSplitsTest.class);
+
+  @ClassRule
+  public static final TemporaryFolder tempFolder =
+      new TemporaryFolder(new File(System.getProperty("user.dir") + "/target"));
+
+  private static final RFileTest.TestRFile trf = new RFileTest.TestRFile(null);
+  private static String fileName;
+
+  /**
+   * Creates a test file with 84 bytes of data and 2 locality groups.
+   */
+  @BeforeClass
+  public static void createFile() throws IOException {
+    trf.openWriter(false);
+    trf.writer.startNewLocalityGroup("lg1", newColFamByteSequence("cf1", "cf2"));
+    trf.writer.append(newKey("r1", "cf1", "cq1", "L1", 55), newValue("foo1"));
+    trf.writer.append(newKey("r2", "cf2", "cq1", "L1", 55), newValue("foo2"));
+    trf.writer.append(newKey("r3", "cf2", "cq1", "L1", 55), newValue("foo3"));
+    trf.writer.startNewLocalityGroup("lg2", newColFamByteSequence("cf3", "cf4"));
+    trf.writer.append(newKey("r4", "cf3", "cq1", "L1", 55), newValue("foo4"));
+    trf.writer.append(newKey("r5", "cf4", "cq1", "L1", 55), newValue("foo5"));
+    trf.writer.append(newKey("r6", "cf4", "cq1", "L1", 55), newValue("foo6"));
+    trf.closeWriter();
+
+    File file = tempFolder.newFile("testGenerateSplits.rf");
+    FileOutputStream fileOutputStream = new FileOutputStream(file);
+    fileOutputStream.write(trf.baos.toByteArray());
+    fileOutputStream.flush();
+    fileOutputStream.close();
+
+    fileName = file.getAbsolutePath();
+    log.info("Wrote to file {}", fileName);
+  }
+
+  @AfterClass
+  public static void cleanUp() {
+    File file = new File(fileName);
+    if (file.delete())
+      log.info("Cleaned up test file {}", fileName);
+  }
+
+  @Test
+  public void testNum() throws Exception {
+    List<String> args = List.of(fileName, "--num", "2");
+    log.info("Invoking GenerateSplits with {}", args);
+    PrintStream oldOut = System.out;
+    try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        PrintStream newOut = new PrintStream(baos)) {
+      System.setOut(newOut);

Review comment:
       Replaced the STDOUT analysis with a file in each of the tests.
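
   A minimal sketch of what such a file-based check might look like, assuming the `-sf`/`--splits-file`
   option from this PR; the test name and assertion are illustrative, not the final test code:

   ```java
   @Test
   public void testNumToFile() throws Exception {
     // Hypothetical sketch: have GenerateSplits write the splits to a temp file via -sf,
     // then read the file back, instead of capturing System.out.
     File splitsFile = tempFolder.newFile("splits.txt");
     main(new String[] {fileName, "--num", "2", "-sf", splitsFile.getAbsolutePath()});
     List<String> splits = java.nio.file.Files.readAllLines(splitsFile.toPath());
     assertTrue(splits.size() == 2);
   }
   ```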







[GitHub] [accumulo] milleruntime commented on a change in pull request #2368: Create GenerateSplits utility

Posted by GitBox <gi...@apache.org>.
milleruntime commented on a change in pull request #2368:
URL: https://github.com/apache/accumulo/pull/2368#discussion_r759311371



##########
File path: core/src/main/java/org/apache/accumulo/core/file/rfile/GenerateSplits.java
##########
@@ -0,0 +1,301 @@
+package org.apache.accumulo.core.file.rfile;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.io.BufferedWriter;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.cli.ClientOpts;
+import org.apache.accumulo.core.cli.ConfigOpts;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.SiteConfiguration;
+import org.apache.accumulo.core.crypto.CryptoServiceFactory;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.FileOperations;
+import org.apache.accumulo.core.file.FileSKVIterator;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iteratorsImpl.system.MultiIterator;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.TextUtil;
+import org.apache.accumulo.start.spi.KeywordExecutable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.Parameter;
+import com.google.auto.service.AutoService;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
+@AutoService(KeywordExecutable.class)
+@SuppressFBWarnings(value = "PATH_TRAVERSAL_OUT",
+    justification = "app is run in same security context as user providing the filename")
+public class GenerateSplits implements KeywordExecutable {
+  private static final Logger log = LoggerFactory.getLogger(GenerateSplits.class);
+
+  static class Opts extends ConfigOpts {
+    @Parameter(names = {"-a", "--auths"}, converter = ClientOpts.AuthConverter.class,
+        description = "the authorizations to use when reading the files")
+    public Authorizations auths = Authorizations.EMPTY;
+
+    @Parameter(names = {"-n", "--num"},
+        description = "The number of splits to generate. Cannot use with the split size option.")
+    public long numSplits = 0;
+
+    @Parameter(names = {"-ss", "--split-size"},
+        description = "The split size desired in bytes. Cannot use with num splits option.")
+    public long splitSize = 0;
+
+    @Parameter(names = {"-b64", "--base64encoded"}, description = "Base 64 encode the split points")
+    public boolean base64encode = false;
+
+    @Parameter(names = {"-sf", "--splits-file"}, description = "Output the splits to a file")
+    public String outputFile;
+
+    @Parameter(description = " <file|directory> { <file> ... }")
+    public List<String> files = new ArrayList<>();
+  }
+
+  @Override
+  public String keyword() {
+    return "generate-splits";
+  }
+
+  @Override
+  public String description() {
+    return "Generate split points from a set of 1 or more rfiles";
+  }
+
+  public static void main(String[] args) throws Exception {
+    new GenerateSplits().execute(args);
+  }
+
+  @Override
+  public void execute(String[] args) throws Exception {
+    Opts opts = new Opts();
+    opts.parseArgs(GenerateSplits.class.getName(), args);
+    if (opts.files.isEmpty()) {
+      throw new IllegalArgumentException("No files were given");
+    }
+
+    Configuration hadoopConf = new Configuration();
+    SiteConfiguration siteConf = opts.getSiteConfiguration();
+    boolean encode = opts.base64encode;
+
+    TreeSet<String> splits;
+    TreeSet<String> desiredSplits = new TreeSet<>();
+
+    if (opts.numSplits > 0 && opts.splitSize > 0) {
+      throw new IllegalArgumentException("Requested number of splits and split size.");
+    }
+    if (opts.numSplits == 0 && opts.splitSize == 0) {
+      throw new IllegalArgumentException("Required number of splits or split size.");
+    }
+    long numSplits = opts.numSplits;
+    long splitSize = opts.splitSize;
+
+    FileSystem fs = FileSystem.get(hadoopConf);
+    FileSystem localFs = FileSystem.getLocal(hadoopConf);
+    List<Path> filePaths = new ArrayList<>();
+    for (String file : opts.files) {
+      Path path = new Path(file);
+      if (file.contains(":")) {
+        fs = path.getFileSystem(hadoopConf);
+      } else {
+        log.warn("Attempting to find file across filesystems. Consider providing URI "
+            + "instead of path");
+        if (!fs.exists(path))
+          fs = localFs; // fall back to local
+      }
+      // get all the files in the directory
+      if (fs.getFileStatus(path).isDirectory()) {
+        // can only explode one directory
+        if (opts.files.size() > 1)
+          throw new IllegalArgumentException("Only one directory can be specified");
+        var iter = fs.listFiles(path, true);
+        while (iter.hasNext()) {
+          filePaths.add(iter.next().getPath());
+        }
+      } else {
+        filePaths.add(path);
+      }
+    }
+
+    // if no size specified look at indexed keys first
+    if (opts.splitSize == 0) {
+      splits = getIndexKeys(siteConf, hadoopConf, fs, filePaths, encode);
+      // if there weren't enough splits indexed, try again with size = 0
+      if (splits.size() < numSplits) {
+        log.info("Only found {} indexed keys but need {}. Doing a full scan on files {}",
+            splits.size(), numSplits, filePaths);
+        splits = getSplitsBySize(siteConf, hadoopConf, filePaths, fs, 0, encode);
+      }
+    } else {
+      splits = getSplitsBySize(siteConf, hadoopConf, filePaths, fs, splitSize, encode);
+    }
+
+    int numFound = splits.size();
+    // it's possible we found too many indexed so take every (numFound / numSplits) split
+    if (opts.splitSize == 0 && numFound > numSplits) {
+      var iter = splits.iterator();
+      double targetFactor = (double) numFound / numSplits;
+      log.debug("Found {} splits but requested {} picking 1 every {}", numFound, opts.numSplits,
+          targetFactor);
+
+      for (int i = 0; i < numFound; i++) {
+        double currFactor = (double) (i + 1) / desiredSplits.size();
+        String next = iter.next();
+        // unsure if this should be currFactor >= targetFactor
+        if (currFactor > targetFactor && desiredSplits.size() < numSplits) {
+          desiredSplits.add(next);
+        }
+      }

Review comment:
       > total -= 1;
   
   This doesn't seem right to me. Should this be `total -= increment` or `total = 0`? This seems to add too many splits in a row.
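
   A standalone illustration (not PR code) of why the suggestion subtracts 1: with an increment of
   0.7, subtracting 1 keeps the fractional remainder, so roughly 0.7 of the rows get picked, while
   resetting to 0 discards the remainder and under-selects. This is the usual fractional-accumulator
   (Bresenham-style) pattern:

   ```java
   public class AccumulatorDemo {
     public static void main(String[] args) {
       double increment = 0.7; // hypothetical density of picks
       double keepRemainder = 0, resetToZero = 0;
       for (int i = 0; i < 5; i++) {
         keepRemainder += increment;
         resetToZero += increment;
         boolean pickA = keepRemainder > 1, pickB = resetToZero > 1;
         if (pickA) keepRemainder -= 1; // keeps the overshoot for the next row
         if (pickB) resetToZero = 0;    // discards the overshoot
         System.out.printf("i=%d keep-remainder=%b reset-to-zero=%b%n", i, pickA, pickB);
       }
       // keep-remainder picks 3 of the 5 rows (~0.7 * 5); reset-to-zero picks only 2
     }
   }
   ```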
   
   







[GitHub] [accumulo] milleruntime commented on pull request #2368: Create GenerateSplits utility

Posted by GitBox <gi...@apache.org>.
milleruntime commented on pull request #2368:
URL: https://github.com/apache/accumulo/pull/2368#issuecomment-983592263


   I was talking with @ctubbsii a bit about the many different edge cases that can be encountered when splitting data evenly. Since this tool is a good opportunity to use a library that may help, I am going to take a look at the Apache DataSketches project, starting with the quantiles in ItemsSketch: https://datasketches.apache.org/api/java/snapshot/apidocs/org/apache/datasketches/quantiles/ItemsSketch.html#getQuantiles-int-
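
   A rough sketch of how that might look (not part of this PR; the ItemsSketch factory-method
   signature can vary across DataSketches releases, so treat the details as assumptions):

   ```java
   import java.util.Comparator;
   import org.apache.datasketches.quantiles.ItemsSketch;

   // Hypothetical: stream row values into an ItemsSketch, then ask for evenly spaced quantiles.
   static String[] quantileRows(Iterable<String> rows, int numSplits) {
     ItemsSketch<String> sketch = ItemsSketch.getInstance(Comparator.<String>naturalOrder());
     rows.forEach(sketch::update);
     // numSplits + 2 evenly spaced quantiles; the first and last are the min and max rows.
     return sketch.getQuantiles(numSplits + 2);
   }
   ```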





[GitHub] [accumulo] ctubbsii commented on pull request #2368: Create GenerateSplits utility

Posted by GitBox <gi...@apache.org>.
ctubbsii commented on pull request #2368:
URL: https://github.com/apache/accumulo/pull/2368#issuecomment-983998202


   Based on the behavior, we probably want to just drop the first and last items in the returned array, then.
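
   Continuing the hypothetical sketch above, that trimming could look like (assumes
   java.util.Arrays and java.util.List are imported):

   ```java
   // getQuantiles(n) includes the min and max rows at the ends, which are not useful
   // as split points, so keep only the interior entries (hypothetical, not PR code).
   String[] q = sketch.getQuantiles(numSplits + 2);
   List<String> splits = Arrays.asList(q).subList(1, q.length - 1);
   ```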





[GitHub] [accumulo] keith-turner commented on a change in pull request #2368: Create GenerateSplits utility

Posted by GitBox <gi...@apache.org>.
keith-turner commented on a change in pull request #2368:
URL: https://github.com/apache/accumulo/pull/2368#discussion_r758567995



##########
File path: core/src/main/java/org/apache/accumulo/core/file/rfile/GenerateSplits.java
##########
@@ -0,0 +1,296 @@
+@AutoService(KeywordExecutable.class)
+@SuppressFBWarnings(value = "PATH_TRAVERSAL_OUT",
+    justification = "app is run in same security context as user providing the filename")
+public class GenerateSplits implements KeywordExecutable {
+  private static final Logger log = LoggerFactory.getLogger(GenerateSplits.class);
+
+  static class Opts extends ConfigOpts {
+    @Parameter(names = {"-a", "--auths"}, converter = ClientOpts.AuthConverter.class,
+        description = "the authorizations to use when reading the files")
+    public Authorizations auths = Authorizations.EMPTY;
+
+    @Parameter(names = {"-n", "--num"}, description = "The number of splits to generate")
+    public long numSplits = 0;
+
+    @Parameter(names = {"-ss", "--split-size"}, description = "The split size desired in bytes")
+    public long splitSize = 0;
+
+    @Parameter(names = {"-b64", "--base64encoded"}, description = "Base 64 encode the split points")
+    public boolean base64encode = false;
+
+    @Parameter(names = {"-sf", "--splits-file"}, description = "Output the splits to a file")
+    public String outputFile;
+
+    @Parameter(description = " <file|directory> { <file> ... }")
+    public List<String> files = new ArrayList<>();
+  }
+
+  @Override
+  public String keyword() {
+    return "generate-splits";
+  }
+
+  @Override
+  public String description() {
+    return "Generate split points from a set of 1 or more rfiles";
+  }
+
+  public static void main(String[] args) throws Exception {
+    new GenerateSplits().execute(args);
+  }
+
+  @Override
+  public void execute(String[] args) throws Exception {
+    Opts opts = new Opts();
+    opts.parseArgs(GenerateSplits.class.getName(), args);
+    if (opts.files.isEmpty()) {
+      throw new IllegalArgumentException("No files were given");
+    }
+
+    Configuration hadoopConf = new Configuration();
+    SiteConfiguration siteConf = opts.getSiteConfiguration();
+    boolean encode = opts.base64encode;
+
+    TreeSet<String> splits;
+    TreeSet<String> desiredSplits = new TreeSet<>();
+
+    if (opts.numSplits > 0 && opts.splitSize > 0) {
+      throw new IllegalArgumentException("Requested number of splits and split size.");
+    }
+    if (opts.numSplits == 0 && opts.splitSize == 0) {
+      throw new IllegalArgumentException("Required number of splits or split size.");
+    }
+    long numSplits = opts.numSplits;
+    long splitSize = opts.splitSize;
+
+    FileSystem fs = FileSystem.get(hadoopConf);
+    FileSystem localFs = FileSystem.getLocal(hadoopConf);
+    List<Path> filePaths = new ArrayList<>();
+    for (String file : opts.files) {
+      Path path = new Path(file);
+      if (file.contains(":")) {
+        fs = path.getFileSystem(hadoopConf);
+      } else {
+        log.warn("Attempting to find file across filesystems. Consider providing URI "
+            + "instead of path");
+        if (!fs.exists(path))
+          fs = localFs; // fall back to local
+      }
+      // get all the files in the directory
+      if (fs.getFileStatus(path).isDirectory()) {
+        // can only explode one directory
+        if (opts.files.size() > 1)
+          throw new IllegalArgumentException("Only one directory can be specified");
+        var iter = fs.listFiles(path, true);
+        while (iter.hasNext()) {
+          filePaths.add(iter.next().getPath());
+        }
+      } else {
+        filePaths.add(path);
+      }
+    }
+
+    // if no size specified look at indexed keys first
+    if (opts.splitSize == 0) {
+      splits = getIndexKeys(siteConf, hadoopConf, fs, filePaths, encode);
+      // if there weren't enough splits indexed, try again with size = 0
+      if (splits.size() < numSplits) {
+        log.info("Only found {} indexed keys but need {}. Doing a full scan on files {}",
+            splits.size(), numSplits, filePaths);
+        splits = getSplitsBySize(siteConf, hadoopConf, filePaths, fs, 0, encode);
+      }
+    } else {
+      splits = getSplitsBySize(siteConf, hadoopConf, filePaths, fs, splitSize, encode);
+    }
+
+    int numFound = splits.size();
+    // it's possible we found too many indexed so take every (numFound / numSplits) split
+    if (opts.splitSize == 0 && numFound > numSplits) {
+      var iter = splits.iterator();
+      long factor = numFound / numSplits;
+      log.debug("Found {} splits but requested {} picking 1 every {}", numFound, opts.numSplits,
+          factor);
+      for (int i = 0; i < numFound; i++) {
+        String next = iter.next();
+        if (i % factor == 0 && desiredSplits.size() < numSplits) {
+          desiredSplits.add(next);
+        }
+      }

Review comment:
       For a case where numFound is 150 and numSplits is 100, 150/100 is 1, so factor is 1. `i % 1` is always zero, so I think the first 100 splits out of the 150 will be taken. This is probably not desired. I think an approach similar to the following will yield better results. Hopefully the following code would take splits `0,1,3,4,6,7,9,10, etc.`, but I am not sure it's correct... however, that is the intent.
   
   ```suggestion
          double targetFactor = (double) numFound / numSplits;
          log.debug("Found {} splits but requested {} picking 1 every {}", numFound, opts.numSplits,
              targetFactor);

          for (int i = 0; i < numFound; i++) {
            double currFactor = (double) (i + 1) / desiredSplits.size();
            String next = iter.next();
            // unsure if this should be currFactor >= targetFactor .. need to think through some edge cases
            if (currFactor > targetFactor && desiredSplits.size() < numSplits) {
              desiredSplits.add(next);
            }
          }
   ```
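
   A tiny standalone check (not from the PR) of the modulus behavior described above:

   ```java
   public class ModuloDemo {
     public static void main(String[] args) {
       int numFound = 150, numSplits = 100;
       long factor = numFound / numSplits; // integer division yields 1
       int taken = 0;
       for (int i = 0; i < numFound; i++) {
         if (i % factor == 0 && taken < numSplits) {
           taken++; // selects i = 0..99 back-to-back, leaving rows 100..149 unrepresented
         }
       }
       System.out.println("took " + taken + " of " + numFound); // took 100 of 150
     }
   }
   ```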







[GitHub] [accumulo] milleruntime commented on a change in pull request #2368: Create GenerateSplits utility

Posted by GitBox <gi...@apache.org>.
milleruntime commented on a change in pull request #2368:
URL: https://github.com/apache/accumulo/pull/2368#discussion_r756283404



##########
File path: core/src/test/java/org/apache/accumulo/core/file/rfile/GenerateSplitsTest.java
##########
@@ -0,0 +1,148 @@
+  @Test
+  public void testNum() throws Exception {
+    List<String> args = List.of(fileName, "--num", "2");
+    log.info("Invoking GenerateSplits with {}", args);
+    PrintStream oldOut = System.out;
+    try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        PrintStream newOut = new PrintStream(baos)) {
+      System.setOut(newOut);

Review comment:
       Copied from https://github.com/apache/accumulo/blob/785a3645261571f000a7adb3c7c72b07886f0587/test/src/main/java/org/apache/accumulo/test/functional/ReadWriteIT.java#L393
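
   For reference, the usual shape of that idiom, with System.out restored in a finally block
   (a generic sketch of the pattern, not the exact ReadWriteIT code):

   ```java
   PrintStream oldOut = System.out;
   try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
       PrintStream newOut = new PrintStream(baos)) {
     System.setOut(newOut);
     main(new String[] {fileName, "--num", "2"}); // run the tool while stdout is redirected
     newOut.flush();
     String output = baos.toString();
     assertTrue(output.contains("r")); // hypothetical assertion on the captured output
   } finally {
     System.setOut(oldOut); // always restore the real stdout
   }
   ```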







[GitHub] [accumulo] milleruntime commented on a change in pull request #2368: Create GenerateSplits utility

Posted by GitBox <gi...@apache.org>.
milleruntime commented on a change in pull request #2368:
URL: https://github.com/apache/accumulo/pull/2368#discussion_r756284424



##########
File path: core/src/test/java/org/apache/accumulo/core/file/rfile/GenerateSplitsTest.java
##########
@@ -0,0 +1,148 @@
+@SuppressFBWarnings(value = "PATH_TRAVERSAL_IN", justification = "paths provided by test")
+public class GenerateSplitsTest {
+  private static final Logger log = LoggerFactory.getLogger(GenerateSplitsTest.class);
+
+  @ClassRule

Review comment:
       I can't think of one for this test since I am only reading the file.







[GitHub] [accumulo] keith-turner commented on a change in pull request #2368: Create GenerateSplits utility

Posted by GitBox <gi...@apache.org>.
keith-turner commented on a change in pull request #2368:
URL: https://github.com/apache/accumulo/pull/2368#discussion_r758775371



##########
File path: core/src/main/java/org/apache/accumulo/core/file/rfile/GenerateSplits.java
##########
@@ -0,0 +1,301 @@
+    int numFound = splits.size();
+    // it's possible we found too many indexed so take every (numFound / numSplits) split
+    if (opts.splitSize == 0 && numFound > numSplits) {
+      var iter = splits.iterator();
+      double targetFactor = (double) numFound / numSplits;
+      log.debug("Found {} splits but requested {} picking 1 every {}", numFound, opts.numSplits,
+          targetFactor);
+
+      for (int i = 0; i < numFound; i++) {
+        double currFactor = (double) (i + 1) / desiredSplits.size();
+        String next = iter.next();
+        // unsure if this should be currFactor >= targetFactor
+        if (currFactor > targetFactor && desiredSplits.size() < numSplits) {
+          desiredSplits.add(next);
+        }
+      }

Review comment:
       The following fixes some of the issues I mentioned. It's modeled after the code in TableOperations.listSplits, without the suspicious while loop.
   
   ```suggestion
          var iter = splits.iterator();
          // This is how much each of the found rows advances us towards a desired split point. Add
          // one to numSplits because if we request 9 splits there will be 10 tablets, and we want
          // the 9 splits evenly spaced between the 10.
          double increment = (numSplits + 1.0) / numFound;
          log.debug("Found {} splits but requested {} picking 1 every {}", numFound, opts.numSplits,
              increment);

          // Tracks how far along we are towards the next split.
          double total = 0;

          for (int i = 0; i < numFound; i++) {
            total += increment;
            String next = iter.next();
            if (total > 1 && desiredSplits.size() < numSplits) {
              desiredSplits.add(next);
              total -= 1;
            }
          }
   ```
   
   I created the following little program locally to play with the code and it seems good.
   
   ```java
    public class Test {
      public static void main(String[] args) {
        printSplits(100, 9);
        System.out.println();
        printSplits(15, 10);
        System.out.println();
        printSplits(10, 10);
      }

      private static void printSplits(int numFound, int numSplits) {
        // This is how much each of the found rows advances us towards a desired split point. Add
        // one to numSplits because if we request 9 splits there will be 10 tablets, and we want
        // the 9 splits evenly spaced between the 10.
        double increment = (numSplits + 1.0) / numFound;

        System.out.println("increment " + increment);

        int added = 0;

        // Tracks how far along we are towards the next split.
        double total = 0;

        for (int i = 0; i < numFound; i++) {
          total += increment;

          if (total > 1 && added < numSplits) {
            System.out.println(i + " " + added + " " + total);
            added++;
            total -= 1;
          }
        }
      }
    }
   ```
   







[GitHub] [accumulo] keith-turner commented on pull request #2368: Create GenerateSplits utility

Posted by GitBox <gi...@apache.org>.
keith-turner commented on pull request #2368:
URL: https://github.com/apache/accumulo/pull/2368#issuecomment-984006504


   > Also, we should be clear in the tool's usage/docs whether we return n-1 splits that correspond to n requested tablets, or n splits that correspond to n requested splits that result in n+1 tablets.
   
   I have always considered a split to be the boundary between two tablets, so 1 split results in two tablets. For tools, I think it comes down to whether the terminology is #tablets or #splits; the docs could state what is going on either way. If the tool has a numTablets parameter, it could state that it will output numTablets-1 splits. If it has a numSplits parameter, it could state that it will output that many splits, and that numSplits+1 tablets are created when adding the splits to Accumulo.
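
   As a concrete illustration (not from the thread): adding the 3 split points g, m, t to a table
   produces 4 tablets, since each split is a boundary between two adjacent tablets:

   ```
   splits:  g, m, t
   tablets: (-inf, g]  (g, m]  (m, t]  (t, +inf)
   ```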





[GitHub] [accumulo] milleruntime commented on a change in pull request #2368: Create GenerateSplits utility

Posted by GitBox <gi...@apache.org>.
milleruntime commented on a change in pull request #2368:
URL: https://github.com/apache/accumulo/pull/2368#discussion_r756407251



##########
File path: core/src/test/java/org/apache/accumulo/core/file/rfile/GenerateSplitsTest.java
##########
@@ -0,0 +1,148 @@
+  @AfterClass
+  public static void cleanUp() {
+    File file = new File(fileName);
+    if (file.delete())
+      log.info("Cleaned up test file {}", fileName);

Review comment:
       Removed in 4d1fdc6







[GitHub] [accumulo] keith-turner commented on a change in pull request #2368: Create GenerateSplits utility

Posted by GitBox <gi...@apache.org>.
keith-turner commented on a change in pull request #2368:
URL: https://github.com/apache/accumulo/pull/2368#discussion_r758496857



##########
File path: core/src/main/java/org/apache/accumulo/core/file/rfile/GenerateSplits.java
##########
@@ -0,0 +1,296 @@
+
+@AutoService(KeywordExecutable.class)
+@SuppressFBWarnings(value = "PATH_TRAVERSAL_OUT",
+    justification = "app is run in same security context as user providing the filename")
+public class GenerateSplits implements KeywordExecutable {
+  private static final Logger log = LoggerFactory.getLogger(GenerateSplits.class);
+
+  static class Opts extends ConfigOpts {
+    @Parameter(names = {"-a", "--auths"}, converter = ClientOpts.AuthConverter.class,
+        description = "the authorizations to use when reading the files")
+    public Authorizations auths = Authorizations.EMPTY;
+
+    @Parameter(names = {"-n", "--num"}, description = "The number of splits to generate")

Review comment:
       ```suggestion
       @Parameter(names = {"-n", "--num"}, description = "The number of splits to generate. Cannot use with the split size option.")
   ```

##########
File path: core/src/main/java/org/apache/accumulo/core/file/rfile/GenerateSplits.java
##########
@@ -0,0 +1,296 @@
+  static class Opts extends ConfigOpts {
+    @Parameter(names = {"-a", "--auths"}, converter = ClientOpts.AuthConverter.class,
+        description = "the authorizations to use when reading the files")
+    public Authorizations auths = Authorizations.EMPTY;
+
+    @Parameter(names = {"-n", "--num"}, description = "The number of splits to generate")
+    public long numSplits = 0;
+
+    @Parameter(names = {"-ss", "--split-size"}, description = "The split size desired in bytes")

Review comment:
       ```suggestion
       @Parameter(names = {"-ss", "--split-size"}, description = "The split size desired in bytes. Cannot use with num splits option.")
   ```

##########
File path: core/src/main/java/org/apache/accumulo/core/file/rfile/GenerateSplits.java
##########
@@ -0,0 +1,296 @@
+@AutoService(KeywordExecutable.class)
+@SuppressFBWarnings(value = "PATH_TRAVERSAL_OUT",
+    justification = "app is run in same security context as user providing the filename")
+public class GenerateSplits implements KeywordExecutable {
+  private static final Logger log = LoggerFactory.getLogger(GenerateSplits.class);
+
+  static class Opts extends ConfigOpts {
+    @Parameter(names = {"-a", "--auths"}, converter = ClientOpts.AuthConverter.class,
+        description = "the authorizations to use when reading the files")
+    public Authorizations auths = Authorizations.EMPTY;
+
+    @Parameter(names = {"-n", "--num"}, description = "The number of splits to generate")
+    public long numSplits = 0;
+
+    @Parameter(names = {"-ss", "--split-size"}, description = "The split size desired in bytes")
+    public long splitSize = 0;
+
+    @Parameter(names = {"-b64", "--base64encoded"}, description = "Base 64 encode the split points")
+    public boolean base64encode = false;
+
+    @Parameter(names = {"-sf", "--splits-file"}, description = "Output the splits to a file")
+    public String outputFile;
+
+    @Parameter(description = " <file|directory> { <file> ... }")
+    public List<String> files = new ArrayList<>();
+  }
+
+  @Override
+  public String keyword() {
+    return "generate-splits";
+  }
+
+  @Override
+  public String description() {
+    return "Generate split points from a set of 1 or more rfiles";
+  }
+
+  public static void main(String[] args) throws Exception {
+    new GenerateSplits().execute(args);
+  }
+
+  @Override
+  public void execute(String[] args) throws Exception {
+    Opts opts = new Opts();
+    opts.parseArgs(GenerateSplits.class.getName(), args);
+    if (opts.files.isEmpty()) {
+      throw new IllegalArgumentException("No files were given");
+    }
+
+    Configuration hadoopConf = new Configuration();
+    SiteConfiguration siteConf = opts.getSiteConfiguration();
+    boolean encode = opts.base64encode;
+
+    TreeSet<String> splits;
+    TreeSet<String> desiredSplits = new TreeSet<>();
+
+    if (opts.numSplits > 0 && opts.splitSize > 0) {
+      throw new IllegalArgumentException("Requested number of splits and split size.");
+    }
+    if (opts.numSplits == 0 && opts.splitSize == 0) {
+      throw new IllegalArgumentException("Required number of splits or split size.");
+    }
+    long numSplits = opts.numSplits;
+    long splitSize = opts.splitSize;
+
+    FileSystem fs = FileSystem.get(hadoopConf);
+    FileSystem localFs = FileSystem.getLocal(hadoopConf);
+    List<Path> filePaths = new ArrayList<>();
+    for (String file : opts.files) {
+      Path path = new Path(file);
+      if (file.contains(":")) {
+        fs = path.getFileSystem(hadoopConf);
+      } else {
+        log.warn("Attempting to find file across filesystems. Consider providing URI "
+            + "instead of path");
+        if (!fs.exists(path))
+          fs = localFs; // fall back to local
+      }
+      // get all the files in the directory
+      if (fs.getFileStatus(path).isDirectory()) {
+        // can only explode one directory
+        if (opts.files.size() > 1)
+          throw new IllegalArgumentException("Only one directory can be specified");
+        var iter = fs.listFiles(path, true);
+        while (iter.hasNext()) {
+          filePaths.add(iter.next().getPath());
+        }
+      } else {
+        filePaths.add(path);
+      }
+    }
+
+    // if no size specified look at indexed keys first
+    if (opts.splitSize == 0) {
+      splits = getIndexKeys(siteConf, hadoopConf, fs, filePaths, encode);
+      // if there weren't enough splits indexed, try again with size = 0
+      if (splits.size() < numSplits) {
+        log.info("Only found {} indexed keys but need {}. Doing a full scan on files {}",
+            splits.size(), numSplits, filePaths);
+        splits = getSplitsBySize(siteConf, hadoopConf, filePaths, fs, 0, encode);
+      }
+    } else {
+      splits = getSplitsBySize(siteConf, hadoopConf, filePaths, fs, splitSize, encode);
+    }
+
+    int numFound = splits.size();
+    // its possible we found too many indexed so take every (numFound / numSplits) split
+    if (opts.splitSize == 0 && numFound > numSplits) {
+      var iter = splits.iterator();
+      long factor = numFound / numSplits;
+      log.debug("Found {} splits but requested {} picking 1 every {}", numFound, opts.numSplits,
+          factor);
+      for (int i = 0; i < numFound; i++) {
+        String next = iter.next();
+        if (i % factor == 0 && desiredSplits.size() < numSplits) {
+          desiredSplits.add(next);
+        }
+      }

Review comment:
       For a case where numFound is 150 and numSplits is 100, 150/100 is 1 in integer division, so factor is 1. `i % 1` is always zero, so I think the first 100 splits out of the 150 will be taken. This is probably not desired. I think an approach similar to the following will yield better results. Hopefully the following code would take splits `0, 1, 3, 4, 6, 7, 9, 10, etc.` but I am not sure it's correct... however, that is the intent.
   
   ```suggestion
         double targetFactor = (double) numFound / numSplits;
         log.debug("Found {} splits but requested {} picking 1 every {}", numFound, opts.numSplits,
             targetFactor);
   
         for (int i = 0; i < numFound; i++) {
           double currFactor = (double) (i + 1) / desiredSplits.size();
           String next = iter.next();
           // unsure if this should be currFactor >= targetFactor .. need to think through some edge cases
           if (currFactor > targetFactor && desiredSplits.size() < numSplits) {
             desiredSplits.add(next);
           }
         }
   ```







[GitHub] [accumulo] milleruntime commented on a change in pull request #2368: Create GenerateSplits utility

Posted by GitBox <gi...@apache.org>.
milleruntime commented on a change in pull request #2368:
URL: https://github.com/apache/accumulo/pull/2368#discussion_r756280283



##########
File path: core/src/test/java/org/apache/accumulo/core/file/rfile/GenerateSplitsTest.java
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.file.rfile;
+
+import static org.apache.accumulo.core.file.rfile.GenerateSplits.main;
+import static org.apache.accumulo.core.file.rfile.RFileTest.newColFamByteSequence;
+import static org.apache.accumulo.core.file.rfile.RFileTest.newKey;
+import static org.apache.accumulo.core.file.rfile.RFileTest.newValue;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.List;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
+@SuppressFBWarnings(value = "PATH_TRAVERSAL_IN", justification = "paths provided by test")
+public class GenerateSplitsTest {
+  private static final Logger log = LoggerFactory.getLogger(GenerateSplitsTest.class);
+
+  @ClassRule
+  public static final TemporaryFolder tempFolder =
+      new TemporaryFolder(new File(System.getProperty("user.dir") + "/target"));
+
+  private static final RFileTest.TestRFile trf = new RFileTest.TestRFile(null);
+  private static String fileName;
+
+  /**
+   * Creates a test file with 84 bytes of data and 2 Locality groups.
+   */
+  @BeforeClass
+  public static void createFile() throws IOException {
+    trf.openWriter(false);
+    trf.writer.startNewLocalityGroup("lg1", newColFamByteSequence("cf1", "cf2"));
+    trf.writer.append(newKey("r1", "cf1", "cq1", "L1", 55), newValue("foo1"));
+    trf.writer.append(newKey("r2", "cf2", "cq1", "L1", 55), newValue("foo2"));
+    trf.writer.append(newKey("r3", "cf2", "cq1", "L1", 55), newValue("foo3"));
+    trf.writer.startNewLocalityGroup("lg2", newColFamByteSequence("cf3", "cf4"));
+    trf.writer.append(newKey("r4", "cf3", "cq1", "L1", 55), newValue("foo4"));
+    trf.writer.append(newKey("r5", "cf4", "cq1", "L1", 55), newValue("foo5"));
+    trf.writer.append(newKey("r6", "cf4", "cq1", "L1", 55), newValue("foo6"));
+    trf.closeWriter();
+
+    File file = tempFolder.newFile("testGenerateSplits.rf");
+    FileOutputStream fileOutputStream = new FileOutputStream(file);
+    fileOutputStream.write(trf.baos.toByteArray());
+    fileOutputStream.flush();
+    fileOutputStream.close();
+
+    fileName = file.getAbsolutePath();
+    log.info("Wrote to file {}", fileName);
+  }
+
+  @AfterClass
+  public static void cleanUp() {
+    File file = new File(fileName);
+    if (file.delete())
+      log.info("Cleaned up test file {}", fileName);

Review comment:
       I had added this when I created the test initially because I was unsure, but I think you are right. I thought I saw another test doing this as well, but I can't find it now.
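   
   For reference, a minimal JUnit 4 sketch (not from the PR) of why the manual delete is redundant: a `TemporaryFolder` registered as a `@ClassRule` recursively deletes its folder, and everything created inside it, after the last test in the class runs.
   
   ```java
   import static org.junit.Assert.assertTrue;
   
   import java.io.File;
   
   import org.junit.ClassRule;
   import org.junit.Test;
   import org.junit.rules.TemporaryFolder;
   
   public class TempFolderCleanupTest {
     // The rule tears the folder down automatically, so no @AfterClass delete is needed
     @ClassRule
     public static final TemporaryFolder tempFolder = new TemporaryFolder();
   
     @Test
     public void fileExistsOnlyDuringTest() throws Exception {
       File f = tempFolder.newFile("scratch.rf");
       assertTrue(f.exists()); // removed when the rule tears down after the class
     }
   }
   ```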







[GitHub] [accumulo] keith-turner commented on a change in pull request #2368: Create GenerateSplits utility

Posted by GitBox <gi...@apache.org>.
keith-turner commented on a change in pull request #2368:
URL: https://github.com/apache/accumulo/pull/2368#discussion_r765924253



##########
File path: core/src/main/java/org/apache/accumulo/core/file/rfile/GenerateSplits.java
##########
@@ -201,7 +201,9 @@ public void execute(String[] args) throws Exception {
       itemsSketch.update(row);
       iterator.next();
     }
-    return itemsSketch.getQuantiles(numSplits);
+    Text[] items = itemsSketch.getQuantiles(numSplits + 2);

Review comment:
       > I think checking the different outputs of the Datasketches API is a bit out of scope for this utility.
   
   I was not proposing checking different outputs of the API at this point. I was more thinking it would be good to fully understand the functionality behind the API and what the edge cases may be. The `.getRetainedItems()` API seems to indicate that there may be an upper bound to how many splits the API would track, but I am not sure about this. The class seems to buffer information in memory, but I don't have a good sense of what limits, if any, there are on how much it will buffer.
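   
   For reference, a minimal probe sketch (assuming the datasketches-java quantiles `ItemsSketch`; the `getInstance(Comparator)` factory signature here is an assumption) showing `getN()` growing with every update while `getRetainedItems()` stays bounded, which is the buffering behavior in question:
   
   ```java
   import java.util.Comparator;
   
   import org.apache.datasketches.quantiles.ItemsSketch;
   
   public class SketchProbe {
     public static void main(String[] args) {
       ItemsSketch<String> sketch = ItemsSketch.getInstance(Comparator.naturalOrder());
       for (int i = 0; i < 1_000_000; i++) {
         sketch.update(String.format("row_%07d", i));
       }
       // The sketch keeps a bounded sample, so retained items grow far slower than updates
       System.out.println("updates seen:   " + sketch.getN());
       System.out.println("items retained: " + sketch.getRetainedItems());
     }
   }
   ```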







[GitHub] [accumulo] ctubbsii commented on pull request #2368: Create GenerateSplits utility

Posted by GitBox <gi...@apache.org>.
ctubbsii commented on pull request #2368:
URL: https://github.com/apache/accumulo/pull/2368#issuecomment-983999498


   Also, we should be clear in the tool's usage/docs whether we return `n-1` splits that correspond to `n` requested tablets, or `n` splits that correspond to `n` requested splits and result in `n+1` tablets.
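   
   A toy illustration (not PR code) of the arithmetic at stake: `k` split points always partition the row space into `k + 1` tablets, so the usage docs need to say which of the two numbers `-n` refers to.
   
   ```java
   import java.util.List;
   import java.util.TreeSet;
   
   public class SplitCountDemo {
     public static void main(String[] args) {
       TreeSet<String> splitPoints = new TreeSet<>(List.of("r2", "r4", "r6"));
       // 3 split points yield 4 tablets: (-inf, r2] (r2, r4] (r4, r6] (r6, +inf)
       System.out.println(splitPoints.size() + " splits -> " + (splitPoints.size() + 1) + " tablets");
     }
   }
   ```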





[GitHub] [accumulo] keith-turner commented on a change in pull request #2368: Create GenerateSplits utility

Posted by GitBox <gi...@apache.org>.
keith-turner commented on a change in pull request #2368:
URL: https://github.com/apache/accumulo/pull/2368#discussion_r758758742



##########
File path: core/src/main/java/org/apache/accumulo/core/file/rfile/GenerateSplits.java
##########
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.file.rfile;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.io.BufferedWriter;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.cli.ClientOpts;
+import org.apache.accumulo.core.cli.ConfigOpts;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.SiteConfiguration;
+import org.apache.accumulo.core.crypto.CryptoServiceFactory;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.FileOperations;
+import org.apache.accumulo.core.file.FileSKVIterator;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iteratorsImpl.system.MultiIterator;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.TextUtil;
+import org.apache.accumulo.start.spi.KeywordExecutable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.Parameter;
+import com.google.auto.service.AutoService;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
+@AutoService(KeywordExecutable.class)
+@SuppressFBWarnings(value = "PATH_TRAVERSAL_OUT",
+    justification = "app is run in same security context as user providing the filename")
+public class GenerateSplits implements KeywordExecutable {
+  private static final Logger log = LoggerFactory.getLogger(GenerateSplits.class);
+
+  static class Opts extends ConfigOpts {
+    @Parameter(names = {"-a", "--auths"}, converter = ClientOpts.AuthConverter.class,
+        description = "the authorizations to use when reading the files")
+    public Authorizations auths = Authorizations.EMPTY;
+
+    @Parameter(names = {"-n", "--num"},
+        description = "The number of splits to generate. Cannot use with the split size option.")
+    public long numSplits = 0;
+
+    @Parameter(names = {"-ss", "--split-size"},
+        description = "The split size desired in bytes. Cannot use with num splits option.")
+    public long splitSize = 0;
+
+    @Parameter(names = {"-b64", "--base64encoded"}, description = "Base 64 encode the split points")
+    public boolean base64encode = false;
+
+    @Parameter(names = {"-sf", "--splits-file"}, description = "Output the splits to a file")
+    public String outputFile;
+
+    @Parameter(description = " <file|directory> { <file> ... }")
+    public List<String> files = new ArrayList<>();
+  }
+
+  @Override
+  public String keyword() {
+    return "generate-splits";
+  }
+
+  @Override
+  public String description() {
+    return "Generate split points from a set of 1 or more rfiles";
+  }
+
+  public static void main(String[] args) throws Exception {
+    new GenerateSplits().execute(args);
+  }
+
+  @Override
+  public void execute(String[] args) throws Exception {
+    Opts opts = new Opts();
+    opts.parseArgs(GenerateSplits.class.getName(), args);
+    if (opts.files.isEmpty()) {
+      throw new IllegalArgumentException("No files were given");
+    }
+
+    Configuration hadoopConf = new Configuration();
+    SiteConfiguration siteConf = opts.getSiteConfiguration();
+    boolean encode = opts.base64encode;
+
+    TreeSet<String> splits;
+    TreeSet<String> desiredSplits = new TreeSet<>();
+
+    if (opts.numSplits > 0 && opts.splitSize > 0) {
+      throw new IllegalArgumentException("Requested number of splits and split size.");
+    }
+    if (opts.numSplits == 0 && opts.splitSize == 0) {
+      throw new IllegalArgumentException("Required number of splits or split size.");
+    }
+    long numSplits = opts.numSplits;
+    long splitSize = opts.splitSize;
+
+    FileSystem fs = FileSystem.get(hadoopConf);
+    FileSystem localFs = FileSystem.getLocal(hadoopConf);
+    List<Path> filePaths = new ArrayList<>();
+    for (String file : opts.files) {
+      Path path = new Path(file);
+      if (file.contains(":")) {
+        fs = path.getFileSystem(hadoopConf);
+      } else {
+        log.warn("Attempting to find file across filesystems. Consider providing URI "
+            + "instead of path");
+        if (!fs.exists(path))
+          fs = localFs; // fall back to local
+      }
+      // get all the files in the directory
+      if (fs.getFileStatus(path).isDirectory()) {
+        // can only explode one directory
+        if (opts.files.size() > 1)
+          throw new IllegalArgumentException("Only one directory can be specified");
+        var iter = fs.listFiles(path, true);
+        while (iter.hasNext()) {
+          filePaths.add(iter.next().getPath());
+        }
+      } else {
+        filePaths.add(path);
+      }
+    }
+
+    // if no size specified look at indexed keys first
+    if (opts.splitSize == 0) {
+      splits = getIndexKeys(siteConf, hadoopConf, fs, filePaths, encode);
+      // if there weren't enough splits indexed, try again with size = 0
+      if (splits.size() < numSplits) {
+        log.info("Only found {} indexed keys but need {}. Doing a full scan on files {}",
+            splits.size(), numSplits, filePaths);
+        splits = getSplitsBySize(siteConf, hadoopConf, filePaths, fs, 0, encode);
+      }
+    } else {
+      splits = getSplitsBySize(siteConf, hadoopConf, filePaths, fs, splitSize, encode);
+    }
+
+    int numFound = splits.size();
+    // its possible we found too many indexed so take every (numFound / numSplits) split
+    if (opts.splitSize == 0 && numFound > numSplits) {
+      var iter = splits.iterator();
+      double targetFactor = (double) numFound / numSplits;
+      log.debug("Found {} splits but requested {} picking 1 every {}", numFound, opts.numSplits,
+          targetFactor);
+
+      for (int i = 0; i < numFound; i++) {
+        double currFactor = (double) (i + 1) / desiredSplits.size();
+        String next = iter.next();
+        // unsure if this should be currFactor >= targetFactor
+        if (currFactor > targetFactor && desiredSplits.size() < numSplits) {

Review comment:
       This code I suggested is not correct; I think it always adds the first split even though that is not always desired. For example, if I have 1000 splits and I want 10 split points from those, we would not want to take the first one that comes along. Also, I think targetFactor is a bit off. I am playing around with the code locally.







[GitHub] [accumulo] keith-turner commented on a change in pull request #2368: Create GenerateSplits utility

Posted by GitBox <gi...@apache.org>.
keith-turner commented on a change in pull request #2368:
URL: https://github.com/apache/accumulo/pull/2368#discussion_r758567995



##########
File path: core/src/main/java/org/apache/accumulo/core/file/rfile/GenerateSplits.java
##########
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.file.rfile;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.io.BufferedWriter;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.cli.ClientOpts;
+import org.apache.accumulo.core.cli.ConfigOpts;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.SiteConfiguration;
+import org.apache.accumulo.core.crypto.CryptoServiceFactory;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.FileOperations;
+import org.apache.accumulo.core.file.FileSKVIterator;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iteratorsImpl.system.MultiIterator;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.TextUtil;
+import org.apache.accumulo.start.spi.KeywordExecutable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.Parameter;
+import com.google.auto.service.AutoService;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
+@AutoService(KeywordExecutable.class)
+@SuppressFBWarnings(value = "PATH_TRAVERSAL_OUT",
+    justification = "app is run in same security context as user providing the filename")
+public class GenerateSplits implements KeywordExecutable {
+  private static final Logger log = LoggerFactory.getLogger(GenerateSplits.class);
+
+  static class Opts extends ConfigOpts {
+    @Parameter(names = {"-a", "--auths"}, converter = ClientOpts.AuthConverter.class,
+        description = "the authorizations to use when reading the files")
+    public Authorizations auths = Authorizations.EMPTY;
+
+    @Parameter(names = {"-n", "--num"}, description = "The number of splits to generate")
+    public long numSplits = 0;
+
+    @Parameter(names = {"-ss", "--split-size"}, description = "The split size desired in bytes")
+    public long splitSize = 0;
+
+    @Parameter(names = {"-b64", "--base64encoded"}, description = "Base 64 encode the split points")
+    public boolean base64encode = false;
+
+    @Parameter(names = {"-sf", "--splits-file"}, description = "Output the splits to a file")
+    public String outputFile;
+
+    @Parameter(description = " <file|directory> { <file> ... }")
+    public List<String> files = new ArrayList<>();
+  }
+
+  @Override
+  public String keyword() {
+    return "generate-splits";
+  }
+
+  @Override
+  public String description() {
+    return "Generate split points from a set of 1 or more rfiles";
+  }
+
+  public static void main(String[] args) throws Exception {
+    new GenerateSplits().execute(args);
+  }
+
+  @Override
+  public void execute(String[] args) throws Exception {
+    Opts opts = new Opts();
+    opts.parseArgs(GenerateSplits.class.getName(), args);
+    if (opts.files.isEmpty()) {
+      throw new IllegalArgumentException("No files were given");
+    }
+
+    Configuration hadoopConf = new Configuration();
+    SiteConfiguration siteConf = opts.getSiteConfiguration();
+    boolean encode = opts.base64encode;
+
+    TreeSet<String> splits;
+    TreeSet<String> desiredSplits = new TreeSet<>();
+
+    if (opts.numSplits > 0 && opts.splitSize > 0) {
+      throw new IllegalArgumentException("Requested number of splits and split size.");
+    }
+    if (opts.numSplits == 0 && opts.splitSize == 0) {
+      throw new IllegalArgumentException("Required number of splits or split size.");
+    }
+    long numSplits = opts.numSplits;
+    long splitSize = opts.splitSize;
+
+    FileSystem fs = FileSystem.get(hadoopConf);
+    FileSystem localFs = FileSystem.getLocal(hadoopConf);
+    List<Path> filePaths = new ArrayList<>();
+    for (String file : opts.files) {
+      Path path = new Path(file);
+      if (file.contains(":")) {
+        fs = path.getFileSystem(hadoopConf);
+      } else {
+        log.warn("Attempting to find file across filesystems. Consider providing URI "
+            + "instead of path");
+        if (!fs.exists(path))
+          fs = localFs; // fall back to local
+      }
+      // get all the files in the directory
+      if (fs.getFileStatus(path).isDirectory()) {
+        // can only explode one directory
+        if (opts.files.size() > 1)
+          throw new IllegalArgumentException("Only one directory can be specified");
+        var iter = fs.listFiles(path, true);
+        while (iter.hasNext()) {
+          filePaths.add(iter.next().getPath());
+        }
+      } else {
+        filePaths.add(path);
+      }
+    }
+
+    // if no size specified look at indexed keys first
+    if (opts.splitSize == 0) {
+      splits = getIndexKeys(siteConf, hadoopConf, fs, filePaths, encode);
+      // if there weren't enough splits indexed, try again with size = 0
+      if (splits.size() < numSplits) {
+        log.info("Only found {} indexed keys but need {}. Doing a full scan on files {}",
+            splits.size(), numSplits, filePaths);
+        splits = getSplitsBySize(siteConf, hadoopConf, filePaths, fs, 0, encode);
+      }
+    } else {
+      splits = getSplitsBySize(siteConf, hadoopConf, filePaths, fs, splitSize, encode);
+    }
+
+    int numFound = splits.size();
+    // its possible we found too many indexed so take every (numFound / numSplits) split
+    if (opts.splitSize == 0 && numFound > numSplits) {
+      var iter = splits.iterator();
+      long factor = numFound / numSplits;
+      log.debug("Found {} splits but requested {} picking 1 every {}", numFound, opts.numSplits,
+          factor);
+      for (int i = 0; i < numFound; i++) {
+        String next = iter.next();
+        if (i % factor == 0 && desiredSplits.size() < numSplits) {
+          desiredSplits.add(next);
+        }
+      }

Review comment:
       For a case where numFound is 150 and numSplits is 100, 150/100 is 1 in integer division, so factor is 1. `i % 1` is always zero, so I think the first 100 splits out of the 150 will be taken. This is probably not desired. I think an approach similar to the following will yield better results. Hopefully the following code would take splits `0, 1, 3, 4, 6, 7, 9, 10, etc.` but I am not sure the code is correct... however, that is the intent.
   
   ```suggestion
         double targetFactor = (double) numFound / numSplits;
         log.debug("Found {} splits but requested {} picking 1 every {}", numFound, opts.numSplits,
             targetFactor);
   
         for (int i = 0; i < numFound; i++) {
           double currFactor = (double) (i + 1) / desiredSplits.size();
           String next = iter.next();
           // unsure if this should be currFactor >= targetFactor .. need to think through some edge cases
           if (currFactor > targetFactor && desiredSplits.size() < numSplits) {
             desiredSplits.add(next);
           }
         }
   ```







[GitHub] [accumulo] keith-turner commented on a change in pull request #2368: Create GenerateSplits utility

Posted by GitBox <gi...@apache.org>.
keith-turner commented on a change in pull request #2368:
URL: https://github.com/apache/accumulo/pull/2368#discussion_r758581278



##########
File path: core/src/main/java/org/apache/accumulo/core/file/rfile/GenerateSplits.java
##########
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.file.rfile;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.io.BufferedWriter;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.cli.ClientOpts;
+import org.apache.accumulo.core.cli.ConfigOpts;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.SiteConfiguration;
+import org.apache.accumulo.core.crypto.CryptoServiceFactory;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.FileOperations;
+import org.apache.accumulo.core.file.FileSKVIterator;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iteratorsImpl.system.MultiIterator;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.TextUtil;
+import org.apache.accumulo.start.spi.KeywordExecutable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.Parameter;
+import com.google.auto.service.AutoService;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
+@AutoService(KeywordExecutable.class)
+@SuppressFBWarnings(value = "PATH_TRAVERSAL_OUT",
+    justification = "app is run in same security context as user providing the filename")
+public class GenerateSplits implements KeywordExecutable {
+  private static final Logger log = LoggerFactory.getLogger(GenerateSplits.class);
+
+  static class Opts extends ConfigOpts {
+    @Parameter(names = {"-a", "--auths"}, converter = ClientOpts.AuthConverter.class,
+        description = "the authorizations to use when reading the files")
+    public Authorizations auths = Authorizations.EMPTY;
+
+    @Parameter(names = {"-n", "--num"}, description = "The number of splits to generate")
+    public long numSplits = 0;
+
+    @Parameter(names = {"-ss", "--split-size"}, description = "The split size desired in bytes")
+    public long splitSize = 0;
+
+    @Parameter(names = {"-b64", "--base64encoded"}, description = "Base 64 encode the split points")
+    public boolean base64encode = false;
+
+    @Parameter(names = {"-sf", "--splits-file"}, description = "Output the splits to a file")
+    public String outputFile;
+
+    @Parameter(description = " <file|directory> { <file> ... }")
+    public List<String> files = new ArrayList<>();
+  }
+
+  @Override
+  public String keyword() {
+    return "generate-splits";
+  }
+
+  @Override
+  public String description() {
+    return "Generate split points from a set of 1 or more rfiles";
+  }
+
+  public static void main(String[] args) throws Exception {
+    new GenerateSplits().execute(args);
+  }
+
+  @Override
+  public void execute(String[] args) throws Exception {
+    Opts opts = new Opts();
+    opts.parseArgs(GenerateSplits.class.getName(), args);
+    if (opts.files.isEmpty()) {
+      throw new IllegalArgumentException("No files were given");
+    }
+
+    Configuration hadoopConf = new Configuration();
+    SiteConfiguration siteConf = opts.getSiteConfiguration();
+    boolean encode = opts.base64encode;
+
+    TreeSet<String> splits;
+    TreeSet<String> desiredSplits = new TreeSet<>();
+
+    if (opts.numSplits > 0 && opts.splitSize > 0) {
+      throw new IllegalArgumentException("Requested number of splits and split size.");
+    }
+    if (opts.numSplits == 0 && opts.splitSize == 0) {
+      throw new IllegalArgumentException("Required number of splits or split size.");
+    }
+    long numSplits = opts.numSplits;
+    long splitSize = opts.splitSize;
+
+    FileSystem fs = FileSystem.get(hadoopConf);
+    FileSystem localFs = FileSystem.getLocal(hadoopConf);
+    List<Path> filePaths = new ArrayList<>();
+    for (String file : opts.files) {
+      Path path = new Path(file);
+      if (file.contains(":")) {
+        fs = path.getFileSystem(hadoopConf);
+      } else {
+        log.warn("Attempting to find file across filesystems. Consider providing URI "
+            + "instead of path");
+        if (!fs.exists(path))
+          fs = localFs; // fall back to local
+      }
+      // get all the files in the directory
+      if (fs.getFileStatus(path).isDirectory()) {
+        // can only explode one directory
+        if (opts.files.size() > 1)
+          throw new IllegalArgumentException("Only one directory can be specified");
+        var iter = fs.listFiles(path, true);
+        while (iter.hasNext()) {
+          filePaths.add(iter.next().getPath());
+        }
+      } else {
+        filePaths.add(path);
+      }
+    }
+
+    // if no size specified look at indexed keys first
+    if (opts.splitSize == 0) {
+      splits = getIndexKeys(siteConf, hadoopConf, fs, filePaths, encode);
+      // if there weren't enough splits indexed, try again with size = 0
+      if (splits.size() < numSplits) {
+        log.info("Only found {} indexed keys but need {}. Doing a full scan on files {}",
+            splits.size(), numSplits, filePaths);
+        splits = getSplitsBySize(siteConf, hadoopConf, filePaths, fs, 0, encode);
+      }
+    } else {
+      splits = getSplitsBySize(siteConf, hadoopConf, filePaths, fs, splitSize, encode);
+    }
+
+    int numFound = splits.size();
+    // its possible we found too many indexed so take every (numFound / numSplits) split
+    if (opts.splitSize == 0 && numFound > numSplits) {
+      var iter = splits.iterator();
+      long factor = numFound / numSplits;
+      log.debug("Found {} splits but requested {} picking 1 every {}", numFound, opts.numSplits,
+          factor);
+      for (int i = 0; i < numFound; i++) {
+        String next = iter.next();
+        if (i % factor == 0 && desiredSplits.size() < numSplits) {
+          desiredSplits.add(next);
+        }
+      }

Review comment:
       I think the code I just linked has a bug; I will open an issue.







[GitHub] [accumulo] keith-turner commented on a change in pull request #2368: Create GenerateSplits utility

Posted by GitBox <gi...@apache.org>.
keith-turner commented on a change in pull request #2368:
URL: https://github.com/apache/accumulo/pull/2368#discussion_r758775371



##########
File path: core/src/main/java/org/apache/accumulo/core/file/rfile/GenerateSplits.java
##########
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.file.rfile;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.io.BufferedWriter;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.cli.ClientOpts;
+import org.apache.accumulo.core.cli.ConfigOpts;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.SiteConfiguration;
+import org.apache.accumulo.core.crypto.CryptoServiceFactory;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.FileOperations;
+import org.apache.accumulo.core.file.FileSKVIterator;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iteratorsImpl.system.MultiIterator;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.TextUtil;
+import org.apache.accumulo.start.spi.KeywordExecutable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.Parameter;
+import com.google.auto.service.AutoService;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
+@AutoService(KeywordExecutable.class)
+@SuppressFBWarnings(value = "PATH_TRAVERSAL_OUT",
+    justification = "app is run in same security context as user providing the filename")
+public class GenerateSplits implements KeywordExecutable {
+  private static final Logger log = LoggerFactory.getLogger(GenerateSplits.class);
+
+  static class Opts extends ConfigOpts {
+    @Parameter(names = {"-a", "--auths"}, converter = ClientOpts.AuthConverter.class,
+        description = "the authorizations to use when reading the files")
+    public Authorizations auths = Authorizations.EMPTY;
+
+    @Parameter(names = {"-n", "--num"},
+        description = "The number of splits to generate. Cannot use with the split size option.")
+    public long numSplits = 0;
+
+    @Parameter(names = {"-ss", "--split-size"},
+        description = "The split size desired in bytes. Cannot use with num splits option.")
+    public long splitSize = 0;
+
+    @Parameter(names = {"-b64", "--base64encoded"}, description = "Base 64 encode the split points")
+    public boolean base64encode = false;
+
+    @Parameter(names = {"-sf", "--splits-file"}, description = "Output the splits to a file")
+    public String outputFile;
+
+    @Parameter(description = " <file|directory> { <file> ... }")
+    public List<String> files = new ArrayList<>();
+  }
+
+  @Override
+  public String keyword() {
+    return "generate-splits";
+  }
+
+  @Override
+  public String description() {
+    return "Generate split points from a set of 1 or more rfiles";
+  }
+
+  public static void main(String[] args) throws Exception {
+    new GenerateSplits().execute(args);
+  }
+
+  @Override
+  public void execute(String[] args) throws Exception {
+    Opts opts = new Opts();
+    opts.parseArgs(GenerateSplits.class.getName(), args);
+    if (opts.files.isEmpty()) {
+      throw new IllegalArgumentException("No files were given");
+    }
+
+    Configuration hadoopConf = new Configuration();
+    SiteConfiguration siteConf = opts.getSiteConfiguration();
+    boolean encode = opts.base64encode;
+
+    TreeSet<String> splits;
+    TreeSet<String> desiredSplits = new TreeSet<>();
+
+    if (opts.numSplits > 0 && opts.splitSize > 0) {
+      throw new IllegalArgumentException("Requested number of splits and split size.");
+    }
+    if (opts.numSplits == 0 && opts.splitSize == 0) {
+      throw new IllegalArgumentException("Required number of splits or split size.");
+    }
+    long numSplits = opts.numSplits;
+    long splitSize = opts.splitSize;
+
+    FileSystem fs = FileSystem.get(hadoopConf);
+    FileSystem localFs = FileSystem.getLocal(hadoopConf);
+    List<Path> filePaths = new ArrayList<>();
+    for (String file : opts.files) {
+      Path path = new Path(file);
+      if (file.contains(":")) {
+        fs = path.getFileSystem(hadoopConf);
+      } else {
+        log.warn("Attempting to find file across filesystems. Consider providing URI "
+            + "instead of path");
+        if (!fs.exists(path))
+          fs = localFs; // fall back to local
+      }
+      // get all the files in the directory
+      if (fs.getFileStatus(path).isDirectory()) {
+        // can only explode one directory
+        if (opts.files.size() > 1)
+          throw new IllegalArgumentException("Only one directory can be specified");
+        var iter = fs.listFiles(path, true);
+        while (iter.hasNext()) {
+          filePaths.add(iter.next().getPath());
+        }
+      } else {
+        filePaths.add(path);
+      }
+    }
+
+    // if no size specified look at indexed keys first
+    if (opts.splitSize == 0) {
+      splits = getIndexKeys(siteConf, hadoopConf, fs, filePaths, encode);
+      // if there weren't enough splits indexed, try again with size = 0
+      if (splits.size() < numSplits) {
+        log.info("Only found {} indexed keys but need {}. Doing a full scan on files {}",
+            splits.size(), numSplits, filePaths);
+        splits = getSplitsBySize(siteConf, hadoopConf, filePaths, fs, 0, encode);
+      }
+    } else {
+      splits = getSplitsBySize(siteConf, hadoopConf, filePaths, fs, splitSize, encode);
+    }
+
+    int numFound = splits.size();
+    // its possible we found too many indexed so take every (numFound / numSplits) split
+    if (opts.splitSize == 0 && numFound > numSplits) {
+      var iter = splits.iterator();
+      double targetFactor = (double) numFound / numSplits;
+      log.debug("Found {} splits but requested {} picking 1 every {}", numFound, opts.numSplits,
+          targetFactor);
+
+      for (int i = 0; i < numFound; i++) {
+        double currFactor = (double) (i + 1) / desiredSplits.size();
+        String next = iter.next();
+        // unsure if this should be currFactor >= targetFactor
+        if (currFactor > targetFactor && desiredSplits.size() < numSplits) {
+          desiredSplits.add(next);
+        }
+      }

Review comment:
       The following fixes some of the issues I mentioned. It's loosely modeled after the code in TabletOperations.listSplits without the suspicious while loop.
   
   ```suggestion
         var iter = splits.iterator();
         // This is how much each of the found rows advances towards a desired split point. Add one
         // to numSplits because if we request 9 splits, there will be 10 tablets and we want the 9
         // splits evenly spaced between the 10 tablets.
         double increment = (numSplits + 1.0) / numFound;
         log.debug("Found {} splits but requested {} picking 1 every {}", numFound, opts.numSplits,
             increment);
   
         // Tracks how far along we are towards the next split.
         double total = 0;
   
         for (int i = 0; i < numFound; i++) {
           total += increment;
           String next = iter.next();
           if (total > 1 && desiredSplits.size() < numSplits) {
             desiredSplits.add(next);
             total -= 1;
           }
         }
   ```
   
   I created the following little program locally to play with the code and it seems good.
   
   ```java
   public class Test {
     public static void main(String[] args) {
       printSplits(100, 9);
       System.out.println();
       printSplits(15, 10);
       System.out.println();
       printSplits(10, 10);
     }
   
     private static void printSplits(int numFound, int numSplits) {
       double increment = (numSplits + 1.0) / numFound;
       System.out.println("increment " + increment);
   
       int added = 0;
       double total = 0;
   
       for (int i = 0; i < numFound; i++) {
         total += increment;
         if (total > 1 && added < numSplits) {
           System.out.println(i + " " + added + " " + total);
           added++;
           total -= 1;
         }
       }
     }
   }
   ```
   







[GitHub] [accumulo] keith-turner commented on a change in pull request #2368: Create GenerateSplits utility

Posted by GitBox <gi...@apache.org>.
keith-turner commented on a change in pull request #2368:
URL: https://github.com/apache/accumulo/pull/2368#discussion_r760564817



##########
File path: core/src/main/java/org/apache/accumulo/core/file/rfile/GenerateSplits.java
##########
@@ -201,7 +201,9 @@ public void execute(String[] args) throws Exception {
       itemsSketch.update(row);
       iterator.next();
     }
-    return itemsSketch.getQuantiles(numSplits);
+    Text[] items = itemsSketch.getQuantiles(numSplits + 2);

Review comment:
       ItemsSketch has a method named getRetainedItems(); I wonder if this is the max that could be requested from getQuantiles(). If so, what should be done if numSplits + 2 > itemsSketch.getRetainedItems()?
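   
   If getRetainedItems() does turn out to be the ceiling, one defensive sketch (hypothetical; the ceiling is an unverified assumption from this thread, not documented behavior) would clamp the request before calling getQuantiles():
   
   ```java
   import org.apache.datasketches.quantiles.ItemsSketch;
   import org.apache.hadoop.io.Text;
   
   public class QuantileGuard {
     // Hypothetical clamp: treats getRetainedItems() as the most quantiles the
     // sketch can answer for, which this thread has not confirmed
     static Text[] quantilesFor(ItemsSketch<Text> itemsSketch, int numSplits) {
       int requested = Math.min(numSplits + 2, itemsSketch.getRetainedItems());
       return itemsSketch.getQuantiles(requested);
     }
   }
   ```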







[GitHub] [accumulo] keith-turner commented on a change in pull request #2368: Create GenerateSplits utility

Posted by GitBox <gi...@apache.org>.
keith-turner commented on a change in pull request #2368:
URL: https://github.com/apache/accumulo/pull/2368#discussion_r759335994



##########
File path: core/src/main/java/org/apache/accumulo/core/file/rfile/GenerateSplits.java
##########
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.file.rfile;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.io.BufferedWriter;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.cli.ClientOpts;
+import org.apache.accumulo.core.cli.ConfigOpts;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.SiteConfiguration;
+import org.apache.accumulo.core.crypto.CryptoServiceFactory;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.FileOperations;
+import org.apache.accumulo.core.file.FileSKVIterator;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iteratorsImpl.system.MultiIterator;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.TextUtil;
+import org.apache.accumulo.start.spi.KeywordExecutable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.Parameter;
+import com.google.auto.service.AutoService;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
+@AutoService(KeywordExecutable.class)
+@SuppressFBWarnings(value = "PATH_TRAVERSAL_OUT",
+    justification = "app is run in same security context as user providing the filename")
+public class GenerateSplits implements KeywordExecutable {
+  private static final Logger log = LoggerFactory.getLogger(GenerateSplits.class);
+
+  static class Opts extends ConfigOpts {
+    @Parameter(names = {"-a", "--auths"}, converter = ClientOpts.AuthConverter.class,
+        description = "the authorizations to use when reading the files")
+    public Authorizations auths = Authorizations.EMPTY;
+
+    @Parameter(names = {"-n", "--num"},
+        description = "The number of splits to generate. Cannot use with the split size option.")
+    public long numSplits = 0;
+
+    @Parameter(names = {"-ss", "--split-size"},
+        description = "The split size desired in bytes. Cannot use with num splits option.")
+    public long splitSize = 0;
+
+    @Parameter(names = {"-b64", "--base64encoded"}, description = "Base 64 encode the split points")
+    public boolean base64encode = false;
+
+    @Parameter(names = {"-sf", "--splits-file"}, description = "Output the splits to a file")
+    public String outputFile;
+
+    @Parameter(description = " <file|directory> { <file> ... }")
+    public List<String> files = new ArrayList<>();
+  }
+
+  @Override
+  public String keyword() {
+    return "generate-splits";
+  }
+
+  @Override
+  public String description() {
+    return "Generate split points from a set of 1 or more rfiles";
+  }
+
+  public static void main(String[] args) throws Exception {
+    new GenerateSplits().execute(args);
+  }
+
+  @Override
+  public void execute(String[] args) throws Exception {
+    Opts opts = new Opts();
+    opts.parseArgs(GenerateSplits.class.getName(), args);
+    if (opts.files.isEmpty()) {
+      throw new IllegalArgumentException("No files were given");
+    }
+
+    Configuration hadoopConf = new Configuration();
+    SiteConfiguration siteConf = opts.getSiteConfiguration();
+    boolean encode = opts.base64encode;
+
+    TreeSet<String> splits;
+    TreeSet<String> desiredSplits = new TreeSet<>();
+
+    if (opts.numSplits > 0 && opts.splitSize > 0) {
+      throw new IllegalArgumentException("Requested number of splits and split size.");
+    }
+    if (opts.numSplits == 0 && opts.splitSize == 0) {
+      throw new IllegalArgumentException("Required number of splits or split size.");
+    }
+    long numSplits = opts.numSplits;
+    long splitSize = opts.splitSize;
+
+    FileSystem fs = FileSystem.get(hadoopConf);
+    FileSystem localFs = FileSystem.getLocal(hadoopConf);
+    List<Path> filePaths = new ArrayList<>();
+    for (String file : opts.files) {
+      Path path = new Path(file);
+      if (file.contains(":")) {
+        fs = path.getFileSystem(hadoopConf);
+      } else {
+        log.warn("Attempting to find file across filesystems. Consider providing URI "
+            + "instead of path");
+        if (!fs.exists(path))
+          fs = localFs; // fall back to local
+      }
+      // get all the files in the directory
+      if (fs.getFileStatus(path).isDirectory()) {
+        // can only explode one directory
+        if (opts.files.size() > 1)
+          throw new IllegalArgumentException("Only one directory can be specified");
+        var iter = fs.listFiles(path, true);
+        while (iter.hasNext()) {
+          filePaths.add(iter.next().getPath());
+        }
+      } else {
+        filePaths.add(path);
+      }
+    }
+
+    // if no size specified look at indexed keys first
+    if (opts.splitSize == 0) {
+      splits = getIndexKeys(siteConf, hadoopConf, fs, filePaths, encode);
+      // if there weren't enough splits indexed, try again with size = 0
+      if (splits.size() < numSplits) {
+        log.info("Only found {} indexed keys but need {}. Doing a full scan on files {}",
+            splits.size(), numSplits, filePaths);
+        splits = getSplitsBySize(siteConf, hadoopConf, filePaths, fs, 0, encode);
+      }
+    } else {
+      splits = getSplitsBySize(siteConf, hadoopConf, filePaths, fs, splitSize, encode);
+    }
+
+    int numFound = splits.size();
+    // its possible we found too many indexed so take every (numFound / numSplits) split
+    if (opts.splitSize == 0 && numFound > numSplits) {
+      var iter = splits.iterator();
+      double targetFactor = (double) numFound / numSplits;
+      log.debug("Found {} splits but requested {} picking 1 every {}", numFound, opts.numSplits,
+          targetFactor);
+
+      for (int i = 0; i < numFound; i++) {
+        double currFactor = (double) (i + 1) / desiredSplits.size();
+        String next = iter.next();
+        // unsure if this should be currFactor >= targetFactor
+        if (currFactor > targetFactor && desiredSplits.size() < numSplits) {
+          desiredSplits.add(next);
+        }
+      }

Review comment:
       > Should this be total -= increment or total = 0? This seems to add too many splits in a row.
   
   @milleruntime the goal is that if the total is 1.66, the .66 will not be lost and can contribute towards future splits. It may be good to add a comment to the code about this.
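
   To make the carry-over concrete, here is a tiny standalone sketch (the increment value is made up, not taken from the PR): subtracting 1 keeps the fractional remainder in play, while resetting the total to 0 would discard it.

   ```java
   public class CarryOverDemo {
     public static void main(String[] args) {
       // Hypothetical per-row increment; in the suggested code this would be
       // (numSplits + 1.0) / numFound.
       double increment = 0.83;
       double total = 0;
       for (int i = 0; i < 8; i++) {
         total += increment;
         if (total > 1) {
           // total -= 1 carries the remainder (e.g. a total of 1.66 leaves 0.66) towards the
           // next split; total = 0 would lose it and drift away from evenly spaced splits.
           System.out.printf("row %d: take split, remainder %.2f%n", i, total - 1);
           total -= 1;
         }
       }
     }
   }
   ```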







[GitHub] [accumulo] DomGarguilo commented on a change in pull request #2368: Create GenerateSplits utility

Posted by GitBox <gi...@apache.org>.
DomGarguilo commented on a change in pull request #2368:
URL: https://github.com/apache/accumulo/pull/2368#discussion_r756350235



##########
File path: core/src/main/java/org/apache/accumulo/core/file/rfile/GenerateSplits.java
##########
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.file.rfile;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.io.BufferedWriter;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.cli.ClientOpts;
+import org.apache.accumulo.core.cli.ConfigOpts;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.SiteConfiguration;
+import org.apache.accumulo.core.crypto.CryptoServiceFactory;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.FileOperations;
+import org.apache.accumulo.core.file.FileSKVIterator;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iteratorsImpl.system.MultiIterator;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.TextUtil;
+import org.apache.accumulo.start.spi.KeywordExecutable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.Parameter;
+import com.google.auto.service.AutoService;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
+@AutoService(KeywordExecutable.class)
+@SuppressFBWarnings(value = "PATH_TRAVERSAL_OUT",
+    justification = "app is run in same security context as user providing the filename")
+public class GenerateSplits implements KeywordExecutable {
+  private static final Logger log = LoggerFactory.getLogger(GenerateSplits.class);
+
+  static class Opts extends ConfigOpts {
+    @Parameter(names = {"-a", "--auths"}, converter = ClientOpts.AuthConverter.class,
+        description = "the authorizations to use when reading the files")
+    public Authorizations auths = Authorizations.EMPTY;
+
+    @Parameter(names = {"-n", "--num"}, description = "The number of splits to generate")
+    public long numSplits = 0;
+
+    @Parameter(names = {"-ss", "--split-size"}, description = "The split size desired in bytes")
+    public long splitSize = 0;
+
+    @Parameter(names = {"-b64", "--base64encoded"}, description = "Base 64 encode the split points")
+    public boolean base64encode = false;
+
+    @Parameter(names = {"-sf", "--splits-file"}, description = "Output the splits to a file")
+    public String outputFile;
+
+    @Parameter(description = " <file|directory> { <file> ... }")
+    public List<String> files = new ArrayList<>();
+  }
+
+  @Override
+  public String keyword() {
+    return "generate-splits";
+  }
+
+  @Override
+  public String description() {
+    return "Generate split points from a set of 1 or more rfiles";
+  }
+
+  public static void main(String[] args) throws Exception {
+    new GenerateSplits().execute(args);
+  }
+
+  @Override
+  public void execute(String[] args) throws Exception {
+    Opts opts = new Opts();
+    opts.parseArgs(GenerateSplits.class.getName(), args);
+    if (opts.files.isEmpty()) {
+      throw new IllegalArgumentException("No files were given");
+    }
+
+    Configuration hadoopConf = new Configuration();
+    SiteConfiguration siteConf = opts.getSiteConfiguration();
+    boolean encode = opts.base64encode;
+
+    TreeSet<String> splits;
+    TreeSet<String> desiredSplits = new TreeSet<>();
+
+    if (opts.numSplits > 0 && opts.splitSize > 0) {
+      throw new IllegalArgumentException("Requested number of splits and split size.");
+    }
+    if (opts.numSplits == 0 && opts.splitSize == 0) {
+      throw new IllegalArgumentException("Required number of splits or split size.");
+    }
+    long numSplits = opts.numSplits;
+    long splitSize = opts.splitSize;
+
+    FileSystem fs = FileSystem.get(hadoopConf);
+    FileSystem localFs = FileSystem.getLocal(hadoopConf);
+    List<Path> filePaths = new ArrayList<>();
+    for (String file : opts.files) {
+      Path path = new Path(file);
+      if (file.contains(":")) {
+        fs = path.getFileSystem(hadoopConf);
+      } else {
+        log.warn("Attempting to find file across filesystems. Consider providing URI "
+            + "instead of path");
+        if (!fs.exists(path))
+          fs = localFs; // fall back to local
+      }
+      // get all the files in the directory
+      if (fs.getFileStatus(path).isDirectory()) {
+        // can only explode one directory
+        if (opts.files.size() > 1)
+          throw new IllegalArgumentException("Only one directory can be specified");
+        var iter = fs.listFiles(path, true);
+        while (iter.hasNext()) {
+          filePaths.add(iter.next().getPath());
+        }
+      } else {
+        filePaths.add(path);
+      }
+    }
+
+    // if no size specified look at indexed keys first
+    if (opts.splitSize == 0) {
+      splits = getIndexKeys(siteConf, hadoopConf, fs, filePaths, encode);
+      // if there weren't enough splits indexed, try again with size = 0
+      if (splits.size() < numSplits) {
+        log.info("Only found {} indexed keys but need {}. Doing a full scan on files {}",
+            splits.size(), numSplits, filePaths);
+        splits = getSplitsBySize(siteConf, hadoopConf, filePaths, fs, 0, encode);
+      }
+    } else {
+      splits = getSplitsBySize(siteConf, hadoopConf, filePaths, fs, splitSize, encode);
+    }
+
+    int numFound = splits.size();
+    // its possible we found too many indexed so take every (numFound / numSplits) split
+    if (opts.splitSize == 0 && numFound > numSplits) {
+      var iter = splits.iterator();
+      long factor = numFound / numSplits;
+      log.debug("Found {} splits but requested {} picking 1 every {}", numFound, opts.numSplits,
+          factor);
+      for (int i = 0; i < numFound; i++) {
+        String next = iter.next();
+        if (i % factor == 0 && desiredSplits.size() < numSplits) {
+          desiredSplits.add(next);
+        }
+      }
+    } else if (numFound < numSplits) {
+      log.warn("Only found " + numFound + " splits");

Review comment:
       ```suggestion
         log.warn("Only found {} splits", numFound);
   ```
   Very small inconsistency
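
   For context, a small self-contained sketch of the difference (the class name and values here are hypothetical, not from the PR): with SLF4J, the `{}` placeholder defers message construction until the level is known to be enabled, while concatenation always builds the string.

   ```java
   import org.slf4j.Logger;
   import org.slf4j.LoggerFactory;

   public class LogStyleDemo {
     private static final Logger log = LoggerFactory.getLogger(LogStyleDemo.class);

     public static void main(String[] args) {
       int numFound = 3;
       // Concatenation builds the message string even when WARN is disabled.
       log.warn("Only found " + numFound + " splits");
       // The parameterized form formats the message only if WARN is enabled.
       log.warn("Only found {} splits", numFound);
     }
   }
   ```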







[GitHub] [accumulo] milleruntime commented on a change in pull request #2368: Create GenerateSplits utility

Posted by GitBox <gi...@apache.org>.
milleruntime commented on a change in pull request #2368:
URL: https://github.com/apache/accumulo/pull/2368#discussion_r756357090



##########
File path: core/src/main/java/org/apache/accumulo/core/file/rfile/GenerateSplits.java
##########
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.file.rfile;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.io.BufferedWriter;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.cli.ClientOpts;
+import org.apache.accumulo.core.cli.ConfigOpts;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.SiteConfiguration;
+import org.apache.accumulo.core.crypto.CryptoServiceFactory;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.FileOperations;
+import org.apache.accumulo.core.file.FileSKVIterator;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iteratorsImpl.system.MultiIterator;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.TextUtil;
+import org.apache.accumulo.start.spi.KeywordExecutable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.Parameter;
+import com.google.auto.service.AutoService;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
+@AutoService(KeywordExecutable.class)
+@SuppressFBWarnings(value = "PATH_TRAVERSAL_OUT",
+    justification = "app is run in same security context as user providing the filename")
+public class GenerateSplits implements KeywordExecutable {
+  private static final Logger log = LoggerFactory.getLogger(GenerateSplits.class);
+
+  static class Opts extends ConfigOpts {
+    @Parameter(names = {"-a", "--auths"}, converter = ClientOpts.AuthConverter.class,
+        description = "the authorizations to use when reading the files")
+    public Authorizations auths = Authorizations.EMPTY;
+
+    @Parameter(names = {"-n", "--num"}, description = "The number of splits to generate")
+    public long numSplits = 0;
+
+    @Parameter(names = {"-ss", "--split-size"}, description = "The split size desired in bytes")
+    public long splitSize = 0;
+
+    @Parameter(names = {"-b64", "--base64encoded"}, description = "Base 64 encode the split points")

Review comment:
       I went with that string to be consistent with the `getsplits` shell command. https://github.com/apache/accumulo/blob/babb1d9e559a78577fbe763181043715e572bfa5/shell/src/main/java/org/apache/accumulo/shell/commands/GetSplitsCommand.java#L131







[GitHub] [accumulo] DomGarguilo commented on a change in pull request #2368: Create GenerateSplits utility

Posted by GitBox <gi...@apache.org>.
DomGarguilo commented on a change in pull request #2368:
URL: https://github.com/apache/accumulo/pull/2368#discussion_r756343518



##########
File path: core/src/main/java/org/apache/accumulo/core/file/rfile/GenerateSplits.java
##########
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.file.rfile;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.io.BufferedWriter;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.cli.ClientOpts;
+import org.apache.accumulo.core.cli.ConfigOpts;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.SiteConfiguration;
+import org.apache.accumulo.core.crypto.CryptoServiceFactory;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.FileOperations;
+import org.apache.accumulo.core.file.FileSKVIterator;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iteratorsImpl.system.MultiIterator;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.TextUtil;
+import org.apache.accumulo.start.spi.KeywordExecutable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.Parameter;
+import com.google.auto.service.AutoService;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
+@AutoService(KeywordExecutable.class)
+@SuppressFBWarnings(value = "PATH_TRAVERSAL_OUT",
+    justification = "app is run in same security context as user providing the filename")
+public class GenerateSplits implements KeywordExecutable {
+  private static final Logger log = LoggerFactory.getLogger(GenerateSplits.class);
+
+  static class Opts extends ConfigOpts {
+    @Parameter(names = {"-a", "--auths"}, converter = ClientOpts.AuthConverter.class,
+        description = "the authorizations to use when reading the files")
+    public Authorizations auths = Authorizations.EMPTY;
+
+    @Parameter(names = {"-n", "--num"}, description = "The number of splits to generate")
+    public long numSplits = 0;
+
+    @Parameter(names = {"-ss", "--split-size"}, description = "The split size desired in bytes")
+    public long splitSize = 0;
+
+    @Parameter(names = {"-b64", "--base64encoded"}, description = "Base 64 encode the split points")

Review comment:
       ```suggestion
       @Parameter(names = {"-b64", "--base64-encoded"}, description = "Base 64 encode the split points")
   ```
   Could add this for consistency. 







[GitHub] [accumulo] milleruntime commented on a change in pull request #2368: Create GenerateSplits utility

Posted by GitBox <gi...@apache.org>.
milleruntime commented on a change in pull request #2368:
URL: https://github.com/apache/accumulo/pull/2368#discussion_r765991529



##########
File path: core/src/main/java/org/apache/accumulo/core/file/rfile/GenerateSplits.java
##########
@@ -201,7 +201,9 @@ public void execute(String[] args) throws Exception {
       itemsSketch.update(row);
       iterator.next();
     }
-    return itemsSketch.getQuantiles(numSplits);
+    Text[] items = itemsSketch.getQuantiles(numSplits + 2);

Review comment:
       > The class seems to buffer information in memory, but I don't have a good sense of what, if any, limits there are on how much it will buffer.
   
   That is a good question; I am not sure either. Perhaps this utility is a good, harmless way to test the limits of the library, in case we want to use it in other, more important parts of the code.
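
   For readers unfamiliar with the library, a rough, self-contained sketch of the quantiles API in question, assuming the DataSketches ItemsSketch that the PR depends on (the row values and counts below are made up): rows are streamed through the sketch, which keeps a bounded sample, and getQuantiles(n) returns n evenly spaced quantiles whose first and last entries are the min and max.

   ```java
   import java.util.Comparator;

   import org.apache.datasketches.quantiles.ItemsSketch;
   import org.apache.hadoop.io.Text;

   public class QuantileSplitsDemo {
     public static void main(String[] args) {
       ItemsSketch<Text> sketch = ItemsSketch.getInstance(Comparator.naturalOrder());
       // Stream rows through the sketch; it retains a bounded sample rather than every row.
       for (int i = 0; i < 100_000; i++) {
         sketch.update(new Text(String.format("row_%06d", i)));
       }
       int numSplits = 9;
       // Ask for numSplits + 2 quantiles, then skip the first (min) and last (max)
       // to keep only the interior split points, as the PR change above does.
       Text[] quantiles = sketch.getQuantiles(numSplits + 2);
       for (int i = 1; i <= numSplits; i++) {
         System.out.println(quantiles[i]);
       }
     }
   }
   ```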







[GitHub] [accumulo] milleruntime commented on pull request #2368: Create GenerateSplits utility

Posted by GitBox <gi...@apache.org>.
milleruntime commented on pull request #2368:
URL: https://github.com/apache/accumulo/pull/2368#issuecomment-989085215


   @keith-turner @ctubbsii I think this is OK to merge. Were you OK with the new dependency? 





[GitHub] [accumulo] keith-turner commented on a change in pull request #2368: Create GenerateSplits utility

Posted by GitBox <gi...@apache.org>.
keith-turner commented on a change in pull request #2368:
URL: https://github.com/apache/accumulo/pull/2368#discussion_r758758742



##########
File path: core/src/main/java/org/apache/accumulo/core/file/rfile/GenerateSplits.java
##########
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.file.rfile;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.io.BufferedWriter;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.cli.ClientOpts;
+import org.apache.accumulo.core.cli.ConfigOpts;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.SiteConfiguration;
+import org.apache.accumulo.core.crypto.CryptoServiceFactory;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.FileOperations;
+import org.apache.accumulo.core.file.FileSKVIterator;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iteratorsImpl.system.MultiIterator;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.TextUtil;
+import org.apache.accumulo.start.spi.KeywordExecutable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.Parameter;
+import com.google.auto.service.AutoService;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
+@AutoService(KeywordExecutable.class)
+@SuppressFBWarnings(value = "PATH_TRAVERSAL_OUT",
+    justification = "app is run in same security context as user providing the filename")
+public class GenerateSplits implements KeywordExecutable {
+  private static final Logger log = LoggerFactory.getLogger(GenerateSplits.class);
+
+  static class Opts extends ConfigOpts {
+    @Parameter(names = {"-a", "--auths"}, converter = ClientOpts.AuthConverter.class,
+        description = "the authorizations to use when reading the files")
+    public Authorizations auths = Authorizations.EMPTY;
+
+    @Parameter(names = {"-n", "--num"},
+        description = "The number of splits to generate. Cannot use with the split size option.")
+    public long numSplits = 0;
+
+    @Parameter(names = {"-ss", "--split-size"},
+        description = "The split size desired in bytes. Cannot use with num splits option.")
+    public long splitSize = 0;
+
+    @Parameter(names = {"-b64", "--base64encoded"}, description = "Base 64 encode the split points")
+    public boolean base64encode = false;
+
+    @Parameter(names = {"-sf", "--splits-file"}, description = "Output the splits to a file")
+    public String outputFile;
+
+    @Parameter(description = " <file|directory> { <file> ... }")
+    public List<String> files = new ArrayList<>();
+  }
+
+  @Override
+  public String keyword() {
+    return "generate-splits";
+  }
+
+  @Override
+  public String description() {
+    return "Generate split points from a set of 1 or more rfiles";
+  }
+
+  public static void main(String[] args) throws Exception {
+    new GenerateSplits().execute(args);
+  }
+
+  @Override
+  public void execute(String[] args) throws Exception {
+    Opts opts = new Opts();
+    opts.parseArgs(GenerateSplits.class.getName(), args);
+    if (opts.files.isEmpty()) {
+      throw new IllegalArgumentException("No files were given");
+    }
+
+    Configuration hadoopConf = new Configuration();
+    SiteConfiguration siteConf = opts.getSiteConfiguration();
+    boolean encode = opts.base64encode;
+
+    TreeSet<String> splits;
+    TreeSet<String> desiredSplits = new TreeSet<>();
+
+    if (opts.numSplits > 0 && opts.splitSize > 0) {
+      throw new IllegalArgumentException("Requested number of splits and split size.");
+    }
+    if (opts.numSplits == 0 && opts.splitSize == 0) {
+      throw new IllegalArgumentException("Required number of splits or split size.");
+    }
+    long numSplits = opts.numSplits;
+    long splitSize = opts.splitSize;
+
+    FileSystem fs = FileSystem.get(hadoopConf);
+    FileSystem localFs = FileSystem.getLocal(hadoopConf);
+    List<Path> filePaths = new ArrayList<>();
+    for (String file : opts.files) {
+      Path path = new Path(file);
+      if (file.contains(":")) {
+        fs = path.getFileSystem(hadoopConf);
+      } else {
+        log.warn("Attempting to find file across filesystems. Consider providing URI "
+            + "instead of path");
+        if (!fs.exists(path))
+          fs = localFs; // fall back to local
+      }
+      // get all the files in the directory
+      if (fs.getFileStatus(path).isDirectory()) {
+        // can only explode one directory
+        if (opts.files.size() > 1)
+          throw new IllegalArgumentException("Only one directory can be specified");
+        var iter = fs.listFiles(path, true);
+        while (iter.hasNext()) {
+          filePaths.add(iter.next().getPath());
+        }
+      } else {
+        filePaths.add(path);
+      }
+    }
+
+    // if no size specified look at indexed keys first
+    if (opts.splitSize == 0) {
+      splits = getIndexKeys(siteConf, hadoopConf, fs, filePaths, encode);
+      // if there weren't enough splits indexed, try again with size = 0
+      if (splits.size() < numSplits) {
+        log.info("Only found {} indexed keys but need {}. Doing a full scan on files {}",
+            splits.size(), numSplits, filePaths);
+        splits = getSplitsBySize(siteConf, hadoopConf, filePaths, fs, 0, encode);
+      }
+    } else {
+      splits = getSplitsBySize(siteConf, hadoopConf, filePaths, fs, splitSize, encode);
+    }
+
+    int numFound = splits.size();
+    // its possible we found too many indexed so take every (numFound / numSplits) split
+    if (opts.splitSize == 0 && numFound > numSplits) {
+      var iter = splits.iterator();
+      double targetFactor = (double) numFound / numSplits;
+      log.debug("Found {} splits but requested {} picking 1 every {}", numFound, opts.numSplits,
+          targetFactor);
+
+      for (int i = 0; i < numFound; i++) {
+        double currFactor = (double) (i + 1) / desiredSplits.size();
+        String next = iter.next();
+        // unsure if this should be currFactor >= targetFactor
+        if (currFactor > targetFactor && desiredSplits.size() < numSplits) {

Review comment:
       This code I suggested is not correct; I think it always adds the first split even though that is not always desired. For example, if numFound is 1000 and I want 10 split points from those, we would not want to take the 1st one that comes along. Also, I think targetFactor is a bit off. I am playing around with the code locally.
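
   A standalone trace of the suspect logic (with made-up sizes, not the PR's test data) shows why: desiredSplits is empty on the first pass, so currFactor divides by zero, evaluates to Infinity, and the very first row is always selected.

   ```java
   public class CurrFactorDemo {
     public static void main(String[] args) {
       int numFound = 1000, numSplits = 10, added = 0;
       double targetFactor = (double) numFound / numSplits; // 100.0
       for (int i = 0; i < numFound; i++) {
         // added stands in for desiredSplits.size(); it is 0 on the first pass,
         // so currFactor is Infinity and row index 0 is unconditionally taken.
         double currFactor = (double) (i + 1) / added;
         if (currFactor > targetFactor && added < numSplits) {
           System.out.println("taking row index " + i);
           added++;
         }
       }
     }
   }
   ```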







[GitHub] [accumulo] milleruntime commented on a change in pull request #2368: Create GenerateSplits utility

Posted by GitBox <gi...@apache.org>.
milleruntime commented on a change in pull request #2368:
URL: https://github.com/apache/accumulo/pull/2368#discussion_r759320712



##########
File path: core/src/main/java/org/apache/accumulo/core/file/rfile/GenerateSplits.java
##########
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.file.rfile;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.io.BufferedWriter;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.cli.ClientOpts;
+import org.apache.accumulo.core.cli.ConfigOpts;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.SiteConfiguration;
+import org.apache.accumulo.core.crypto.CryptoServiceFactory;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.FileOperations;
+import org.apache.accumulo.core.file.FileSKVIterator;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iteratorsImpl.system.MultiIterator;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.TextUtil;
+import org.apache.accumulo.start.spi.KeywordExecutable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.Parameter;
+import com.google.auto.service.AutoService;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
+@AutoService(KeywordExecutable.class)
+@SuppressFBWarnings(value = "PATH_TRAVERSAL_OUT",
+    justification = "app is run in same security context as user providing the filename")
+public class GenerateSplits implements KeywordExecutable {
+  private static final Logger log = LoggerFactory.getLogger(GenerateSplits.class);
+
+  static class Opts extends ConfigOpts {
+    @Parameter(names = {"-a", "--auths"}, converter = ClientOpts.AuthConverter.class,
+        description = "the authorizations to use when reading the files")
+    public Authorizations auths = Authorizations.EMPTY;
+
+    @Parameter(names = {"-n", "--num"},
+        description = "The number of splits to generate. Cannot use with the split size option.")
+    public long numSplits = 0;
+
+    @Parameter(names = {"-ss", "--split-size"},
+        description = "The split size desired in bytes. Cannot use with num splits option.")
+    public long splitSize = 0;
+
+    @Parameter(names = {"-b64", "--base64encoded"}, description = "Base 64 encode the split points")
+    public boolean base64encode = false;
+
+    @Parameter(names = {"-sf", "--splits-file"}, description = "Output the splits to a file")
+    public String outputFile;
+
+    @Parameter(description = " <file|directory> { <file> ... }")
+    public List<String> files = new ArrayList<>();
+  }
+
+  @Override
+  public String keyword() {
+    return "generate-splits";
+  }
+
+  @Override
+  public String description() {
+    return "Generate split points from a set of 1 or more rfiles";
+  }
+
+  public static void main(String[] args) throws Exception {
+    new GenerateSplits().execute(args);
+  }
+
+  @Override
+  public void execute(String[] args) throws Exception {
+    Opts opts = new Opts();
+    opts.parseArgs(GenerateSplits.class.getName(), args);
+    if (opts.files.isEmpty()) {
+      throw new IllegalArgumentException("No files were given");
+    }
+
+    Configuration hadoopConf = new Configuration();
+    SiteConfiguration siteConf = opts.getSiteConfiguration();
+    boolean encode = opts.base64encode;
+
+    TreeSet<String> splits;
+    TreeSet<String> desiredSplits = new TreeSet<>();
+
+    if (opts.numSplits > 0 && opts.splitSize > 0) {
+      throw new IllegalArgumentException("Requested number of splits and split size.");
+    }
+    if (opts.numSplits == 0 && opts.splitSize == 0) {
+      throw new IllegalArgumentException("Required number of splits or split size.");
+    }
+    long numSplits = opts.numSplits;
+    long splitSize = opts.splitSize;
+
+    FileSystem fs = FileSystem.get(hadoopConf);
+    FileSystem localFs = FileSystem.getLocal(hadoopConf);
+    List<Path> filePaths = new ArrayList<>();
+    for (String file : opts.files) {
+      Path path = new Path(file);
+      if (file.contains(":")) {
+        fs = path.getFileSystem(hadoopConf);
+      } else {
+        log.warn("Attempting to find file across filesystems. Consider providing URI "
+            + "instead of path");
+        if (!fs.exists(path))
+          fs = localFs; // fall back to local
+      }
+      // get all the files in the directory
+      if (fs.getFileStatus(path).isDirectory()) {
+        // can only explode one directory
+        if (opts.files.size() > 1)
+          throw new IllegalArgumentException("Only one directory can be specified");
+        var iter = fs.listFiles(path, true);
+        while (iter.hasNext()) {
+          filePaths.add(iter.next().getPath());
+        }
+      } else {
+        filePaths.add(path);
+      }
+    }
+
+    // if no size specified look at indexed keys first
+    if (opts.splitSize == 0) {
+      splits = getIndexKeys(siteConf, hadoopConf, fs, filePaths, encode);
+      // if there weren't enough splits indexed, try again with size = 0
+      if (splits.size() < numSplits) {
+        log.info("Only found {} indexed keys but need {}. Doing a full scan on files {}",
+            splits.size(), numSplits, filePaths);
+        splits = getSplitsBySize(siteConf, hadoopConf, filePaths, fs, 0, encode);
+      }
+    } else {
+      splits = getSplitsBySize(siteConf, hadoopConf, filePaths, fs, splitSize, encode);
+    }
+
+    int numFound = splits.size();
+    // its possible we found too many indexed so take every (numFound / numSplits) split
+    if (opts.splitSize == 0 && numFound > numSplits) {
+      var iter = splits.iterator();
+      double targetFactor = (double) numFound / numSplits;
+      log.debug("Found {} splits but requested {} picking 1 every {}", numFound, opts.numSplits,
+          targetFactor);
+
+      for (int i = 0; i < numFound; i++) {
+        double currFactor = (double) (i + 1) / desiredSplits.size();
+        String next = iter.next();
+        // unsure if this should be currFactor >= targetFactor
+        if (currFactor > targetFactor && desiredSplits.size() < numSplits) {
+          desiredSplits.add(next);
+        }
+      }

Review comment:
       This fails the test case with 6 splits and `--num 4`. It will only return 3 splits: r2, r4, r6.







[GitHub] [accumulo] keith-turner commented on a change in pull request #2368: Create GenerateSplits utility

Posted by GitBox <gi...@apache.org>.
keith-turner commented on a change in pull request #2368:
URL: https://github.com/apache/accumulo/pull/2368#discussion_r758775371



##########
File path: core/src/main/java/org/apache/accumulo/core/file/rfile/GenerateSplits.java
##########
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.file.rfile;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.io.BufferedWriter;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.cli.ClientOpts;
+import org.apache.accumulo.core.cli.ConfigOpts;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.SiteConfiguration;
+import org.apache.accumulo.core.crypto.CryptoServiceFactory;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.FileOperations;
+import org.apache.accumulo.core.file.FileSKVIterator;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iteratorsImpl.system.MultiIterator;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.TextUtil;
+import org.apache.accumulo.start.spi.KeywordExecutable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.Parameter;
+import com.google.auto.service.AutoService;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
+@AutoService(KeywordExecutable.class)
+@SuppressFBWarnings(value = "PATH_TRAVERSAL_OUT",
+    justification = "app is run in same security context as user providing the filename")
+public class GenerateSplits implements KeywordExecutable {
+  private static final Logger log = LoggerFactory.getLogger(GenerateSplits.class);
+
+  static class Opts extends ConfigOpts {
+    @Parameter(names = {"-a", "--auths"}, converter = ClientOpts.AuthConverter.class,
+        description = "the authorizations to use when reading the files")
+    public Authorizations auths = Authorizations.EMPTY;
+
+    @Parameter(names = {"-n", "--num"},
+        description = "The number of splits to generate. Cannot use with the split size option.")
+    public long numSplits = 0;
+
+    @Parameter(names = {"-ss", "--split-size"},
+        description = "The split size desired in bytes. Cannot use with num splits option.")
+    public long splitSize = 0;
+
+    @Parameter(names = {"-b64", "--base64encoded"}, description = "Base 64 encode the split points")
+    public boolean base64encode = false;
+
+    @Parameter(names = {"-sf", "--splits-file"}, description = "Output the splits to a file")
+    public String outputFile;
+
+    @Parameter(description = " <file|directory> { <file> ... }")
+    public List<String> files = new ArrayList<>();
+  }
+
+  @Override
+  public String keyword() {
+    return "generate-splits";
+  }
+
+  @Override
+  public String description() {
+    return "Generate split points from a set of 1 or more rfiles";
+  }
+
+  public static void main(String[] args) throws Exception {
+    new GenerateSplits().execute(args);
+  }
+
+  @Override
+  public void execute(String[] args) throws Exception {
+    Opts opts = new Opts();
+    opts.parseArgs(GenerateSplits.class.getName(), args);
+    if (opts.files.isEmpty()) {
+      throw new IllegalArgumentException("No files were given");
+    }
+
+    Configuration hadoopConf = new Configuration();
+    SiteConfiguration siteConf = opts.getSiteConfiguration();
+    boolean encode = opts.base64encode;
+
+    TreeSet<String> splits;
+    TreeSet<String> desiredSplits = new TreeSet<>();
+
+    if (opts.numSplits > 0 && opts.splitSize > 0) {
+      throw new IllegalArgumentException("Requested number of splits and split size.");
+    }
+    if (opts.numSplits == 0 && opts.splitSize == 0) {
+      throw new IllegalArgumentException("Required number of splits or split size.");
+    }
+    long numSplits = opts.numSplits;
+    long splitSize = opts.splitSize;
+
+    FileSystem fs = FileSystem.get(hadoopConf);
+    FileSystem localFs = FileSystem.getLocal(hadoopConf);
+    List<Path> filePaths = new ArrayList<>();
+    for (String file : opts.files) {
+      Path path = new Path(file);
+      if (file.contains(":")) {
+        fs = path.getFileSystem(hadoopConf);
+      } else {
+        log.warn("Attempting to find file across filesystems. Consider providing URI "
+            + "instead of path");
+        if (!fs.exists(path))
+          fs = localFs; // fall back to local
+      }
+      // get all the files in the directory
+      if (fs.getFileStatus(path).isDirectory()) {
+        // can only explode one directory
+        if (opts.files.size() > 1)
+          throw new IllegalArgumentException("Only one directory can be specified");
+        var iter = fs.listFiles(path, true);
+        while (iter.hasNext()) {
+          filePaths.add(iter.next().getPath());
+        }
+      } else {
+        filePaths.add(path);
+      }
+    }
+
+    // if no size specified look at indexed keys first
+    if (opts.splitSize == 0) {
+      splits = getIndexKeys(siteConf, hadoopConf, fs, filePaths, encode);
+      // if there weren't enough splits indexed, try again with size = 0
+      if (splits.size() < numSplits) {
+        log.info("Only found {} indexed keys but need {}. Doing a full scan on files {}",
+            splits.size(), numSplits, filePaths);
+        splits = getSplitsBySize(siteConf, hadoopConf, filePaths, fs, 0, encode);
+      }
+    } else {
+      splits = getSplitsBySize(siteConf, hadoopConf, filePaths, fs, splitSize, encode);
+    }
+
+    int numFound = splits.size();
+    // its possible we found too many indexed so take every (numFound / numSplits) split
+    if (opts.splitSize == 0 && numFound > numSplits) {
+      var iter = splits.iterator();
+      double targetFactor = (double) numFound / numSplits;
+      log.debug("Found {} splits but requested {} picking 1 every {}", numFound, opts.numSplits,
+          targetFactor);
+
+      for (int i = 0; i < numFound; i++) {
+        double currFactor = (double) (i + 1) / desiredSplits.size();
+        String next = iter.next();
+        // unsure if this should be currFactor >= targetFactor
+        if (currFactor > targetFactor && desiredSplits.size() < numSplits) {
+          desiredSplits.add(next);
+        }
+      }

Review comment:
       The following fixes some of the issues I mentioned. It's modeled after the code in TabletOperations.listSplits, without the suspicious while loop.
   
   ```suggestion
         var iter = splits.iterator();
         // This is how much each found row advances us towards a desired split point. Add one to
         // numSplits because if we request 9 splits there will be 10 tablets, and we want the 9
         // splits evenly spaced between the 10.
         double increment = (numSplits + 1.0) / numFound;
         log.debug("Found {} splits but requested {} picking 1 every {}", numFound, opts.numSplits,
             increment);
   
         // Tracks how far along we are towards the next split.
         double total = 0;
   
         for (int i = 0; i < numFound; i++) {
           total += increment;
           String next = iter.next();
           // take a split each time the running total crosses 1, keeping the remainder
           if (total > 1 && desiredSplits.size() < numSplits) {
             desiredSplits.add(next);
             total -= 1;
           }
         }
   ```
   
   I created the following little program locally to play with the code, and it seems to behave well.
   
   ```java
   public class Test {
     public static void main(String[] args) {
       printSplits(100, 9);
       System.out.println();
       printSplits(15, 10);
       System.out.println();
       printSplits(10, 10);
     }
   
     private static void printSplits(int numFound, int numSplits) {
       // This is how much each found row advances us towards a desired split point. Add one to
       // numSplits because if we request 9 splits there will be 10 tablets, and we want the 9
       // splits evenly spaced between the 10.
       double increment = (numSplits + 1.0) / numFound;
   
       System.out.println("increment " + increment);
   
       int added = 0;
   
       // Tracks how far along we are towards the next split.
       double total = 0;
   
       for (int i = 0; i < numFound; i++) {
         total += increment;
   
         // take a split each time the running total crosses 1, keeping the remainder
         if (total > 1 && added < numSplits) {
           System.out.println(i + " " + added + " " + total);
           added++;
           total -= 1;
         }
       }
     }
   }
   ```
   







[GitHub] [accumulo] milleruntime commented on a change in pull request #2368: Create GenerateSplits utility

Posted by GitBox <gi...@apache.org>.
milleruntime commented on a change in pull request #2368:
URL: https://github.com/apache/accumulo/pull/2368#discussion_r756362867



##########
File path: core/src/main/java/org/apache/accumulo/core/file/rfile/GenerateSplits.java
##########
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.file.rfile;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.io.BufferedWriter;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.cli.ClientOpts;
+import org.apache.accumulo.core.cli.ConfigOpts;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.SiteConfiguration;
+import org.apache.accumulo.core.crypto.CryptoServiceFactory;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.FileOperations;
+import org.apache.accumulo.core.file.FileSKVIterator;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iteratorsImpl.system.MultiIterator;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.TextUtil;
+import org.apache.accumulo.start.spi.KeywordExecutable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.Parameter;
+import com.google.auto.service.AutoService;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
+@AutoService(KeywordExecutable.class)
+@SuppressFBWarnings(value = "PATH_TRAVERSAL_OUT",
+    justification = "app is run in same security context as user providing the filename")
+public class GenerateSplits implements KeywordExecutable {
+  private static final Logger log = LoggerFactory.getLogger(GenerateSplits.class);
+
+  static class Opts extends ConfigOpts {
+    @Parameter(names = {"-a", "--auths"}, converter = ClientOpts.AuthConverter.class,
+        description = "the authorizations to use when reading the files")
+    public Authorizations auths = Authorizations.EMPTY;
+
+    @Parameter(names = {"-n", "--num"}, description = "The number of splits to generate")
+    public long numSplits = 0;
+
+    @Parameter(names = {"-ss", "--split-size"}, description = "The split size desired in bytes")
+    public long splitSize = 0;
+
+    @Parameter(names = {"-b64", "--base64encoded"}, description = "Base 64 encode the split points")
+    public boolean base64encode = false;
+
+    @Parameter(names = {"-sf", "--splits-file"}, description = "Output the splits to a file")
+    public String outputFile;
+
+    @Parameter(description = " <file|directory> { <file> ... }")
+    public List<String> files = new ArrayList<>();
+  }
+
+  @Override
+  public String keyword() {
+    return "generate-splits";
+  }
+
+  @Override
+  public String description() {
+    return "Generate split points from a set of 1 or more rfiles";
+  }
+
+  public static void main(String[] args) throws Exception {
+    new GenerateSplits().execute(args);
+  }
+
+  @Override
+  public void execute(String[] args) throws Exception {
+    Opts opts = new Opts();
+    opts.parseArgs(GenerateSplits.class.getName(), args);
+    if (opts.files.isEmpty()) {
+      throw new IllegalArgumentException("No files were given");
+    }
+
+    Configuration hadoopConf = new Configuration();
+    SiteConfiguration siteConf = opts.getSiteConfiguration();
+    boolean encode = opts.base64encode;
+
+    TreeSet<String> splits;
+    TreeSet<String> desiredSplits = new TreeSet<>();
+
+    if (opts.numSplits > 0 && opts.splitSize > 0) {
+      throw new IllegalArgumentException("Requested number of splits and split size.");
+    }
+    if (opts.numSplits == 0 && opts.splitSize == 0) {
+      throw new IllegalArgumentException("Required number of splits or split size.");
+    }
+    long numSplits = opts.numSplits;
+    long splitSize = opts.splitSize;
+
+    FileSystem fs = FileSystem.get(hadoopConf);
+    FileSystem localFs = FileSystem.getLocal(hadoopConf);
+    List<Path> filePaths = new ArrayList<>();
+    for (String file : opts.files) {
+      Path path = new Path(file);
+      if (file.contains(":")) {
+        fs = path.getFileSystem(hadoopConf);
+      } else {
+        log.warn("Attempting to find file across filesystems. Consider providing URI "
+            + "instead of path");
+        if (!fs.exists(path))
+          fs = localFs; // fall back to local
+      }
+      // get all the files in the directory
+      if (fs.getFileStatus(path).isDirectory()) {
+        // can only explode one directory
+        if (opts.files.size() > 1)
+          throw new IllegalArgumentException("Only one directory can be specified");
+        var iter = fs.listFiles(path, true);
+        while (iter.hasNext()) {
+          filePaths.add(iter.next().getPath());
+        }
+      } else {
+        filePaths.add(path);
+      }
+    }
+
+    // if no size specified look at indexed keys first
+    if (opts.splitSize == 0) {
+      splits = getIndexKeys(siteConf, hadoopConf, fs, filePaths, encode);
+      // if there weren't enough splits indexed, try again with size = 0
+      if (splits.size() < numSplits) {
+        log.info("Only found {} indexed keys but need {}. Doing a full scan on files {}",
+            splits.size(), numSplits, filePaths);
+        splits = getSplitsBySize(siteConf, hadoopConf, filePaths, fs, 0, encode);
+      }
+    } else {
+      splits = getSplitsBySize(siteConf, hadoopConf, filePaths, fs, splitSize, encode);
+    }
+
+    int numFound = splits.size();
+    // its possible we found too many indexed so take every (numFound / numSplits) split
+    if (opts.splitSize == 0 && numFound > numSplits) {
+      var iter = splits.iterator();
+      long factor = numFound / numSplits;
+      log.debug("Found {} splits but requested {} picking 1 every {}", numFound, opts.numSplits,
+          factor);
+      for (int i = 0; i < numFound; i++) {
+        String next = iter.next();
+        if (i % factor == 0 && desiredSplits.size() < numSplits) {
+          desiredSplits.add(next);
+        }
+      }
+    } else if (numFound < numSplits) {
+      log.warn("Only found " + numFound + " splits");
+      desiredSplits = splits;
+    } else {
+      desiredSplits = splits;
+    }

Review comment:
       Fixed in 4d1fdc6







[GitHub] [accumulo] ctubbsii commented on a change in pull request #2368: Create GenerateSplits utility

Posted by GitBox <gi...@apache.org>.
ctubbsii commented on a change in pull request #2368:
URL: https://github.com/apache/accumulo/pull/2368#discussion_r756233337



##########
File path: core/src/test/java/org/apache/accumulo/core/file/rfile/GenerateSplitsTest.java
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.file.rfile;
+
+import static org.apache.accumulo.core.file.rfile.GenerateSplits.main;
+import static org.apache.accumulo.core.file.rfile.RFileTest.newColFamByteSequence;
+import static org.apache.accumulo.core.file.rfile.RFileTest.newKey;
+import static org.apache.accumulo.core.file.rfile.RFileTest.newValue;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.List;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
+@SuppressFBWarnings(value = "PATH_TRAVERSAL_IN", justification = "paths provided by test")
+public class GenerateSplitsTest {
+  private static final Logger log = LoggerFactory.getLogger(GenerateSplitsTest.class);
+
+  @ClassRule
+  public static final TemporaryFolder tempFolder =
+      new TemporaryFolder(new File(System.getProperty("user.dir") + "/target"));
+
+  private static final RFileTest.TestRFile trf = new RFileTest.TestRFile(null);
+  private static String fileName;
+
+  /**
+   * Creates a test file with 84 bytes of data and 2 Locality groups.
+   */
+  @BeforeClass
+  public static void createFile() throws IOException {
+    trf.openWriter(false);
+    trf.writer.startNewLocalityGroup("lg1", newColFamByteSequence("cf1", "cf2"));
+    trf.writer.append(newKey("r1", "cf1", "cq1", "L1", 55), newValue("foo1"));
+    trf.writer.append(newKey("r2", "cf2", "cq1", "L1", 55), newValue("foo2"));
+    trf.writer.append(newKey("r3", "cf2", "cq1", "L1", 55), newValue("foo3"));
+    trf.writer.startNewLocalityGroup("lg2", newColFamByteSequence("cf3", "cf4"));
+    trf.writer.append(newKey("r4", "cf3", "cq1", "L1", 55), newValue("foo4"));
+    trf.writer.append(newKey("r5", "cf4", "cq1", "L1", 55), newValue("foo5"));
+    trf.writer.append(newKey("r6", "cf4", "cq1", "L1", 55), newValue("foo6"));
+    trf.closeWriter();
+
+    File file = tempFolder.newFile("testGenerateSplits.rf");
+    FileOutputStream fileOutputStream = new FileOutputStream(file);
+    fileOutputStream.write(trf.baos.toByteArray());
+    fileOutputStream.flush();
+    fileOutputStream.close();

Review comment:
       ```suggestion
       try (var fileOutputStream = new FileOutputStream(file)) {
         fileOutputStream.write(trf.baos.toByteArray());
       }
   ```

##########
File path: core/src/test/java/org/apache/accumulo/core/file/rfile/GenerateSplitsTest.java
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.file.rfile;
+
+import static org.apache.accumulo.core.file.rfile.GenerateSplits.main;
+import static org.apache.accumulo.core.file.rfile.RFileTest.newColFamByteSequence;
+import static org.apache.accumulo.core.file.rfile.RFileTest.newKey;
+import static org.apache.accumulo.core.file.rfile.RFileTest.newValue;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.List;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
+@SuppressFBWarnings(value = "PATH_TRAVERSAL_IN", justification = "paths provided by test")
+public class GenerateSplitsTest {
+  private static final Logger log = LoggerFactory.getLogger(GenerateSplitsTest.class);
+
+  @ClassRule

Review comment:
       This is the first ever occurrence of `@ClassRule` in our code. We usually just use `@Rule`. I'm guessing you did this so you only need to create test RFiles once, to be shared by all tests. I'm wondering if there's any downside to having each test reuse the same temp folder and test file.
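
   For reference, the behavioral difference in JUnit 4 (a minimal sketch, not code from this PR; field names are made up):

    ```java
       // @ClassRule is applied once per class: the folder, and anything the
       // tests put in it, is created before all tests and deleted after them
       @ClassRule
       public static final TemporaryFolder sharedFolder = new TemporaryFolder();

       // @Rule is applied per test method: each test gets a fresh folder
       @Rule
       public TemporaryFolder perTestFolder = new TemporaryFolder();
    ```

   The main cost of sharing is that a test that mutates the shared file could affect later tests; for a read-only RFile that risk seems small.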

##########
File path: core/src/main/java/org/apache/accumulo/core/file/rfile/GenerateSplits.java
##########
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.file.rfile;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.io.BufferedWriter;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.cli.ClientOpts;
+import org.apache.accumulo.core.cli.ConfigOpts;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.SiteConfiguration;
+import org.apache.accumulo.core.crypto.CryptoServiceFactory;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.FileOperations;
+import org.apache.accumulo.core.file.FileSKVIterator;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iteratorsImpl.system.MultiIterator;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.TextUtil;
+import org.apache.accumulo.start.spi.KeywordExecutable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.Parameter;
+import com.google.auto.service.AutoService;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
+@AutoService(KeywordExecutable.class)
+@SuppressFBWarnings(value = "PATH_TRAVERSAL_OUT",
+    justification = "app is run in same security context as user providing the filename")
+public class GenerateSplits implements KeywordExecutable {
+  private static final Logger log = LoggerFactory.getLogger(GenerateSplits.class);
+
+  static class Opts extends ConfigOpts {
+    @Parameter(names = {"-a", "--auths"}, converter = ClientOpts.AuthConverter.class,
+        description = "the authorizations to use when reading the files")
+    public Authorizations auths = Authorizations.EMPTY;
+
+    @Parameter(names = {"-n", "--num"}, description = "The number of splits to generate")
+    public long numSplits = 0;
+
+    @Parameter(names = {"-ss", "--split-size"}, description = "The split size desired in bytes")
+    public long splitSize = 0;
+
+    @Parameter(names = {"-b64", "--base64encoded"}, description = "Base 64 encode the split points")
+    public boolean base64encode = false;
+
+    @Parameter(names = {"-sf", "--splits-file"}, description = "Output the splits to a file")
+    public String outputFile;
+
+    @Parameter(description = " <file|directory> { <file> ... }")
+    public List<String> files = new ArrayList<>();
+  }
+
+  @Override
+  public String keyword() {
+    return "generate-splits";
+  }
+
+  @Override
+  public String description() {
+    return "Generate split points from a set of 1 or more rfiles";
+  }
+
+  public static void main(String[] args) throws Exception {
+    new GenerateSplits().execute(args);
+  }
+
+  @Override
+  public void execute(String[] args) throws Exception {
+    Opts opts = new Opts();
+    opts.parseArgs(GenerateSplits.class.getName(), args);
+    if (opts.files.isEmpty()) {
+      throw new IllegalArgumentException("No files were given");
+    }
+
+    Configuration hadoopConf = new Configuration();
+    SiteConfiguration siteConf = opts.getSiteConfiguration();
+    boolean encode = opts.base64encode;
+
+    TreeSet<String> splits;
+    TreeSet<String> desiredSplits = new TreeSet<>();
+
+    if (opts.numSplits > 0 && opts.splitSize > 0) {
+      throw new IllegalArgumentException("Requested number of splits and split size.");
+    }
+    if (opts.numSplits == 0 && opts.splitSize == 0) {
+      throw new IllegalArgumentException("Required number of splits or split size.");
+    }
+    long numSplits = opts.numSplits;
+    long splitSize = opts.splitSize;
+
+    FileSystem fs = FileSystem.get(hadoopConf);
+    FileSystem localFs = FileSystem.getLocal(hadoopConf);
+    List<Path> filePaths = new ArrayList<>();
+    for (String file : opts.files) {
+      Path path = new Path(file);
+      if (file.contains(":")) {
+        fs = path.getFileSystem(hadoopConf);
+      } else {
+        log.warn("Attempting to find file across filesystems. Consider providing URI "
+            + "instead of path");
+        if (!fs.exists(path))
+          fs = localFs; // fall back to local
+      }
+      // get all the files in the directory
+      if (fs.getFileStatus(path).isDirectory()) {
+        // can only explode one directory
+        if (opts.files.size() > 1)
+          throw new IllegalArgumentException("Only one directory can be specified");
+        var iter = fs.listFiles(path, true);
+        while (iter.hasNext()) {
+          filePaths.add(iter.next().getPath());
+        }
+      } else {
+        filePaths.add(path);
+      }
+    }
+
+    // if no size specified look at indexed keys first
+    if (opts.splitSize == 0) {
+      splits = getIndexKeys(siteConf, hadoopConf, fs, filePaths, encode);
+      // if there weren't enough splits indexed, try again with size = 0
+      if (splits.size() < numSplits) {
+        log.info("Only found {} indexed keys but need {}. Doing a full scan on files {}",
+            splits.size(), numSplits, filePaths);
+        splits = getSplitsBySize(siteConf, hadoopConf, filePaths, fs, 0, encode);
+      }
+    } else {
+      splits = getSplitsBySize(siteConf, hadoopConf, filePaths, fs, splitSize, encode);
+    }
+
+    int numFound = splits.size();
+    // it's possible we found too many indexed so take every (numFound / numSplits) split
+    if (opts.splitSize == 0 && numFound > numSplits) {
+      var iter = splits.iterator();
+      long factor = numFound / numSplits;
+      log.debug("Found {} splits but requested {} picking 1 every {}", numFound, opts.numSplits,
+          factor);
+      for (int i = 0; i < numFound; i++) {
+        String next = iter.next();
+        if (i % factor == 0 && desiredSplits.size() < numSplits) {
+          desiredSplits.add(next);
+        }
+      }
+    } else if (numFound < numSplits) {
+      log.warn("Only found " + numFound + " splits");
+      desiredSplits = splits;
+    } else {
+      desiredSplits = splits;
+    }
+
+    log.info("Generated {} splits", desiredSplits.size());
+    if (opts.outputFile != null) {
+      log.info("Writing splits to file {} ", opts.outputFile);
+      try (var writer = new PrintWriter(new BufferedWriter(
+          new OutputStreamWriter(new FileOutputStream(opts.outputFile), UTF_8)))) {
+        desiredSplits.forEach(writer::println);
+      }
+    } else {
+      desiredSplits.forEach(System.out::println);
+    }
+  }
+
+  private static String encode(boolean encode, Text text) {
+    if (text == null) {
+      return null;
+    }
+    byte[] bytes = TextUtil.getBytes(text);
+    if (encode)
+      return Base64.getEncoder().encodeToString(bytes);
+    else {
+      // drop non printable characters
+      StringBuilder sb = new StringBuilder();
+      for (byte aByte : bytes) {
+        int c = 0xff & aByte;
+        if (c == '\\')
+          sb.append("\\\\");
+        else if (c >= 32 && c <= 126)
+          sb.append((char) c);
+        else
+          log.debug("Dropping non printable char: \\x{}", Integer.toHexString(c));
+      }
+      return sb.toString();
+    }
+  }
+
+  /**
+   * Scan the files for indexed keys since it is more efficient than a full file scan.
+   */
+  private TreeSet<String> getIndexKeys(AccumuloConfiguration accumuloConf, Configuration hadoopConf,
+      FileSystem fs, List<Path> files, boolean base64encode) throws IOException {
+    TreeSet<String> indexKeys = new TreeSet<>();
+    List<SortedKeyValueIterator<Key,Value>> readers = new ArrayList<>(files.size());
+    List<FileSKVIterator> fileReaders = new ArrayList<>(files.size());
+
+    for (Path file : files) {
+      FileSKVIterator reader = FileOperations.getInstance().newIndexReaderBuilder()
+          .forFile(file.toString(), fs, hadoopConf, CryptoServiceFactory.newDefaultInstance())
+          .withTableConfiguration(accumuloConf).build();
+      readers.add(reader);
+      fileReaders.add(reader);
+    }
+    var iterator = new MultiIterator(readers, true);
+    while (iterator.hasTop()) {
+      Key key = iterator.getTopKey();
+      indexKeys.add(encode(base64encode, key.getRow()));
+      iterator.next();
+    }
+
+    for (var r : fileReaders) {
+      r.close();
+    }
+
+    log.debug("Got {} splits from indices of {}", indexKeys.size(), files);
+    return indexKeys;
+  }
+
+  /**
+   * Get number of splits based on requested size of split. A splitSize = 0 returns all keys.
+   */
+  private TreeSet<String> getSplitsBySize(AccumuloConfiguration accumuloConf,
+      Configuration hadoopConf, List<Path> files, FileSystem fs, long splitSize,
+      boolean base64encode) throws IOException {
+    TreeSet<String> splits = new TreeSet<>();
+    List<SortedKeyValueIterator<Key,Value>> readers = new ArrayList<>(files.size());
+    List<FileSKVIterator> fileReaders = new ArrayList<>(files.size());
+    SortedKeyValueIterator<Key,Value> iterator;
+
+    long currentSplitSize = 0;
+    long totalSize = 0;
+    for (Path file : files) {
+      FileSKVIterator reader = FileOperations.getInstance().newScanReaderBuilder()
+          .forFile(file.toString(), fs, hadoopConf, CryptoServiceFactory.newDefaultInstance())
+          .withTableConfiguration(accumuloConf).overRange(new Range(), Set.of(), false).build();
+      readers.add(reader);
+      fileReaders.add(reader);
+    }
+    iterator = new MultiIterator(readers, false);
+    iterator.seek(new Range(), Collections.emptySet(), false);
+    while (iterator.hasTop()) {
+      Key key = iterator.getTopKey();
+      Value val = iterator.getTopValue();
+      int size = key.getSize() + val.getSize();
+      currentSplitSize += size;
+      totalSize += size;
+      if (currentSplitSize > splitSize) {
+        splits.add(encode(base64encode, key.getRow()));
+        currentSplitSize = 0;
+      }
+      iterator.next();
+    }
+    for (var r : fileReaders) {
+      r.close();
+    }

Review comment:
       This could be in a finally block in case there are any exceptions above after the readers are created.
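
   A minimal sketch of that shape (`collectSplits` is a hypothetical stand-in for the reader-building and scanning code above):

    ```java
       List<FileSKVIterator> fileReaders = new ArrayList<>();
       try {
         // if anything throws after a reader has been opened, the
         // finally block still closes it
         collectSplits(fileReaders);
       } finally {
         for (var r : fileReaders) {
           r.close();
         }
       }
    ```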

##########
File path: core/src/test/java/org/apache/accumulo/core/file/rfile/GenerateSplitsTest.java
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.file.rfile;
+
+import static org.apache.accumulo.core.file.rfile.GenerateSplits.main;
+import static org.apache.accumulo.core.file.rfile.RFileTest.newColFamByteSequence;
+import static org.apache.accumulo.core.file.rfile.RFileTest.newKey;
+import static org.apache.accumulo.core.file.rfile.RFileTest.newValue;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.List;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
+@SuppressFBWarnings(value = "PATH_TRAVERSAL_IN", justification = "paths provided by test")
+public class GenerateSplitsTest {
+  private static final Logger log = LoggerFactory.getLogger(GenerateSplitsTest.class);
+
+  @ClassRule
+  public static final TemporaryFolder tempFolder =
+      new TemporaryFolder(new File(System.getProperty("user.dir") + "/target"));
+
+  private static final RFileTest.TestRFile trf = new RFileTest.TestRFile(null);
+  private static String fileName;
+
+  /**
+   * Creates a test file with 84 bytes of data and 2 Locality groups.
+   */
+  @BeforeClass
+  public static void createFile() throws IOException {
+    trf.openWriter(false);
+    trf.writer.startNewLocalityGroup("lg1", newColFamByteSequence("cf1", "cf2"));
+    trf.writer.append(newKey("r1", "cf1", "cq1", "L1", 55), newValue("foo1"));
+    trf.writer.append(newKey("r2", "cf2", "cq1", "L1", 55), newValue("foo2"));
+    trf.writer.append(newKey("r3", "cf2", "cq1", "L1", 55), newValue("foo3"));
+    trf.writer.startNewLocalityGroup("lg2", newColFamByteSequence("cf3", "cf4"));
+    trf.writer.append(newKey("r4", "cf3", "cq1", "L1", 55), newValue("foo4"));
+    trf.writer.append(newKey("r5", "cf4", "cq1", "L1", 55), newValue("foo5"));
+    trf.writer.append(newKey("r6", "cf4", "cq1", "L1", 55), newValue("foo6"));
+    trf.closeWriter();
+
+    File file = tempFolder.newFile("testGenerateSplits.rf");
+    FileOutputStream fileOutputStream = new FileOutputStream(file);
+    fileOutputStream.write(trf.baos.toByteArray());
+    fileOutputStream.flush();
+    fileOutputStream.close();
+
+    fileName = file.getAbsolutePath();
+    log.info("Wrote to file {}", fileName);
+  }
+
+  @AfterClass
+  public static void cleanUp() {
+    File file = new File(fileName);
+    if (file.delete())
+      log.info("Cleaned up test file {}", fileName);
+  }
+
+  @Test
+  public void testNum() throws Exception {
+    List<String> args = List.of(fileName, "--num", "2");
+    log.info("Invoking GenerateSplits with {}", args);
+    PrintStream oldOut = System.out;
+    try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        PrintStream newOut = new PrintStream(baos)) {
+      System.setOut(newOut);

Review comment:
       Changing the System output stream isn't a great idea, because it can mask output, such as the error messages printed to the console from failed assert statements.
   
   A better solution is to design the code to be testable by having a PrintStream parameter in the constructor. The main method will pass System.out, but for testing, you can pass something else. So, main just looks like:
   
   ```java
      public static void main(String[] args) {
        new GenerateSplits(System.out).execute(args);
      }
   ```
   
   Alternatively, use the `-sf` parameter you made to create a file, and then inspect the file.
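
   For illustration, the `-sf` route could look something like this (a sketch reusing the `fileName` and `tempFolder` fields above, and assuming `java.nio.file.Files` and `assertEquals` are imported):

    ```java
       @Test
       public void testNumViaSplitsFile() throws Exception {
         File splitsFile = tempFolder.newFile("splits.txt");
         main(new String[] {fileName, "--num", "2", "-sf", splitsFile.getAbsolutePath()});
         // the utility writes one split point per line; inspect the file
         List<String> splits = Files.readAllLines(splitsFile.toPath());
         assertEquals(2, splits.size());
       }
    ```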

##########
File path: core/src/test/java/org/apache/accumulo/core/file/rfile/GenerateSplitsTest.java
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.file.rfile;
+
+import static org.apache.accumulo.core.file.rfile.GenerateSplits.main;
+import static org.apache.accumulo.core.file.rfile.RFileTest.newColFamByteSequence;
+import static org.apache.accumulo.core.file.rfile.RFileTest.newKey;
+import static org.apache.accumulo.core.file.rfile.RFileTest.newValue;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.List;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
+@SuppressFBWarnings(value = "PATH_TRAVERSAL_IN", justification = "paths provided by test")
+public class GenerateSplitsTest {
+  private static final Logger log = LoggerFactory.getLogger(GenerateSplitsTest.class);
+
+  @ClassRule
+  public static final TemporaryFolder tempFolder =
+      new TemporaryFolder(new File(System.getProperty("user.dir") + "/target"));
+
+  private static final RFileTest.TestRFile trf = new RFileTest.TestRFile(null);
+  private static String fileName;
+
+  /**
+   * Creates a test file with 84 bytes of data and 2 Locality groups.
+   */
+  @BeforeClass
+  public static void createFile() throws IOException {
+    trf.openWriter(false);
+    trf.writer.startNewLocalityGroup("lg1", newColFamByteSequence("cf1", "cf2"));
+    trf.writer.append(newKey("r1", "cf1", "cq1", "L1", 55), newValue("foo1"));
+    trf.writer.append(newKey("r2", "cf2", "cq1", "L1", 55), newValue("foo2"));
+    trf.writer.append(newKey("r3", "cf2", "cq1", "L1", 55), newValue("foo3"));
+    trf.writer.startNewLocalityGroup("lg2", newColFamByteSequence("cf3", "cf4"));
+    trf.writer.append(newKey("r4", "cf3", "cq1", "L1", 55), newValue("foo4"));
+    trf.writer.append(newKey("r5", "cf4", "cq1", "L1", 55), newValue("foo5"));
+    trf.writer.append(newKey("r6", "cf4", "cq1", "L1", 55), newValue("foo6"));
+    trf.closeWriter();
+
+    File file = tempFolder.newFile("testGenerateSplits.rf");
+    FileOutputStream fileOutputStream = new FileOutputStream(file);
+    fileOutputStream.write(trf.baos.toByteArray());
+    fileOutputStream.flush();
+    fileOutputStream.close();
+
+    fileName = file.getAbsolutePath();
+    log.info("Wrote to file {}", fileName);
+  }
+
+  @AfterClass
+  public static void cleanUp() {
+    File file = new File(fileName);
+    if (file.delete())
+      log.info("Cleaned up test file {}", fileName);

Review comment:
       Can usually do something like this in a one-liner, since either case is acceptable, and we probably don't need the spammy log message. We just need to know if something unexpected happened, which the following will do.
   
   ```suggestion
       assertTrue(file.delete() || !file.exists());
   ```
   
   However, I'm thinking this entire cleanup method isn't necessary at all. The file is created inside the temporary folder. By definition, it will be cleaned up when that folder is cleaned up after the tests.






[GitHub] [accumulo] ctubbsii commented on pull request #2368: Create GenerateSplits utility

Posted by GitBox <gi...@apache.org>.
ctubbsii commented on pull request #2368:
URL: https://github.com/apache/accumulo/pull/2368#issuecomment-990224708


   > @keith-turner @ctubbsii I think this is OK to merge. Were you OK with the new dependency?
   
   New dependency works for me. Feel free to merge if you think it's ready. I skimmed, but didn't do an in-depth check of the implementation. The user experience from the options looks fine to me.




[GitHub] [accumulo] milleruntime commented on a change in pull request #2368: Create GenerateSplits utility

Posted by GitBox <gi...@apache.org>.
milleruntime commented on a change in pull request #2368:
URL: https://github.com/apache/accumulo/pull/2368#discussion_r764958481



##########
File path: core/src/main/java/org/apache/accumulo/core/file/rfile/GenerateSplits.java
##########
@@ -201,7 +201,9 @@ public void execute(String[] args) throws Exception {
       itemsSketch.update(row);
       iterator.next();
     }
-    return itemsSketch.getQuantiles(numSplits);
+    Text[] items = itemsSketch.getQuantiles(numSplits + 2);

Review comment:
       At that point all we can do is throw an error. I think checking the different outputs of the Datasketches API is a bit out of scope for this utility.
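
   For anyone reading along, the intent of the `+ 2` is roughly this (a sketch; DataSketches' evenly spaced quantiles include the minimum and maximum keys, which are not useful as split points):

    ```java
       Text[] items = itemsSketch.getQuantiles(numSplits + 2);
       // items[0] is the min key and items[numSplits + 1] is the max key;
       // keep only the interior quantiles as split points
       for (int i = 1; i <= numSplits; i++) {
         splits.add(encode(base64encode, items[i]));
       }
    ```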






[GitHub] [accumulo] keith-turner commented on a change in pull request #2368: Create GenerateSplits utility

Posted by GitBox <gi...@apache.org>.
keith-turner commented on a change in pull request #2368:
URL: https://github.com/apache/accumulo/pull/2368#discussion_r758595110



##########
File path: core/src/main/java/org/apache/accumulo/core/file/rfile/GenerateSplits.java
##########
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.file.rfile;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.io.BufferedWriter;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.cli.ClientOpts;
+import org.apache.accumulo.core.cli.ConfigOpts;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.SiteConfiguration;
+import org.apache.accumulo.core.crypto.CryptoServiceFactory;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.FileOperations;
+import org.apache.accumulo.core.file.FileSKVIterator;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iteratorsImpl.system.MultiIterator;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.TextUtil;
+import org.apache.accumulo.start.spi.KeywordExecutable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.Parameter;
+import com.google.auto.service.AutoService;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
+@AutoService(KeywordExecutable.class)
+@SuppressFBWarnings(value = "PATH_TRAVERSAL_OUT",
+    justification = "app is run in same security context as user providing the filename")
+public class GenerateSplits implements KeywordExecutable {
+  private static final Logger log = LoggerFactory.getLogger(GenerateSplits.class);
+
+  static class Opts extends ConfigOpts {
+    @Parameter(names = {"-a", "--auths"}, converter = ClientOpts.AuthConverter.class,
+        description = "the authorizations to use when reading the files")
+    public Authorizations auths = Authorizations.EMPTY;
+
+    @Parameter(names = {"-n", "--num"}, description = "The number of splits to generate")
+    public long numSplits = 0;
+
+    @Parameter(names = {"-ss", "--split-size"}, description = "The split size desired in bytes")
+    public long splitSize = 0;
+
+    @Parameter(names = {"-b64", "--base64encoded"}, description = "Base 64 encode the split points")
+    public boolean base64encode = false;
+
+    @Parameter(names = {"-sf", "--splits-file"}, description = "Output the splits to a file")
+    public String outputFile;
+
+    @Parameter(description = " <file|directory> { <file> ... }")
+    public List<String> files = new ArrayList<>();
+  }
+
+  @Override
+  public String keyword() {
+    return "generate-splits";
+  }
+
+  @Override
+  public String description() {
+    return "Generate split points from a set of 1 or more rfiles";
+  }
+
+  public static void main(String[] args) throws Exception {
+    new GenerateSplits().execute(args);
+  }
+
+  @Override
+  public void execute(String[] args) throws Exception {
+    Opts opts = new Opts();
+    opts.parseArgs(GenerateSplits.class.getName(), args);
+    if (opts.files.isEmpty()) {
+      throw new IllegalArgumentException("No files were given");
+    }
+
+    Configuration hadoopConf = new Configuration();
+    SiteConfiguration siteConf = opts.getSiteConfiguration();
+    boolean encode = opts.base64encode;
+
+    TreeSet<String> splits;
+    TreeSet<String> desiredSplits = new TreeSet<>();
+
+    if (opts.numSplits > 0 && opts.splitSize > 0) {
+      throw new IllegalArgumentException("Requested number of splits and split size.");
+    }
+    if (opts.numSplits == 0 && opts.splitSize == 0) {
+      throw new IllegalArgumentException("Required number of splits or split size.");
+    }
+    long numSplits = opts.numSplits;
+    long splitSize = opts.splitSize;
+
+    FileSystem fs = FileSystem.get(hadoopConf);
+    FileSystem localFs = FileSystem.getLocal(hadoopConf);
+    List<Path> filePaths = new ArrayList<>();
+    for (String file : opts.files) {
+      Path path = new Path(file);
+      if (file.contains(":")) {
+        fs = path.getFileSystem(hadoopConf);
+      } else {
+        log.warn("Attempting to find file across filesystems. Consider providing URI "
+            + "instead of path");
+        if (!fs.exists(path))
+          fs = localFs; // fall back to local
+      }
+      // get all the files in the directory
+      if (fs.getFileStatus(path).isDirectory()) {
+        // can only explode one directory
+        if (opts.files.size() > 1)
+          throw new IllegalArgumentException("Only one directory can be specified");
+        var iter = fs.listFiles(path, true);
+        while (iter.hasNext()) {
+          filePaths.add(iter.next().getPath());
+        }
+      } else {
+        filePaths.add(path);
+      }
+    }
+
+    // if no size specified look at indexed keys first
+    if (opts.splitSize == 0) {
+      splits = getIndexKeys(siteConf, hadoopConf, fs, filePaths, encode);
+      // if there weren't enough splits indexed, try again with size = 0
+      if (splits.size() < numSplits) {
+        log.info("Only found {} indexed keys but need {}. Doing a full scan on files {}",
+            splits.size(), numSplits, filePaths);
+        splits = getSplitsBySize(siteConf, hadoopConf, filePaths, fs, 0, encode);
+      }
+    } else {
+      splits = getSplitsBySize(siteConf, hadoopConf, filePaths, fs, splitSize, encode);
+    }
+
+    int numFound = splits.size();
+    // it's possible we found too many indexed so take every (numFound / numSplits) split
+    if (opts.splitSize == 0 && numFound > numSplits) {
+      var iter = splits.iterator();
+      long factor = numFound / numSplits;
+      log.debug("Found {} splits but requested {} picking 1 every {}", numFound, opts.numSplits,
+          factor);
+      for (int i = 0; i < numFound; i++) {
+        String next = iter.next();
+        if (i % factor == 0 && desiredSplits.size() < numSplits) {
+          desiredSplits.add(next);
+        }
+      }

Review comment:
       I opened #2371 for a possible bug w/ the listSplits table operation. If there is a slight bug w/ that code, it could possibly use the approach above, but I'm not sure.






[GitHub] [accumulo] keith-turner commented on a change in pull request #2368: Create GenerateSplits utility

Posted by GitBox <gi...@apache.org>.
keith-turner commented on a change in pull request #2368:
URL: https://github.com/apache/accumulo/pull/2368#discussion_r758775371



##########
File path: core/src/main/java/org/apache/accumulo/core/file/rfile/GenerateSplits.java
##########
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.file.rfile;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.io.BufferedWriter;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.cli.ClientOpts;
+import org.apache.accumulo.core.cli.ConfigOpts;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.SiteConfiguration;
+import org.apache.accumulo.core.crypto.CryptoServiceFactory;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.FileOperations;
+import org.apache.accumulo.core.file.FileSKVIterator;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iteratorsImpl.system.MultiIterator;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.TextUtil;
+import org.apache.accumulo.start.spi.KeywordExecutable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.Parameter;
+import com.google.auto.service.AutoService;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
+@AutoService(KeywordExecutable.class)
+@SuppressFBWarnings(value = "PATH_TRAVERSAL_OUT",
+    justification = "app is run in same security context as user providing the filename")
+public class GenerateSplits implements KeywordExecutable {
+  private static final Logger log = LoggerFactory.getLogger(GenerateSplits.class);
+
+  static class Opts extends ConfigOpts {
+    @Parameter(names = {"-a", "--auths"}, converter = ClientOpts.AuthConverter.class,
+        description = "the authorizations to use when reading the files")
+    public Authorizations auths = Authorizations.EMPTY;
+
+    @Parameter(names = {"-n", "--num"},
+        description = "The number of splits to generate. Cannot use with the split size option.")
+    public long numSplits = 0;
+
+    @Parameter(names = {"-ss", "--split-size"},
+        description = "The split size desired in bytes. Cannot use with num splits option.")
+    public long splitSize = 0;
+
+    @Parameter(names = {"-b64", "--base64encoded"}, description = "Base 64 encode the split points")
+    public boolean base64encode = false;
+
+    @Parameter(names = {"-sf", "--splits-file"}, description = "Output the splits to a file")
+    public String outputFile;
+
+    @Parameter(description = " <file|directory> { <file> ... }")
+    public List<String> files = new ArrayList<>();
+  }
+
+  @Override
+  public String keyword() {
+    return "generate-splits";
+  }
+
+  @Override
+  public String description() {
+    return "Generate split points from a set of 1 or more rfiles";
+  }
+
+  public static void main(String[] args) throws Exception {
+    new GenerateSplits().execute(args);
+  }
+
+  @Override
+  public void execute(String[] args) throws Exception {
+    Opts opts = new Opts();
+    opts.parseArgs(GenerateSplits.class.getName(), args);
+    if (opts.files.isEmpty()) {
+      throw new IllegalArgumentException("No files were given");
+    }
+
+    Configuration hadoopConf = new Configuration();
+    SiteConfiguration siteConf = opts.getSiteConfiguration();
+    boolean encode = opts.base64encode;
+
+    TreeSet<String> splits;
+    TreeSet<String> desiredSplits = new TreeSet<>();
+
+    if (opts.numSplits > 0 && opts.splitSize > 0) {
+      throw new IllegalArgumentException("Requested number of splits and split size.");
+    }
+    if (opts.numSplits == 0 && opts.splitSize == 0) {
+      throw new IllegalArgumentException("Required number of splits or split size.");
+    }
+    long numSplits = opts.numSplits;
+    long splitSize = opts.splitSize;
+
+    FileSystem fs = FileSystem.get(hadoopConf);
+    FileSystem localFs = FileSystem.getLocal(hadoopConf);
+    List<Path> filePaths = new ArrayList<>();
+    for (String file : opts.files) {
+      Path path = new Path(file);
+      if (file.contains(":")) {
+        fs = path.getFileSystem(hadoopConf);
+      } else {
+        log.warn("Attempting to find file across filesystems. Consider providing URI "
+            + "instead of path");
+        if (!fs.exists(path))
+          fs = localFs; // fall back to local
+      }
+      // get all the files in the directory
+      if (fs.getFileStatus(path).isDirectory()) {
+        // can only explode one directory
+        if (opts.files.size() > 1)
+          throw new IllegalArgumentException("Only one directory can be specified");
+        var iter = fs.listFiles(path, true);
+        while (iter.hasNext()) {
+          filePaths.add(iter.next().getPath());
+        }
+      } else {
+        filePaths.add(path);
+      }
+    }
+
+    // if no size specified look at indexed keys first
+    if (opts.splitSize == 0) {
+      splits = getIndexKeys(siteConf, hadoopConf, fs, filePaths, encode);
+      // if there weren't enough splits indexed, try again with size = 0
+      if (splits.size() < numSplits) {
+        log.info("Only found {} indexed keys but need {}. Doing a full scan on files {}",
+            splits.size(), numSplits, filePaths);
+        splits = getSplitsBySize(siteConf, hadoopConf, filePaths, fs, 0, encode);
+      }
+    } else {
+      splits = getSplitsBySize(siteConf, hadoopConf, filePaths, fs, splitSize, encode);
+    }
+
+    int numFound = splits.size();
+    // it's possible we found too many indexed so take every (numFound / numSplits) split
+    if (opts.splitSize == 0 && numFound > numSplits) {
+      var iter = splits.iterator();
+      double targetFactor = (double) numFound / numSplits;
+      log.debug("Found {} splits but requested {} picking 1 every {}", numFound, opts.numSplits,
+          targetFactor);
+
+      for (int i = 0; i < numFound; i++) {
+        double currFactor = (double) (i + 1) / desiredSplits.size();
+        String next = iter.next();
+        // unsure if this should be currFactor >= targetFactor
+        if (currFactor > targetFactor && desiredSplits.size() < numSplits) {
+          desiredSplits.add(next);
+        }
+      }

Review comment:
       The following fixes some of the issues I mentioned. It's modeled after the code in TableOperations.listSplits w/o the suspicious while loop.
   
   ```suggestion
         var iter = splits.iterator();
      // This is how much each of the found rows advances toward a desired split point. Add one
      // to numSplits because if we request 9 splits, there will be 10 tablets and we want the 9
      // splits evenly spaced between the 10 tablets.
      double increment = (numSplits + 1.0) / numFound;
      log.debug("Found {} splits but requested {} picking 1 every {}", numFound, opts.numSplits,
          increment);

      // Tracks how far along we are toward the next split.
      double total = 0;

      for (int i = 0; i < numFound; i++) {
        total += increment;
        String next = iter.next();
        if (total > 1 && desiredSplits.size() < numSplits) {
          desiredSplits.add(next);
          total -= 1;
        }
      }
   ```
   
   I created the following little program locally to play with the code and it seems good.
   
   ```java
   public class Test {
     public static void main(String[] args) {
       printSplits(100, 9);
       System.out.println();
       printSplits(15, 10);
       System.out.println();
       printSplits(10, 10);
     }
   
     private static void printSplits(int numFound, int numSplits) {
        double increment = (numSplits + 1.0) / numFound;

        System.out.println("increment " + increment);

        int added = 0;

        double total = 0;

        for (int i = 0; i < numFound; i++) {
          total += increment;

          if (total > 1 && added < numSplits) {
            System.out.println(i + " " + added + " " + total);
            added++;
            total -= 1;
          }
        }
     }
   }
   ```
   






[GitHub] [accumulo] milleruntime commented on a change in pull request #2368: Create GenerateSplits utility

Posted by GitBox <gi...@apache.org>.
milleruntime commented on a change in pull request #2368:
URL: https://github.com/apache/accumulo/pull/2368#discussion_r758680156



##########
File path: core/src/main/java/org/apache/accumulo/core/file/rfile/GenerateSplits.java
##########
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.file.rfile;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.io.BufferedWriter;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.cli.ClientOpts;
+import org.apache.accumulo.core.cli.ConfigOpts;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.SiteConfiguration;
+import org.apache.accumulo.core.crypto.CryptoServiceFactory;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.FileOperations;
+import org.apache.accumulo.core.file.FileSKVIterator;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iteratorsImpl.system.MultiIterator;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.TextUtil;
+import org.apache.accumulo.start.spi.KeywordExecutable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.Parameter;
+import com.google.auto.service.AutoService;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
+@AutoService(KeywordExecutable.class)
+@SuppressFBWarnings(value = "PATH_TRAVERSAL_OUT",
+    justification = "app is run in same security context as user providing the filename")
+public class GenerateSplits implements KeywordExecutable {
+  private static final Logger log = LoggerFactory.getLogger(GenerateSplits.class);
+
+  static class Opts extends ConfigOpts {
+    @Parameter(names = {"-a", "--auths"}, converter = ClientOpts.AuthConverter.class,
+        description = "the authorizations to use when reading the files")
+    public Authorizations auths = Authorizations.EMPTY;
+
+    @Parameter(names = {"-n", "--num"}, description = "The number of splits to generate")
+    public long numSplits = 0;
+
+    @Parameter(names = {"-ss", "--split-size"}, description = "The split size desired in bytes")
+    public long splitSize = 0;
+
+    @Parameter(names = {"-b64", "--base64encoded"}, description = "Base 64 encode the split points")
+    public boolean base64encode = false;
+
+    @Parameter(names = {"-sf", "--splits-file"}, description = "Output the splits to a file")
+    public String outputFile;
+
+    @Parameter(description = " <file|directory> { <file> ... }")
+    public List<String> files = new ArrayList<>();
+  }
+
+  @Override
+  public String keyword() {
+    return "generate-splits";
+  }
+
+  @Override
+  public String description() {
+    return "Generate split points from a set of 1 or more rfiles";
+  }
+
+  public static void main(String[] args) throws Exception {
+    new GenerateSplits().execute(args);
+  }
+
+  @Override
+  public void execute(String[] args) throws Exception {
+    Opts opts = new Opts();
+    opts.parseArgs(GenerateSplits.class.getName(), args);
+    if (opts.files.isEmpty()) {
+      throw new IllegalArgumentException("No files were given");
+    }
+
+    Configuration hadoopConf = new Configuration();
+    SiteConfiguration siteConf = opts.getSiteConfiguration();
+    boolean encode = opts.base64encode;
+
+    TreeSet<String> splits;
+    TreeSet<String> desiredSplits = new TreeSet<>();
+
+    if (opts.numSplits > 0 && opts.splitSize > 0) {
+      throw new IllegalArgumentException("Requested number of splits and split size.");
+    }
+    if (opts.numSplits == 0 && opts.splitSize == 0) {
+      throw new IllegalArgumentException("Required number of splits or split size.");
+    }
+    long numSplits = opts.numSplits;
+    long splitSize = opts.splitSize;
+
+    FileSystem fs = FileSystem.get(hadoopConf);
+    FileSystem localFs = FileSystem.getLocal(hadoopConf);
+    List<Path> filePaths = new ArrayList<>();
+    for (String file : opts.files) {
+      Path path = new Path(file);
+      if (file.contains(":")) {
+        fs = path.getFileSystem(hadoopConf);
+      } else {
+        log.warn("Attempting to find file across filesystems. Consider providing URI "
+            + "instead of path");
+        if (!fs.exists(path))
+          fs = localFs; // fall back to local
+      }
+      // get all the files in the directory
+      if (fs.getFileStatus(path).isDirectory()) {
+        // can only explode one directory
+        if (opts.files.size() > 1)
+          throw new IllegalArgumentException("Only one directory can be specified");
+        var iter = fs.listFiles(path, true);
+        while (iter.hasNext()) {
+          filePaths.add(iter.next().getPath());
+        }
+      } else {
+        filePaths.add(path);
+      }
+    }
+
+    // if no size specified look at indexed keys first
+    if (opts.splitSize == 0) {
+      splits = getIndexKeys(siteConf, hadoopConf, fs, filePaths, encode);
+      // if there weren't enough splits indexed, try again with size = 0
+      if (splits.size() < numSplits) {
+        log.info("Only found {} indexed keys but need {}. Doing a full scan on files {}",
+            splits.size(), numSplits, filePaths);
+        splits = getSplitsBySize(siteConf, hadoopConf, filePaths, fs, 0, encode);
+      }
+    } else {
+      splits = getSplitsBySize(siteConf, hadoopConf, filePaths, fs, splitSize, encode);
+    }
+
+    int numFound = splits.size();
+    // it's possible we found too many indexed so take every (numFound / numSplits) split
+    if (opts.splitSize == 0 && numFound > numSplits) {
+      var iter = splits.iterator();
+      long factor = numFound / numSplits;
+      log.debug("Found {} splits but requested {} picking 1 every {}", numFound, opts.numSplits,
+          factor);
+      for (int i = 0; i < numFound; i++) {
+        String next = iter.next();
+        if (i % factor == 0 && desiredSplits.size() < numSplits) {
+          desiredSplits.add(next);
+        }
+      }

Review comment:
       I tested your scenario and you are correct:
   <pre>
   2021-11-29T14:36:00,483 [rfile.GenerateSplits] DEBUG: Found 150 splits but requested 100 picking 1 every 1
   2021-11-29T14:36:00,484 [rfile.GenerateSplits] INFO : Generated 100 splits
   row_0000000000
   row_0000000001
   row_0000000002
   row_0000000003
   row_0000000004
   ...
   </pre>
   It will then select the first 100 splits it sees. I will do some testing of your suggestion for using two floating point values for measuring the splits.
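
   Spelling out the arithmetic behind that output (illustration only):

    ```java
       long numFound = 150, numSplits = 100;
       long factor = numFound / numSplits; // integer division truncates to 1
       // i % 1 == 0 for every i, so the cap keeps only the first 100 rows
       double increment = (numSplits + 1.0) / numFound; // ~0.673 per row with the floating point approach
    ```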






[GitHub] [accumulo] ctubbsii commented on a change in pull request #2368: Create GenerateSplits utility

Posted by GitBox <gi...@apache.org>.
ctubbsii commented on a change in pull request #2368:
URL: https://github.com/apache/accumulo/pull/2368#discussion_r756575053



##########
File path: core/src/test/java/org/apache/accumulo/core/file/rfile/GenerateSplitsTest.java
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.file.rfile;
+
+import static org.apache.accumulo.core.file.rfile.GenerateSplits.main;
+import static org.apache.accumulo.core.file.rfile.RFileTest.newColFamByteSequence;
+import static org.apache.accumulo.core.file.rfile.RFileTest.newKey;
+import static org.apache.accumulo.core.file.rfile.RFileTest.newValue;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.List;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
+@SuppressFBWarnings(value = "PATH_TRAVERSAL_IN", justification = "paths provided by test")
+public class GenerateSplitsTest {
+  private static final Logger log = LoggerFactory.getLogger(GenerateSplitsTest.class);
+
+  @ClassRule
+  public static final TemporaryFolder tempFolder =
+      new TemporaryFolder(new File(System.getProperty("user.dir") + "/target"));
+
+  private static final RFileTest.TestRFile trf = new RFileTest.TestRFile(null);
+  private static String fileName;
+
+  /**
+   * Creates a test file with 84 bytes of data and 2 Locality groups.
+   */
+  @BeforeClass
+  public static void createFile() throws IOException {
+    trf.openWriter(false);
+    trf.writer.startNewLocalityGroup("lg1", newColFamByteSequence("cf1", "cf2"));
+    trf.writer.append(newKey("r1", "cf1", "cq1", "L1", 55), newValue("foo1"));
+    trf.writer.append(newKey("r2", "cf2", "cq1", "L1", 55), newValue("foo2"));
+    trf.writer.append(newKey("r3", "cf2", "cq1", "L1", 55), newValue("foo3"));
+    trf.writer.startNewLocalityGroup("lg2", newColFamByteSequence("cf3", "cf4"));
+    trf.writer.append(newKey("r4", "cf3", "cq1", "L1", 55), newValue("foo4"));
+    trf.writer.append(newKey("r5", "cf4", "cq1", "L1", 55), newValue("foo5"));
+    trf.writer.append(newKey("r6", "cf4", "cq1", "L1", 55), newValue("foo6"));
+    trf.closeWriter();
+
+    File file = tempFolder.newFile("testGenerateSplits.rf");
+    FileOutputStream fileOutputStream = new FileOutputStream(file);
+    fileOutputStream.write(trf.baos.toByteArray());
+    fileOutputStream.flush();
+    fileOutputStream.close();
+
+    fileName = file.getAbsolutePath();
+    log.info("Wrote to file {}", fileName);
+  }
+
+  @AfterClass
+  public static void cleanUp() {
+    File file = new File(fileName);
+    if (file.delete())
+      log.info("Cleaned up test file {}", fileName);
+  }
+
+  @Test
+  public void testNum() throws Exception {
+    List<String> args = List.of(fileName, "--num", "2");
+    log.info("Invoking GenerateSplits with {}", args);
+    PrintStream oldOut = System.out;
+    try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        PrintStream newOut = new PrintStream(baos)) {
+      System.setOut(newOut);

Review comment:
       ReadWriteIT sets a bad precedent. You should not follow that precedent. Instead, PrintInfo should be updated to be more testable (like I suggest above), so that it doesn't need to do that. I checked, and the only other place where we mess with the static System.out is in ShellConfigTest. We also do a similar thing with STDIN in PasswordConverterTest. Both of those should probably be fixed as well, if possible. Fixing any of those is outside the scope of this PR... I just want to make sure we don't repeat the pattern when it's avoidable.
   
   (FWIW, I can't be certain this causes the problems I've occasionally seen where I have to run `stty sane` to fix my terminal after a `mvn package` because my keystrokes aren't echoed to the screen, but I wouldn't be surprised if it was related.)
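   For reference, a minimal sketch of the more testable shape suggested above, where the tool writes to an injected stream and only main touches System.out. None of these class or method names come from the PR:
   <pre>
   import java.io.PrintStream;
   import java.util.List;

   // Hypothetical testable shape: callers hand in the output stream, so
   // tests can capture output without swapping the static System.out.
   class SplitsPrinter {
     private final PrintStream out;

     SplitsPrinter(PrintStream out) {
       this.out = out;
     }

     void print(List<String> splits) {
       splits.forEach(out::println); // one split point per line
     }

     public static void main(String[] args) {
       // production wiring is the only place System.out appears
       new SplitsPrinter(System.out).print(List.of("r1", "r2"));
     }
   }
   </pre>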




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@accumulo.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [accumulo] keith-turner commented on a change in pull request #2368: Create GenerateSplits utility

Posted by GitBox <gi...@apache.org>.
keith-turner commented on a change in pull request #2368:
URL: https://github.com/apache/accumulo/pull/2368#discussion_r758579182



##########
File path: core/src/main/java/org/apache/accumulo/core/file/rfile/GenerateSplits.java
##########
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.file.rfile;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.io.BufferedWriter;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.cli.ClientOpts;
+import org.apache.accumulo.core.cli.ConfigOpts;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.SiteConfiguration;
+import org.apache.accumulo.core.crypto.CryptoServiceFactory;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.FileOperations;
+import org.apache.accumulo.core.file.FileSKVIterator;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iteratorsImpl.system.MultiIterator;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.TextUtil;
+import org.apache.accumulo.start.spi.KeywordExecutable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.Parameter;
+import com.google.auto.service.AutoService;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
+@AutoService(KeywordExecutable.class)
+@SuppressFBWarnings(value = "PATH_TRAVERSAL_OUT",
+    justification = "app is run in same security context as user providing the filename")
+public class GenerateSplits implements KeywordExecutable {
+  private static final Logger log = LoggerFactory.getLogger(GenerateSplits.class);
+
+  static class Opts extends ConfigOpts {
+    @Parameter(names = {"-a", "--auths"}, converter = ClientOpts.AuthConverter.class,
+        description = "the authorizations to use when reading the files")
+    public Authorizations auths = Authorizations.EMPTY;
+
+    @Parameter(names = {"-n", "--num"}, description = "The number of splits to generate")
+    public long numSplits = 0;
+
+    @Parameter(names = {"-ss", "--split-size"}, description = "The split size desired in bytes")
+    public long splitSize = 0;
+
+    @Parameter(names = {"-b64", "--base64encoded"}, description = "Base 64 encode the split points")
+    public boolean base64encode = false;
+
+    @Parameter(names = {"-sf", "--splits-file"}, description = "Output the splits to a file")
+    public String outputFile;
+
+    @Parameter(description = " <file|directory> { <file> ... }")
+    public List<String> files = new ArrayList<>();
+  }
+
+  @Override
+  public String keyword() {
+    return "generate-splits";
+  }
+
+  @Override
+  public String description() {
+    return "Generate split points from a set of 1 or more rfiles";
+  }
+
+  public static void main(String[] args) throws Exception {
+    new GenerateSplits().execute(args);
+  }
+
+  @Override
+  public void execute(String[] args) throws Exception {
+    Opts opts = new Opts();
+    opts.parseArgs(GenerateSplits.class.getName(), args);
+    if (opts.files.isEmpty()) {
+      throw new IllegalArgumentException("No files were given");
+    }
+
+    Configuration hadoopConf = new Configuration();
+    SiteConfiguration siteConf = opts.getSiteConfiguration();
+    boolean encode = opts.base64encode;
+
+    TreeSet<String> splits;
+    TreeSet<String> desiredSplits = new TreeSet<>();
+
+    if (opts.numSplits > 0 && opts.splitSize > 0) {
+      throw new IllegalArgumentException("Requested number of splits and split size.");
+    }
+    if (opts.numSplits == 0 && opts.splitSize == 0) {
+      throw new IllegalArgumentException("Required number of splits or split size.");
+    }
+    long numSplits = opts.numSplits;
+    long splitSize = opts.splitSize;
+
+    FileSystem fs = FileSystem.get(hadoopConf);
+    FileSystem localFs = FileSystem.getLocal(hadoopConf);
+    List<Path> filePaths = new ArrayList<>();
+    for (String file : opts.files) {
+      Path path = new Path(file);
+      if (file.contains(":")) {
+        fs = path.getFileSystem(hadoopConf);
+      } else {
+        log.warn("Attempting to find file across filesystems. Consider providing URI "
+            + "instead of path");
+        if (!fs.exists(path))
+          fs = localFs; // fall back to local
+      }
+      // get all the files in the directory
+      if (fs.getFileStatus(path).isDirectory()) {
+        // can only explode one directory
+        if (opts.files.size() > 1)
+          throw new IllegalArgumentException("Only one directory can be specified");
+        var iter = fs.listFiles(path, true);
+        while (iter.hasNext()) {
+          filePaths.add(iter.next().getPath());
+        }
+      } else {
+        filePaths.add(path);
+      }
+    }
+
+    // if no size specified look at indexed keys first
+    if (opts.splitSize == 0) {
+      splits = getIndexKeys(siteConf, hadoopConf, fs, filePaths, encode);
+      // if there weren't enough splits indexed, try again with size = 0
+      if (splits.size() < numSplits) {
+        log.info("Only found {} indexed keys but need {}. Doing a full scan on files {}",
+            splits.size(), numSplits, filePaths);
+        splits = getSplitsBySize(siteConf, hadoopConf, filePaths, fs, 0, encode);
+      }
+    } else {
+      splits = getSplitsBySize(siteConf, hadoopConf, filePaths, fs, splitSize, encode);
+    }
+
+    int numFound = splits.size();
+    // it's possible we found too many indexed so take every (numFound / numSplits) split
+    if (opts.splitSize == 0 && numFound > numSplits) {
+      var iter = splits.iterator();
+      long factor = numFound / numSplits;
+      log.debug("Found {} splits but requested {} picking 1 every {}", numFound, opts.numSplits,
+          factor);
+      for (int i = 0; i < numFound; i++) {
+        String next = iter.next();
+        if (i % factor == 0 && desiredSplits.size() < numSplits) {
+          desiredSplits.add(next);
+        }
+      }

Review comment:
       This code also takes a subset of splits using floating point arithmetic. https://github.com/apache/accumulo/blob/45a4a93d2926760d6a916ae2da1d7e668ec4dcb1/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java#L693-L714
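       For readers following the thread, a hedged sketch of that style of floating-point subset selection (not the exact TableOperationsImpl code; the class and method names are illustrative):
       <pre>
       import java.util.ArrayList;
       import java.util.List;

       class SplitSelection {
         // Accumulate fractional progress per source row and emit a split
         // each time the accumulator crosses 1; the picks spread evenly
         // across the input instead of bunching up at the front.
         static List<String> pickEvenSubset(List<String> found, int numSplits) {
           List<String> desired = new ArrayList<>(numSplits);
           double perRow = (double) numSplits / found.size();
           double progress = 0;
           for (String row : found) {
             progress += perRow;
             if (progress >= 1 && desired.size() < numSplits) {
               desired.add(row); // take this row, carry the remainder forward
               progress -= 1;
             }
           }
           return desired;
         }
       }
       </pre>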




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@accumulo.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [accumulo] milleruntime commented on a change in pull request #2368: Create GenerateSplits utility

Posted by GitBox <gi...@apache.org>.
milleruntime commented on a change in pull request #2368:
URL: https://github.com/apache/accumulo/pull/2368#discussion_r759492146



##########
File path: core/src/main/java/org/apache/accumulo/core/file/rfile/GenerateSplits.java
##########
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.file.rfile;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.io.BufferedWriter;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.cli.ClientOpts;
+import org.apache.accumulo.core.cli.ConfigOpts;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.SiteConfiguration;
+import org.apache.accumulo.core.crypto.CryptoServiceFactory;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.FileOperations;
+import org.apache.accumulo.core.file.FileSKVIterator;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iteratorsImpl.system.MultiIterator;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.TextUtil;
+import org.apache.accumulo.start.spi.KeywordExecutable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.Parameter;
+import com.google.auto.service.AutoService;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
+@AutoService(KeywordExecutable.class)
+@SuppressFBWarnings(value = "PATH_TRAVERSAL_OUT",
+    justification = "app is run in same security context as user providing the filename")
+public class GenerateSplits implements KeywordExecutable {
+  private static final Logger log = LoggerFactory.getLogger(GenerateSplits.class);
+
+  static class Opts extends ConfigOpts {
+    @Parameter(names = {"-a", "--auths"}, converter = ClientOpts.AuthConverter.class,
+        description = "the authorizations to use when reading the files")
+    public Authorizations auths = Authorizations.EMPTY;
+
+    @Parameter(names = {"-n", "--num"},
+        description = "The number of splits to generate. Cannot use with the split size option.")
+    public long numSplits = 0;
+
+    @Parameter(names = {"-ss", "--split-size"},
+        description = "The split size desired in bytes. Cannot use with num splits option.")
+    public long splitSize = 0;
+
+    @Parameter(names = {"-b64", "--base64encoded"}, description = "Base 64 encode the split points")
+    public boolean base64encode = false;
+
+    @Parameter(names = {"-sf", "--splits-file"}, description = "Output the splits to a file")
+    public String outputFile;
+
+    @Parameter(description = " <file|directory> { <file> ... }")
+    public List<String> files = new ArrayList<>();
+  }
+
+  @Override
+  public String keyword() {
+    return "generate-splits";
+  }
+
+  @Override
+  public String description() {
+    return "Generate split points from a set of 1 or more rfiles";
+  }
+
+  public static void main(String[] args) throws Exception {
+    new GenerateSplits().execute(args);
+  }
+
+  @Override
+  public void execute(String[] args) throws Exception {
+    Opts opts = new Opts();
+    opts.parseArgs(GenerateSplits.class.getName(), args);
+    if (opts.files.isEmpty()) {
+      throw new IllegalArgumentException("No files were given");
+    }
+
+    Configuration hadoopConf = new Configuration();
+    SiteConfiguration siteConf = opts.getSiteConfiguration();
+    boolean encode = opts.base64encode;
+
+    TreeSet<String> splits;
+    TreeSet<String> desiredSplits = new TreeSet<>();
+
+    if (opts.numSplits > 0 && opts.splitSize > 0) {
+      throw new IllegalArgumentException("Requested number of splits and split size.");
+    }
+    if (opts.numSplits == 0 && opts.splitSize == 0) {
+      throw new IllegalArgumentException("Required number of splits or split size.");
+    }
+    long numSplits = opts.numSplits;
+    long splitSize = opts.splitSize;
+
+    FileSystem fs = FileSystem.get(hadoopConf);
+    FileSystem localFs = FileSystem.getLocal(hadoopConf);
+    List<Path> filePaths = new ArrayList<>();
+    for (String file : opts.files) {
+      Path path = new Path(file);
+      if (file.contains(":")) {
+        fs = path.getFileSystem(hadoopConf);
+      } else {
+        log.warn("Attempting to find file across filesystems. Consider providing URI "
+            + "instead of path");
+        if (!fs.exists(path))
+          fs = localFs; // fall back to local
+      }
+      // get all the files in the directory
+      if (fs.getFileStatus(path).isDirectory()) {
+        // can only explode one directory
+        if (opts.files.size() > 1)
+          throw new IllegalArgumentException("Only one directory can be specified");
+        var iter = fs.listFiles(path, true);
+        while (iter.hasNext()) {
+          filePaths.add(iter.next().getPath());
+        }
+      } else {
+        filePaths.add(path);
+      }
+    }
+
+    // if no size specified look at indexed keys first
+    if (opts.splitSize == 0) {
+      splits = getIndexKeys(siteConf, hadoopConf, fs, filePaths, encode);
+      // if there weren't enough splits indexed, try again with size = 0
+      if (splits.size() < numSplits) {
+        log.info("Only found {} indexed keys but need {}. Doing a full scan on files {}",
+            splits.size(), numSplits, filePaths);
+        splits = getSplitsBySize(siteConf, hadoopConf, filePaths, fs, 0, encode);
+      }
+    } else {
+      splits = getSplitsBySize(siteConf, hadoopConf, filePaths, fs, splitSize, encode);
+    }
+
+    int numFound = splits.size();
+    // it's possible we found too many indexed so take every (numFound / numSplits) split
+    if (opts.splitSize == 0 && numFound > numSplits) {
+      var iter = splits.iterator();
+      double targetFactor = (double) numFound / numSplits;
+      log.debug("Found {} splits but requested {} picking 1 every {}", numFound, opts.numSplits,
+          targetFactor);
+
+      for (int i = 0; i < numFound; i++) {
+        double currFactor = (double) (i + 1) / desiredSplits.size();
+        String next = iter.next();
+        // unsure if this should be currFactor >= targetFactor
+        if (currFactor > targetFactor && desiredSplits.size() < numSplits) {
+          desiredSplits.add(next);
+        }
+      }

Review comment:
       OK, I understand it now. I added a comment, renamed some variables (I thought `progressToNextSplit` was clearer than `total`), and incorporated your test program into the unit test in c3d7205. Let me know if you can think of any other edge cases and I can add them to the test.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@accumulo.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [accumulo] keith-turner commented on a change in pull request #2368: Create GenerateSplits utility

Posted by GitBox <gi...@apache.org>.
keith-turner commented on a change in pull request #2368:
URL: https://github.com/apache/accumulo/pull/2368#discussion_r758778975



##########
File path: core/src/main/java/org/apache/accumulo/core/file/rfile/GenerateSplits.java
##########
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.file.rfile;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.io.BufferedWriter;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.cli.ClientOpts;
+import org.apache.accumulo.core.cli.ConfigOpts;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.SiteConfiguration;
+import org.apache.accumulo.core.crypto.CryptoServiceFactory;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.FileOperations;
+import org.apache.accumulo.core.file.FileSKVIterator;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iteratorsImpl.system.MultiIterator;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.TextUtil;
+import org.apache.accumulo.start.spi.KeywordExecutable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.Parameter;
+import com.google.auto.service.AutoService;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
+@AutoService(KeywordExecutable.class)
+@SuppressFBWarnings(value = "PATH_TRAVERSAL_OUT",
+    justification = "app is run in same security context as user providing the filename")
+public class GenerateSplits implements KeywordExecutable {
+  private static final Logger log = LoggerFactory.getLogger(GenerateSplits.class);
+
+  static class Opts extends ConfigOpts {
+    @Parameter(names = {"-a", "--auths"}, converter = ClientOpts.AuthConverter.class,
+        description = "the authorizations to use when reading the files")
+    public Authorizations auths = Authorizations.EMPTY;
+
+    @Parameter(names = {"-n", "--num"},
+        description = "The number of splits to generate. Cannot use with the split size option.")
+    public long numSplits = 0;
+
+    @Parameter(names = {"-ss", "--split-size"},
+        description = "The split size desired in bytes. Cannot use with num splits option.")
+    public long splitSize = 0;
+
+    @Parameter(names = {"-b64", "--base64encoded"}, description = "Base 64 encode the split points")
+    public boolean base64encode = false;
+
+    @Parameter(names = {"-sf", "--splits-file"}, description = "Output the splits to a file")
+    public String outputFile;
+
+    @Parameter(description = " <file|directory> { <file> ... }")
+    public List<String> files = new ArrayList<>();
+  }
+
+  @Override
+  public String keyword() {
+    return "generate-splits";
+  }
+
+  @Override
+  public String description() {
+    return "Generate split points from a set of 1 or more rfiles";
+  }
+
+  public static void main(String[] args) throws Exception {
+    new GenerateSplits().execute(args);
+  }
+
+  @Override
+  public void execute(String[] args) throws Exception {
+    Opts opts = new Opts();
+    opts.parseArgs(GenerateSplits.class.getName(), args);
+    if (opts.files.isEmpty()) {
+      throw new IllegalArgumentException("No files were given");
+    }
+
+    Configuration hadoopConf = new Configuration();
+    SiteConfiguration siteConf = opts.getSiteConfiguration();
+    boolean encode = opts.base64encode;
+
+    TreeSet<String> splits;
+    TreeSet<String> desiredSplits = new TreeSet<>();
+
+    if (opts.numSplits > 0 && opts.splitSize > 0) {
+      throw new IllegalArgumentException("Requested number of splits and split size.");
+    }
+    if (opts.numSplits == 0 && opts.splitSize == 0) {
+      throw new IllegalArgumentException("Required number of splits or split size.");
+    }
+    long numSplits = opts.numSplits;
+    long splitSize = opts.splitSize;
+
+    FileSystem fs = FileSystem.get(hadoopConf);
+    FileSystem localFs = FileSystem.getLocal(hadoopConf);
+    List<Path> filePaths = new ArrayList<>();
+    for (String file : opts.files) {
+      Path path = new Path(file);
+      if (file.contains(":")) {
+        fs = path.getFileSystem(hadoopConf);
+      } else {
+        log.warn("Attempting to find file across filesystems. Consider providing URI "
+            + "instead of path");
+        if (!fs.exists(path))
+          fs = localFs; // fall back to local
+      }
+      // get all the files in the directory
+      if (fs.getFileStatus(path).isDirectory()) {
+        // can only explode one directory
+        if (opts.files.size() > 1)
+          throw new IllegalArgumentException("Only one directory can be specified");
+        var iter = fs.listFiles(path, true);
+        while (iter.hasNext()) {
+          filePaths.add(iter.next().getPath());
+        }
+      } else {
+        filePaths.add(path);
+      }
+    }
+
+    // if no size specified look at indexed keys first
+    if (opts.splitSize == 0) {
+      splits = getIndexKeys(siteConf, hadoopConf, fs, filePaths, encode);
+      // if there weren't enough splits indexed, try again with size = 0
+      if (splits.size() < numSplits) {
+        log.info("Only found {} indexed keys but need {}. Doing a full scan on files {}",
+            splits.size(), numSplits, filePaths);
+        splits = getSplitsBySize(siteConf, hadoopConf, filePaths, fs, 0, encode);
+      }
+    } else {
+      splits = getSplitsBySize(siteConf, hadoopConf, filePaths, fs, splitSize, encode);
+    }
+
+    int numFound = splits.size();
+    // it's possible we found too many indexed so take every (numFound / numSplits) split
+    if (opts.splitSize == 0 && numFound > numSplits) {
+      var iter = splits.iterator();
+      double targetFactor = (double) numFound / numSplits;
+      log.debug("Found {} splits but requested {} picking 1 every {}", numFound, opts.numSplits,
+          targetFactor);
+
+      for (int i = 0; i < numFound; i++) {
+        double currFactor = (double) (i + 1) / desiredSplits.size();
+        String next = iter.next();
+        // unsure if this should be currFactor >= targetFactor
+        if (currFactor > targetFactor && desiredSplits.size() < numSplits) {

Review comment:
       I worked up a possible solution in another comment.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@accumulo.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [accumulo] milleruntime commented on a change in pull request #2368: Create GenerateSplits utility

Posted by GitBox <gi...@apache.org>.
milleruntime commented on a change in pull request #2368:
URL: https://github.com/apache/accumulo/pull/2368#discussion_r759334283



##########
File path: core/src/main/java/org/apache/accumulo/core/file/rfile/GenerateSplits.java
##########
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.file.rfile;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.io.BufferedWriter;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.cli.ClientOpts;
+import org.apache.accumulo.core.cli.ConfigOpts;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.SiteConfiguration;
+import org.apache.accumulo.core.crypto.CryptoServiceFactory;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.FileOperations;
+import org.apache.accumulo.core.file.FileSKVIterator;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iteratorsImpl.system.MultiIterator;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.TextUtil;
+import org.apache.accumulo.start.spi.KeywordExecutable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.Parameter;
+import com.google.auto.service.AutoService;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
+@AutoService(KeywordExecutable.class)
+@SuppressFBWarnings(value = "PATH_TRAVERSAL_OUT",
+    justification = "app is run in same security context as user providing the filename")
+public class GenerateSplits implements KeywordExecutable {
+  private static final Logger log = LoggerFactory.getLogger(GenerateSplits.class);
+
+  static class Opts extends ConfigOpts {
+    @Parameter(names = {"-a", "--auths"}, converter = ClientOpts.AuthConverter.class,
+        description = "the authorizations to use when reading the files")
+    public Authorizations auths = Authorizations.EMPTY;
+
+    @Parameter(names = {"-n", "--num"},
+        description = "The number of splits to generate. Cannot use with the split size option.")
+    public long numSplits = 0;
+
+    @Parameter(names = {"-ss", "--split-size"},
+        description = "The split size desired in bytes. Cannot use with num splits option.")
+    public long splitSize = 0;
+
+    @Parameter(names = {"-b64", "--base64encoded"}, description = "Base 64 encode the split points")
+    public boolean base64encode = false;
+
+    @Parameter(names = {"-sf", "--splits-file"}, description = "Output the splits to a file")
+    public String outputFile;
+
+    @Parameter(description = " <file|directory> { <file> ... }")
+    public List<String> files = new ArrayList<>();
+  }
+
+  @Override
+  public String keyword() {
+    return "generate-splits";
+  }
+
+  @Override
+  public String description() {
+    return "Generate split points from a set of 1 or more rfiles";
+  }
+
+  public static void main(String[] args) throws Exception {
+    new GenerateSplits().execute(args);
+  }
+
+  @Override
+  public void execute(String[] args) throws Exception {
+    Opts opts = new Opts();
+    opts.parseArgs(GenerateSplits.class.getName(), args);
+    if (opts.files.isEmpty()) {
+      throw new IllegalArgumentException("No files were given");
+    }
+
+    Configuration hadoopConf = new Configuration();
+    SiteConfiguration siteConf = opts.getSiteConfiguration();
+    boolean encode = opts.base64encode;
+
+    TreeSet<String> splits;
+    TreeSet<String> desiredSplits = new TreeSet<>();
+
+    if (opts.numSplits > 0 && opts.splitSize > 0) {
+      throw new IllegalArgumentException("Requested number of splits and split size.");
+    }
+    if (opts.numSplits == 0 && opts.splitSize == 0) {
+      throw new IllegalArgumentException("Required number of splits or split size.");
+    }
+    long numSplits = opts.numSplits;
+    long splitSize = opts.splitSize;
+
+    FileSystem fs = FileSystem.get(hadoopConf);
+    FileSystem localFs = FileSystem.getLocal(hadoopConf);
+    List<Path> filePaths = new ArrayList<>();
+    for (String file : opts.files) {
+      Path path = new Path(file);
+      if (file.contains(":")) {
+        fs = path.getFileSystem(hadoopConf);
+      } else {
+        log.warn("Attempting to find file across filesystems. Consider providing URI "
+            + "instead of path");
+        if (!fs.exists(path))
+          fs = localFs; // fall back to local
+      }
+      // get all the files in the directory
+      if (fs.getFileStatus(path).isDirectory()) {
+        // can only explode one directory
+        if (opts.files.size() > 1)
+          throw new IllegalArgumentException("Only one directory can be specified");
+        var iter = fs.listFiles(path, true);
+        while (iter.hasNext()) {
+          filePaths.add(iter.next().getPath());
+        }
+      } else {
+        filePaths.add(path);
+      }
+    }
+
+    // if no size specified look at indexed keys first
+    if (opts.splitSize == 0) {
+      splits = getIndexKeys(siteConf, hadoopConf, fs, filePaths, encode);
+      // if there weren't enough splits indexed, try again with size = 0
+      if (splits.size() < numSplits) {
+        log.info("Only found {} indexed keys but need {}. Doing a full scan on files {}",
+            splits.size(), numSplits, filePaths);
+        splits = getSplitsBySize(siteConf, hadoopConf, filePaths, fs, 0, encode);
+      }
+    } else {
+      splits = getSplitsBySize(siteConf, hadoopConf, filePaths, fs, splitSize, encode);
+    }
+
+    int numFound = splits.size();
+    // it's possible we found too many indexed so take every (numFound / numSplits) split
+    if (opts.splitSize == 0 && numFound > numSplits) {
+      var iter = splits.iterator();
+      double targetFactor = (double) numFound / numSplits;
+      log.debug("Found {} splits but requested {} picking 1 every {}", numFound, opts.numSplits,
+          targetFactor);
+
+      for (int i = 0; i < numFound; i++) {
+        double currFactor = (double) (i + 1) / desiredSplits.size();
+        String next = iter.next();
+        // unsure if this should be currFactor >= targetFactor
+        if (currFactor > targetFactor && desiredSplits.size() < numSplits) {
+          desiredSplits.add(next);
+        }
+      }

Review comment:
       Ignore that last message; I was testing with a change I had made that set total = 0.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@accumulo.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [accumulo] milleruntime commented on pull request #2368: Create GenerateSplits utility

Posted by GitBox <gi...@apache.org>.
milleruntime commented on pull request #2368:
URL: https://github.com/apache/accumulo/pull/2368#issuecomment-983938428


   @keith-turner my last update uses the DataSketches library to get the splits from the indices and the full scan, which should be more efficient than having to read all rows into memory. But in one of the tests, it splits the data differently. The unit test which asks for 4 splits out of the 6 was returning `r2, r3, r4, r5` with your algorithm, but produces `r1, r3, r5, r6` with the DataSketches library. Based on your comments, it sounded like you were avoiding selecting the first split; why would that be? And can you think of a reason why one set of splits would be preferred over the other?
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@accumulo.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [accumulo] keith-turner commented on pull request #2368: Create GenerateSplits utility

Posted by GitBox <gi...@apache.org>.
keith-turner commented on pull request #2368:
URL: https://github.com/apache/accumulo/pull/2368#issuecomment-983993160


   >I am going to take a look at the Apache Datasketches project, starting with the quantiles in ItemsSketch: https://datasketches.apache.org/api/java/snapshot/apidocs/org/apache/datasketches/quantiles/ItemsSketch.html#getQuantiles-int-
   
   @milleruntime  looking at the javadoc for the above it says
   
   > evenlySpaced - an integer that specifies the number of evenly spaced fractional ranks. This must be a positive integer greater than 1. A value of 2 will return the min and the max value. A value of 3 will return the min, the median and the max value, etc.
   
   The above behavior is not what we want. If a user requests one split, we probably want the source row that is around 50% of the way through (1 is unsupported by the sketch). If a user requests 2 splits, we probably want the source rows that are around 33% and 66% of the way through (not the min and max returned by the sketch). If a user requests 3 splits, then we probably want the source rows that are around 25%, 50%, and 75% of the way through (not the min, max, and median as returned by the sketch).
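   A quick hedged illustration of how those fractions fall out of the evenly spaced ranks (the arithmetic follows the javadoc quoted above; the class name is made up):
   <pre>
   // getQuantiles(m) returns values at fractional ranks i / (m - 1) for
   // i in [0, m), so requesting m = numSplits + 2 and dropping the first
   // and last entries leaves exactly the interior fractions wanted.
   public class QuantileRanks {
     public static void main(String[] args) {
       int numSplits = 2;
       int m = numSplits + 2; // the min, two interior points, the max
       for (int i = 0; i < m; i++) {
         System.out.printf("rank %d = %.2f%n", i, (double) i / (m - 1));
       }
       // prints 0.00, 0.33, 0.67, 1.00; discarding the endpoints leaves
       // the ~33% and ~66% rows wanted for two splits
     }
   }
   </pre>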
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@accumulo.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [accumulo] milleruntime merged pull request #2368: Create GenerateSplits utility

Posted by GitBox <gi...@apache.org>.
milleruntime merged pull request #2368:
URL: https://github.com/apache/accumulo/pull/2368


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@accumulo.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [accumulo] milleruntime commented on pull request #2368: Create GenerateSplits utility

Posted by GitBox <gi...@apache.org>.
milleruntime commented on pull request #2368:
URL: https://github.com/apache/accumulo/pull/2368#issuecomment-982007784


   > unsure if this should be currFactor >= targetFactor .. need to think through some edge cases
   
   To test your comment, I added a test to GenerateSplitsTest that passes `--num 4` using the 6 splits. Based on the results, I don't think we want the "=". With a comparison of `currFactor > targetFactor` we get the splits: `r1, r2, r4, r5`. But with a comparison of `currFactor >= targetFactor` we get the splits: `r1, r2, r3, r5`. It seems like we get a better spread using `currFactor > targetFactor`.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@accumulo.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [accumulo] keith-turner commented on pull request #2368: Create GenerateSplits utility

Posted by GitBox <gi...@apache.org>.
keith-turner commented on pull request #2368:
URL: https://github.com/apache/accumulo/pull/2368#issuecomment-983983979


   > Based on your comments, it sounded like you were avoiding selecting the first split, why would that be? 
   
   That comment was based on a bug in the first cut of the selection algorithm, which always used the first source row. For larger amounts of data this would not be desirable. If there are 10,000 source rows from which you want to derive 100 splits, you would not want to use the first source row as the first split. You would want the ~100th source row for the first split. For the case you are testing w/ 6 source rows and 4 desired splits, it probably does not matter if the first source row is chosen.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@accumulo.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [accumulo] ctubbsii edited a comment on pull request #2368: Create GenerateSplits utility

Posted by GitBox <gi...@apache.org>.
ctubbsii edited a comment on pull request #2368:
URL: https://github.com/apache/accumulo/pull/2368#issuecomment-983999498


   Also, we should be clear in the tool's usage/docs whether we return `n-1` splits that correspond to `n` requested tablets, or `n` splits that correspond to `n` requested splits that result in `n+1` tablets.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@accumulo.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [accumulo] keith-turner commented on a change in pull request #2368: Create GenerateSplits utility

Posted by GitBox <gi...@apache.org>.
keith-turner commented on a change in pull request #2368:
URL: https://github.com/apache/accumulo/pull/2368#discussion_r760564817



##########
File path: core/src/main/java/org/apache/accumulo/core/file/rfile/GenerateSplits.java
##########
@@ -201,7 +201,9 @@ public void execute(String[] args) throws Exception {
       itemsSketch.update(row);
       iterator.next();
     }
-    return itemsSketch.getQuantiles(numSplits);
+    Text[] items = itemsSketch.getQuantiles(numSplits + 2);

Review comment:
       ItemsSketch has a method named getRetainedItems(); I wonder if this is the max that could be requested from getQuantiles()? If so, what should be done if numSplits + 2 > itemsSketch.getRetainedItems()?
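       One hedged way to guard that case, assuming getRetainedItems() does bound what getQuantiles() can return (the helper name is illustrative, not from the PR):
       <pre>
       // Clamp the evenly-spaced request so it never exceeds what the
       // sketch retained; getQuantiles needs at least 2 values (the min
       // and the max).
       static int clampQuantileRequest(long numSplits, int retainedItems) {
         return Math.max(2, Math.min((int) numSplits + 2, retainedItems));
       }
       </pre>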




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@accumulo.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [accumulo] keith-turner commented on pull request #2368: Create GenerateSplits utility

Posted by GitBox <gi...@apache.org>.
keith-turner commented on pull request #2368:
URL: https://github.com/apache/accumulo/pull/2368#issuecomment-984001118


   > Based on the behavior, we probably want to just drop the first and last items in the returned array, then.
   
   Yeah could try calling `getQuantiles(numSplits+2)` and see if the returned array discarding the first and last looks like what we want.
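   A minimal sketch of discarding those endpoints (the generic helper is illustrative; only the getQuantiles(numSplits + 2) call itself comes from the diff above):
   <pre>
   import java.util.Arrays;

   // getQuantiles(m) includes the minimum at index 0 and the maximum at
   // index length - 1; keep only the interior split points between them.
   static <T> T[] interiorQuantiles(T[] quantiles) {
     return Arrays.copyOfRange(quantiles, 1, quantiles.length - 1);
   }
   </pre>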


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@accumulo.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [accumulo] milleruntime commented on pull request #2368: Create GenerateSplits utility

Posted by GitBox <gi...@apache.org>.
milleruntime commented on pull request #2368:
URL: https://github.com/apache/accumulo/pull/2368#issuecomment-984014038


   > Yeah could try calling `getQuantiles(numSplits+2)` and see if the returned array discarding the first and last looks like what we want.
   
   Good catch. I will give that a try.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@accumulo.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [accumulo] milleruntime commented on a change in pull request #2368: Create GenerateSplits utility

Posted by GitBox <gi...@apache.org>.
milleruntime commented on a change in pull request #2368:
URL: https://github.com/apache/accumulo/pull/2368#discussion_r756351745



##########
File path: core/src/main/java/org/apache/accumulo/core/file/rfile/GenerateSplits.java
##########
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.file.rfile;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.io.BufferedWriter;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.cli.ClientOpts;
+import org.apache.accumulo.core.cli.ConfigOpts;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.SiteConfiguration;
+import org.apache.accumulo.core.crypto.CryptoServiceFactory;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.FileOperations;
+import org.apache.accumulo.core.file.FileSKVIterator;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iteratorsImpl.system.MultiIterator;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.TextUtil;
+import org.apache.accumulo.start.spi.KeywordExecutable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.Parameter;
+import com.google.auto.service.AutoService;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
+@AutoService(KeywordExecutable.class)
+@SuppressFBWarnings(value = "PATH_TRAVERSAL_OUT",
+    justification = "app is run in same security context as user providing the filename")
+public class GenerateSplits implements KeywordExecutable {
+  private static final Logger log = LoggerFactory.getLogger(GenerateSplits.class);
+
+  static class Opts extends ConfigOpts {
+    @Parameter(names = {"-a", "--auths"}, converter = ClientOpts.AuthConverter.class,
+        description = "the authorizations to use when reading the files")
+    public Authorizations auths = Authorizations.EMPTY;
+
+    @Parameter(names = {"-n", "--num"}, description = "The number of splits to generate")
+    public long numSplits = 0;
+
+    @Parameter(names = {"-ss", "--split-size"}, description = "The split size desired in bytes")
+    public long splitSize = 0;
+
+    @Parameter(names = {"-b64", "--base64encoded"}, description = "Base 64 encode the split points")
+    public boolean base64encode = false;
+
+    @Parameter(names = {"-sf", "--splits-file"}, description = "Output the splits to a file")
+    public String outputFile;
+
+    @Parameter(description = " <file|directory> { <file> ... }")
+    public List<String> files = new ArrayList<>();
+  }
+
+  @Override
+  public String keyword() {
+    return "generate-splits";
+  }
+
+  @Override
+  public String description() {
+    return "Generate split points from a set of 1 or more rfiles";
+  }
+
+  public static void main(String[] args) throws Exception {
+    new GenerateSplits().execute(args);
+  }
+
+  @Override
+  public void execute(String[] args) throws Exception {
+    Opts opts = new Opts();
+    opts.parseArgs(GenerateSplits.class.getName(), args);
+    if (opts.files.isEmpty()) {
+      throw new IllegalArgumentException("No files were given");
+    }
+
+    Configuration hadoopConf = new Configuration();
+    SiteConfiguration siteConf = opts.getSiteConfiguration();
+    boolean encode = opts.base64encode;
+
+    TreeSet<String> splits;
+    TreeSet<String> desiredSplits = new TreeSet<>();
+
+    if (opts.numSplits > 0 && opts.splitSize > 0) {
+      throw new IllegalArgumentException("Requested number of splits and split size.");
+    }
+    if (opts.numSplits == 0 && opts.splitSize == 0) {
+      throw new IllegalArgumentException("Required number of splits or split size.");
+    }
+    long numSplits = opts.numSplits;
+    long splitSize = opts.splitSize;
+
+    FileSystem fs = FileSystem.get(hadoopConf);
+    FileSystem localFs = FileSystem.getLocal(hadoopConf);
+    List<Path> filePaths = new ArrayList<>();
+    for (String file : opts.files) {
+      Path path = new Path(file);
+      if (file.contains(":")) {
+        fs = path.getFileSystem(hadoopConf);
+      } else {
+        log.warn("Attempting to find file across filesystems. Consider providing URI "
+            + "instead of path");
+        if (!fs.exists(path))
+          fs = localFs; // fall back to local
+      }
+      // get all the files in the directory
+      if (fs.getFileStatus(path).isDirectory()) {
+        // can only explode one directory
+        if (opts.files.size() > 1)
+          throw new IllegalArgumentException("Only one directory can be specified");
+        var iter = fs.listFiles(path, true);
+        while (iter.hasNext()) {
+          filePaths.add(iter.next().getPath());
+        }
+      } else {
+        filePaths.add(path);
+      }
+    }
+
+    // if no size specified look at indexed keys first
+    if (opts.splitSize == 0) {
+      splits = getIndexKeys(siteConf, hadoopConf, fs, filePaths, encode);
+      // if there weren't enough splits indexed, try again with size = 0
+      if (splits.size() < numSplits) {
+        log.info("Only found {} indexed keys but need {}. Doing a full scan on files {}",
+            splits.size(), numSplits, filePaths);
+        splits = getSplitsBySize(siteConf, hadoopConf, filePaths, fs, 0, encode);
+      }
+    } else {
+      splits = getSplitsBySize(siteConf, hadoopConf, filePaths, fs, splitSize, encode);
+    }
+
+    int numFound = splits.size();
+    // it's possible we found too many indexed so take every (numFound / numSplits) split
+    if (opts.splitSize == 0 && numFound > numSplits) {
+      var iter = splits.iterator();
+      long factor = numFound / numSplits;
+      log.debug("Found {} splits but requested {} picking 1 every {}", numFound, opts.numSplits,
+          factor);
+      for (int i = 0; i < numFound; i++) {
+        String next = iter.next();
+        if (i % factor == 0 && desiredSplits.size() < numSplits) {
+          desiredSplits.add(next);
+        }
+      }
+    } else if (numFound < numSplits) {
+      log.warn("Only found " + numFound + " splits");
+      desiredSplits = splits;
+    } else {
+      desiredSplits = splits;
+    }
+
+    log.info("Generated {} splits", desiredSplits.size());
+    if (opts.outputFile != null) {
+      log.info("Writing splits to file {} ", opts.outputFile);
+      try (var writer = new PrintWriter(new BufferedWriter(
+          new OutputStreamWriter(new FileOutputStream(opts.outputFile), UTF_8)))) {
+        desiredSplits.forEach(writer::println);
+      }
+    } else {
+      desiredSplits.forEach(System.out::println);
+    }
+  }
+
+  private static String encode(boolean encode, Text text) {
+    if (text == null) {
+      return null;
+    }
+    byte[] bytes = TextUtil.getBytes(text);
+    if (encode)
+      return Base64.getEncoder().encodeToString(bytes);
+    else {
+      // drop non printable characters
+      StringBuilder sb = new StringBuilder();
+      for (byte aByte : bytes) {
+        int c = 0xff & aByte;
+        if (c == '\\')
+          sb.append("\\\\");
+        else if (c >= 32 && c <= 126)
+          sb.append((char) c);
+        else
+          log.debug("Dropping non printable char: \\x{}", Integer.toHexString(c));
+      }
+      return sb.toString();
+    }
+  }
+
+  /**
+   * Scan the files for indexed keys since it is more efficient than a full file scan.
+   */
+  private TreeSet<String> getIndexKeys(AccumuloConfiguration accumuloConf, Configuration hadoopConf,
+      FileSystem fs, List<Path> files, boolean base64encode) throws IOException {
+    TreeSet<String> indexKeys = new TreeSet<>();
+    List<SortedKeyValueIterator<Key,Value>> readers = new ArrayList<>(files.size());
+    List<FileSKVIterator> fileReaders = new ArrayList<>(files.size());
+
+    for (Path file : files) {
+      FileSKVIterator reader = FileOperations.getInstance().newIndexReaderBuilder()
+          .forFile(file.toString(), fs, hadoopConf, CryptoServiceFactory.newDefaultInstance())
+          .withTableConfiguration(accumuloConf).build();
+      readers.add(reader);
+      fileReaders.add(reader);
+    }
+    var iterator = new MultiIterator(readers, true);
+    while (iterator.hasTop()) {
+      Key key = iterator.getTopKey();
+      indexKeys.add(encode(base64encode, key.getRow()));
+      iterator.next();
+    }
+
+    for (var r : fileReaders) {
+      r.close();
+    }
+
+    log.debug("Got {} splits from indices of {}", indexKeys.size(), files);
+    return indexKeys;
+  }
+
+  /**
+   * Get number of splits based on requested size of split. A splitSize = 0 returns all keys.
+   */
+  private TreeSet<String> getSplitsBySize(AccumuloConfiguration accumuloConf,
+      Configuration hadoopConf, List<Path> files, FileSystem fs, long splitSize,
+      boolean base64encode) throws IOException {
+    TreeSet<String> splits = new TreeSet<>();
+    List<SortedKeyValueIterator<Key,Value>> readers = new ArrayList<>(files.size());
+    List<FileSKVIterator> fileReaders = new ArrayList<>(files.size());
+    SortedKeyValueIterator<Key,Value> iterator;
+
+    long currentSplitSize = 0;
+    long totalSize = 0;
+    for (Path file : files) {
+      FileSKVIterator reader = FileOperations.getInstance().newScanReaderBuilder()
+          .forFile(file.toString(), fs, hadoopConf, CryptoServiceFactory.newDefaultInstance())
+          .withTableConfiguration(accumuloConf).overRange(new Range(), Set.of(), false).build();
+      readers.add(reader);
+      fileReaders.add(reader);
+    }
+    iterator = new MultiIterator(readers, false);
+    iterator.seek(new Range(), Collections.emptySet(), false);
+    while (iterator.hasTop()) {
+      Key key = iterator.getTopKey();
+      Value val = iterator.getTopValue();
+      int size = key.getSize() + val.getSize();
+      currentSplitSize += size;
+      totalSize += size;
+      if (currentSplitSize > splitSize) {
+        splits.add(encode(base64encode, key.getRow()));
+        currentSplitSize = 0;
+      }
+      iterator.next();
+    }
+    for (var r : fileReaders) {
+      r.close();
+    }

Review comment:
       Fixed in 7513858




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@accumulo.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [accumulo] DomGarguilo commented on a change in pull request #2368: Create GenerateSplits utility

Posted by GitBox <gi...@apache.org>.
DomGarguilo commented on a change in pull request #2368:
URL: https://github.com/apache/accumulo/pull/2368#discussion_r756352895



##########
File path: core/src/main/java/org/apache/accumulo/core/file/rfile/GenerateSplits.java
##########
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.file.rfile;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.io.BufferedWriter;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.cli.ClientOpts;
+import org.apache.accumulo.core.cli.ConfigOpts;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.SiteConfiguration;
+import org.apache.accumulo.core.crypto.CryptoServiceFactory;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.FileOperations;
+import org.apache.accumulo.core.file.FileSKVIterator;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iteratorsImpl.system.MultiIterator;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.TextUtil;
+import org.apache.accumulo.start.spi.KeywordExecutable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.Parameter;
+import com.google.auto.service.AutoService;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
+@AutoService(KeywordExecutable.class)
+@SuppressFBWarnings(value = "PATH_TRAVERSAL_OUT",
+    justification = "app is run in same security context as user providing the filename")
+public class GenerateSplits implements KeywordExecutable {
+  private static final Logger log = LoggerFactory.getLogger(GenerateSplits.class);
+
+  static class Opts extends ConfigOpts {
+    @Parameter(names = {"-a", "--auths"}, converter = ClientOpts.AuthConverter.class,
+        description = "the authorizations to use when reading the files")
+    public Authorizations auths = Authorizations.EMPTY;
+
+    @Parameter(names = {"-n", "--num"}, description = "The number of splits to generate")
+    public long numSplits = 0;
+
+    @Parameter(names = {"-ss", "--split-size"}, description = "The split size desired in bytes")
+    public long splitSize = 0;
+
+    @Parameter(names = {"-b64", "--base64encoded"}, description = "Base 64 encode the split points")
+    public boolean base64encode = false;
+
+    @Parameter(names = {"-sf", "--splits-file"}, description = "Output the splits to a file")
+    public String outputFile;
+
+    @Parameter(description = " <file|directory> { <file> ... }")
+    public List<String> files = new ArrayList<>();
+  }
+
+  @Override
+  public String keyword() {
+    return "generate-splits";
+  }
+
+  @Override
+  public String description() {
+    return "Generate split points from a set of 1 or more rfiles";
+  }
+
+  public static void main(String[] args) throws Exception {
+    new GenerateSplits().execute(args);
+  }
+
+  @Override
+  public void execute(String[] args) throws Exception {
+    Opts opts = new Opts();
+    opts.parseArgs(GenerateSplits.class.getName(), args);
+    if (opts.files.isEmpty()) {
+      throw new IllegalArgumentException("No files were given");
+    }
+
+    Configuration hadoopConf = new Configuration();
+    SiteConfiguration siteConf = opts.getSiteConfiguration();
+    boolean encode = opts.base64encode;
+
+    TreeSet<String> splits;
+    TreeSet<String> desiredSplits = new TreeSet<>();
+
+    if (opts.numSplits > 0 && opts.splitSize > 0) {
+      throw new IllegalArgumentException("Requested number of splits and split size.");
+    }
+    if (opts.numSplits == 0 && opts.splitSize == 0) {
+      throw new IllegalArgumentException("Required number of splits or split size.");
+    }
+    long numSplits = opts.numSplits;
+    long splitSize = opts.splitSize;
+
+    FileSystem fs = FileSystem.get(hadoopConf);
+    FileSystem localFs = FileSystem.getLocal(hadoopConf);
+    List<Path> filePaths = new ArrayList<>();
+    for (String file : opts.files) {
+      Path path = new Path(file);
+      if (file.contains(":")) {
+        fs = path.getFileSystem(hadoopConf);
+      } else {
+        log.warn("Attempting to find file across filesystems. Consider providing a URI "
+            + "instead of a path");
+        if (!fs.exists(path))
+          fs = localFs; // fall back to local
+      }
+      // get all the files in the directory
+      if (fs.getFileStatus(path).isDirectory()) {
+        // can only explode one directory
+        if (opts.files.size() > 1)
+          throw new IllegalArgumentException("Only one directory can be specified");
+        var iter = fs.listFiles(path, true);
+        while (iter.hasNext()) {
+          filePaths.add(iter.next().getPath());
+        }
+      } else {
+        filePaths.add(path);
+      }
+    }
+
+    // if no split size was specified, look at indexed keys first
+    if (opts.splitSize == 0) {
+      splits = getIndexKeys(siteConf, hadoopConf, fs, filePaths, encode);
+      // if the index didn't yield enough splits, fall back to a full scan (splitSize 0 returns all keys)
+      if (splits.size() < numSplits) {
+        log.info("Only found {} indexed keys but need {}. Doing a full scan on files {}",
+            splits.size(), numSplits, filePaths);
+        splits = getSplitsBySize(siteConf, hadoopConf, filePaths, fs, 0, encode);
+      }
+    } else {
+      splits = getSplitsBySize(siteConf, hadoopConf, filePaths, fs, splitSize, encode);
+    }
+
+    int numFound = splits.size();
+    // it's possible we found too many indexed keys, so take every (numFound / numSplits)th split
+    if (opts.splitSize == 0 && numFound > numSplits) {
+      var iter = splits.iterator();
+      long factor = numFound / numSplits;
+      log.debug("Found {} splits but requested {} picking 1 every {}", numFound, opts.numSplits,
+          factor);
+      for (int i = 0; i < numFound; i++) {
+        String next = iter.next();
+        if (i % factor == 0 && desiredSplits.size() < numSplits) {
+          desiredSplits.add(next);
+        }
+      }
+    } else if (numFound < numSplits) {
+      log.warn("Only found " + numFound + " splits");
+      desiredSplits = splits;
+    } else {
+      desiredSplits = splits;
+    }

Review comment:
   ```suggestion
       } else {
         if (numFound < numSplits)
           log.warn("Only found " + numFound + " splits");
         desiredSplits = splits;
       }
   ```
   Not sure if this is any nicer, but it's a bit shorter.
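
The block this suggestion touches handles the opposite problem: the index produced more candidate split points than were requested, so the code keeps roughly one candidate every (numFound / numSplits) entries, capped at numSplits. A standalone sketch of that sampling step follows; as above, the names are illustrative and not part of the PR.

```java
import java.util.TreeSet;

public class SampleSplitsSketch {

  // If more candidates were found than requested, keep roughly one every
  // (numFound / numSplits) entries, never returning more than numSplits.
  static TreeSet<String> sampleSplits(TreeSet<String> candidates, long numSplits) {
    int numFound = candidates.size();
    if (numFound <= numSplits) {
      return candidates; // nothing to thin out
    }
    TreeSet<String> desired = new TreeSet<>();
    long factor = numFound / numSplits;
    int i = 0;
    for (String candidate : candidates) {
      if (i % factor == 0 && desired.size() < numSplits) {
        desired.add(candidate);
      }
      i++;
    }
    return desired;
  }

  public static void main(String[] args) {
    TreeSet<String> candidates = new TreeSet<>();
    for (int i = 0; i < 10; i++) {
      candidates.add("row" + i);
    }
    System.out.println(sampleSplits(candidates, 3)); // prints [row0, row3, row6]
  }
}
```

Given the keyword registration in the diff, the utility would presumably be run through the launcher as something like `accumulo generate-splits -n 10 -sf /tmp/splits.txt /path/to/rfile`, though the exact invocation depends on how the installation dispatches KeywordExecutable implementations.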



