Posted to commits@accumulo.apache.org by do...@apache.org on 2023/06/01 19:28:52 UTC

[accumulo-testing] branch main updated: Resolve build issues with accumulo 3.0 (#261)

This is an automated email from the ASF dual-hosted git repository.

domgarguilo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo-testing.git


The following commit(s) were added to refs/heads/main by this push:
     new 9a55b3b  Resolve build issues with accumulo 3.0 (#261)
9a55b3b is described below

commit 9a55b3b13566fff33435f30c11d4d86cf76bf153
Author: Dom G <do...@apache.org>
AuthorDate: Thu Jun 1 15:28:47 2023 -0400

    Resolve build issues with accumulo 3.0 (#261)
    
    * Remove 'legacy' bulk import usage and resolve other build errors
---
 docs/bulk-test.md                                  |  2 +-
 .../accumulo/testing/continuous/CreateTable.java   |  4 +--
 .../testing/ingest/BulkImportDirectory.java        | 15 ++-------
 .../testing/randomwalk/concurrent/Config.java      |  6 ----
 .../testing/randomwalk/multitable/BulkImport.java  | 28 ++++------------
 .../testing/randomwalk/shard/BulkInsert.java       | 39 ++++------------------
 6 files changed, 18 insertions(+), 76 deletions(-)

diff --git a/docs/bulk-test.md b/docs/bulk-test.md
index 2a77f95..f203df1 100644
--- a/docs/bulk-test.md
+++ b/docs/bulk-test.md
@@ -21,7 +21,7 @@
 
 # Running a bulk ingest test
 
-Continous ingest supports bulk ingest in addition to live ingest. A map reduce
+Continuous ingest supports bulk ingest in addition to live ingest. A map reduce
 job that generates rfiles using the tables splits can be run.  This can be run
 in a loop like the following to continually bulk import data.
 
diff --git a/src/main/java/org/apache/accumulo/testing/continuous/CreateTable.java b/src/main/java/org/apache/accumulo/testing/continuous/CreateTable.java
index 511cd45..061c9a0 100644
--- a/src/main/java/org/apache/accumulo/testing/continuous/CreateTable.java
+++ b/src/main/java/org/apache/accumulo/testing/continuous/CreateTable.java
@@ -62,9 +62,7 @@ public class CreateTable {
       // retrieve and set tserver props
       Map<String,String> props = getProps(env, TestProps.CI_COMMON_ACCUMULO_SERVER_PROPS);
       try {
-        for (Map.Entry<String,String> entry : props.entrySet()) {
-          client.instanceOperations().setProperty(entry.getKey(), entry.getValue());
-        }
+        client.instanceOperations().modifyProperties(properties -> properties.putAll(props));
       } catch (AccumuloException | AccumuloSecurityException e) {
         log.error("Failed to set tserver props");
         throw new Exception(e);
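
The change above replaces a per-entry setProperty() loop with a single
modifyProperties() call, which applies the whole map in one atomic
read-modify-write against the live system configuration. A minimal sketch of
the new call, standalone rather than inside this test's harness (the client
properties path and the example property are placeholder assumptions):

    // Minimal sketch, not the test's actual setup: the properties file path
    // and the example property are placeholders.
    import java.util.Map;
    import org.apache.accumulo.core.client.Accumulo;
    import org.apache.accumulo.core.client.AccumuloClient;

    public class ModifyPropsExample {
      public static void main(String[] args) throws Exception {
        Map<String,String> props = Map.of("table.durability", "flush");
        try (AccumuloClient client =
            Accumulo.newClient().from("/path/to/accumulo-client.properties").build()) {
          // One atomic read-modify-write of the system config, replacing
          // N separate setProperty() round trips.
          client.instanceOperations().modifyProperties(m -> m.putAll(props));
        }
      }
    }
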
diff --git a/src/main/java/org/apache/accumulo/testing/ingest/BulkImportDirectory.java b/src/main/java/org/apache/accumulo/testing/ingest/BulkImportDirectory.java
index c125fcb..92dc5e4 100644
--- a/src/main/java/org/apache/accumulo/testing/ingest/BulkImportDirectory.java
+++ b/src/main/java/org/apache/accumulo/testing/ingest/BulkImportDirectory.java
@@ -26,9 +26,6 @@ import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.testing.cli.ClientOpts;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 
 import com.beust.jcommander.Parameter;
 
@@ -38,23 +35,15 @@ public class BulkImportDirectory {
     String tableName;
     @Parameter(names = {"-s", "--source"}, description = "directory to import from")
     String source = null;
-    @Parameter(names = {"-f", "--failures"},
-        description = "directory to copy failures into: will be deleted before the bulk import")
-    String failures = null;
   }
 
-  @SuppressWarnings("deprecation")
   public static void main(String[] args)
       throws IOException, AccumuloException, AccumuloSecurityException, TableNotFoundException {
-    final FileSystem fs = FileSystem.get(new Configuration());
     Opts opts = new Opts();
-    System.err.println(
-        "Deprecated syntax for BulkImportDirectory, please use the new style (see --help)");
     opts.parseArgs(BulkImportDirectory.class.getName(), args);
-    fs.delete(new Path(opts.failures), true);
-    fs.mkdirs(new Path(opts.failures));
     try (AccumuloClient client = Accumulo.newClient().from(opts.getClientProps()).build()) {
-      client.tableOperations().importDirectory(opts.tableName, opts.source, opts.failures, false);
+      client.tableOperations().importDirectory(opts.source).to(opts.tableName).tableTime(false)
+          .load();
     }
   }
 }
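
The removed code here used the legacy four-argument
importDirectory(table, dir, failures, setTime), which required a pre-created
failures directory. The replacement is the fluent builder that has been the
supported form since 2.0 and is the only form in 3.0. A minimal sketch of the
fluent call (the properties file, source directory, and table name are
placeholder assumptions):

    // Minimal sketch of the fluent bulk import; the properties file, source
    // directory, and table name are placeholders.
    import org.apache.accumulo.core.client.Accumulo;
    import org.apache.accumulo.core.client.AccumuloClient;

    public class FluentBulkImportExample {
      public static void main(String[] args) throws Exception {
        try (AccumuloClient client =
            Accumulo.newClient().from("/path/to/accumulo-client.properties").build()) {
          client.tableOperations()
              .importDirectory("/bulk/files") // directory of rfiles in DFS
              .to("mytable")                  // destination table
              .tableTime(false)               // keep timestamps from the files
              .load();                        // throws if the import cannot complete
        }
      }
    }
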
diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/concurrent/Config.java b/src/main/java/org/apache/accumulo/testing/randomwalk/concurrent/Config.java
index bea6fd9..fb2937d 100644
--- a/src/main/java/org/apache/accumulo/testing/randomwalk/concurrent/Config.java
+++ b/src/main/java/org/apache/accumulo/testing/randomwalk/concurrent/Config.java
@@ -58,10 +58,6 @@ public class Config extends Test {
   // @formatter:off
   final Setting[] settings = {
 			s(Property.TSERV_BLOOM_LOAD_MAXCONCURRENT, 1, 10),
-			s(Property.TSERV_BULK_PROCESS_THREADS, 1, 10),
-			s(Property.TSERV_BULK_RETRY, 1, 10),
-			s(Property.TSERV_BULK_TIMEOUT, 10, 600),
-			s(Property.TSERV_BULK_ASSIGNMENT_THREADS, 1, 10),
 			s(Property.TSERV_DATACACHE_SIZE, 0, 1000000000L),
 			s(Property.TSERV_INDEXCACHE_SIZE, 0, 1000000000L),
 			s(Property.TSERV_CLIENT_TIMEOUT, 100, 10000),
@@ -84,8 +80,6 @@ public class Config extends Test {
 			s(Property.TSERV_TABLET_SPLIT_FINDMIDPOINT_MAXOPEN, 5, 100),
 			s(Property.TSERV_WAL_BLOCKSIZE, 1024 * 1024,1024 * 1024 * 1024 * 10L),
 			s(Property.TSERV_WORKQ_THREADS, 1, 10),
-			s(Property.MANAGER_BULK_THREADPOOL_SIZE, 1, 10),
-			s(Property.MANAGER_BULK_RETRIES, 1, 10),
 			s(Property.MANAGER_BULK_TIMEOUT, 10, 600),
 			s(Property.MANAGER_FATE_THREADPOOL_SIZE, 1, 100),
 			s(Property.MANAGER_RECOVERY_DELAY, 0, 100),
diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/multitable/BulkImport.java b/src/main/java/org/apache/accumulo/testing/randomwalk/multitable/BulkImport.java
index b9525ac..8f79447 100644
--- a/src/main/java/org/apache/accumulo/testing/randomwalk/multitable/BulkImport.java
+++ b/src/main/java/org/apache/accumulo/testing/randomwalk/multitable/BulkImport.java
@@ -35,7 +35,6 @@ import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.testing.randomwalk.RandWalkEnv;
 import org.apache.accumulo.testing.randomwalk.State;
 import org.apache.accumulo.testing.randomwalk.Test;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
@@ -55,9 +54,8 @@ public class BulkImport extends Test {
   private static final Value ONE = new Value("1".getBytes());
 
   /**
-   * Tests both the legacy (deprecated) and new bulk import methods.
+   * Tests both the bulk import methods.
    */
-  @SuppressWarnings({"deprecation", "unchecked"})
   public void visit(final State state, final RandWalkEnv env, Properties props) throws Exception {
     List<String> tables = (List<String>) state.get("tableList");
 
@@ -74,14 +72,13 @@ public class BulkImport extends Test {
     final FileSystem fs = (FileSystem) state.get("fs");
     fs.mkdirs(fail);
     final int parts = env.getRandom().nextInt(10) + 1;
-    final boolean useLegacyBulk = env.getRandom().nextBoolean();
 
     TreeSet<String> rows = new TreeSet<>();
     for (int i = 0; i < ROWS; i++)
       rows.add(uuid + String.format("__%06d", i));
 
     String markerColumnQualifier = String.format("%07d", counter.incrementAndGet());
-    log.debug("Preparing {} bulk import to {}", useLegacyBulk ? "legacy" : "new", tableName);
+    log.debug("Preparing bulk import to {}", tableName);
 
     for (int i = 0; i < parts; i++) {
       String fileName = dir + "/" + String.format("part_%d.rf", i);
@@ -96,26 +93,15 @@ public class BulkImport extends Test {
         }
       }
     }
-    log.debug("Starting {} bulk import to {}", useLegacyBulk ? "legacy" : "new", tableName);
+    log.debug("Starting bulk import to {}", tableName);
     try {
-      if (useLegacyBulk) {
-        env.getAccumuloClient().tableOperations().importDirectory(tableName, dir.toString(),
-            fail.toString(), true);
-        FileStatus[] failures = fs.listStatus(fail);
-        if (failures != null && failures.length > 0) {
-          state.set("bulkImportSuccess", "false");
-          throw new Exception(failures.length + " failure files found importing files from " + dir);
-        }
-      } else {
-        env.getAccumuloClient().tableOperations().importDirectory(dir.toString()).to(tableName)
-            .tableTime(true).load();
-      }
+      env.getAccumuloClient().tableOperations().importDirectory(dir.toString()).to(tableName)
+          .tableTime(true).load();
 
       fs.delete(dir, true);
       fs.delete(fail, true);
-      log.debug("Finished {} bulk import to {} start: {} last: {} marker: {}",
-          useLegacyBulk ? "legacy" : "new", tableName, rows.first(), rows.last(),
-          markerColumnQualifier);
+      log.debug("Finished bulk import to {} start: {} last: {} marker: {}", tableName, rows.first(),
+          rows.last(), markerColumnQualifier);
     } catch (TableNotFoundException tnfe) {
       log.debug("Table {} was deleted", tableName);
       tables.remove(tableName);
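
The deleted branch above inspected a failures directory after a legacy import
and threw if any files were left behind. The fluent API has no failures
directory; an import that cannot complete surfaces as an exception from
load(). A minimal sketch of that contract (paths and table name are
illustrative assumptions):

    // Minimal sketch: with the fluent API a failed import surfaces as an
    // exception, not as files copied into a failures directory. Paths and
    // table name are placeholders.
    import org.apache.accumulo.core.client.Accumulo;
    import org.apache.accumulo.core.client.AccumuloClient;
    import org.apache.accumulo.core.client.AccumuloException;

    public class BulkFailureHandlingExample {
      public static void main(String[] args) throws Exception {
        try (AccumuloClient client =
            Accumulo.newClient().from("/path/to/accumulo-client.properties").build()) {
          try {
            client.tableOperations().importDirectory("/bulk/part_files")
                .to("mytable").tableTime(true).load();
          } catch (AccumuloException e) {
            // Previously the test listed the failures directory; now a throw
            // means the import as a whole did not go through.
            System.err.println("bulk import failed: " + e.getMessage());
          }
        }
      }
    }
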
diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/shard/BulkInsert.java b/src/main/java/org/apache/accumulo/testing/randomwalk/shard/BulkInsert.java
index 4a92447..d6a9dc9 100644
--- a/src/main/java/org/apache/accumulo/testing/randomwalk/shard/BulkInsert.java
+++ b/src/main/java/org/apache/accumulo/testing/randomwalk/shard/BulkInsert.java
@@ -18,7 +18,6 @@
  */
 package org.apache.accumulo.testing.randomwalk.shard;
 
-import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
 import static java.nio.charset.StandardCharsets.UTF_8;
 
 import java.io.BufferedOutputStream;
@@ -29,7 +28,6 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Properties;
 import java.util.Random;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.accumulo.core.client.AccumuloClient;
 import org.apache.accumulo.core.client.BatchWriter;
@@ -43,7 +41,6 @@ import org.apache.accumulo.testing.randomwalk.State;
 import org.apache.accumulo.testing.randomwalk.Test;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.SequenceFile;
@@ -137,39 +134,17 @@ public class BulkInsert extends Test {
     sort(env, fs, indexTableName, rootDir + "/index.seq", rootDir + "/index_bulk",
         rootDir + "/index_work", maxSplits);
 
-    bulkImport(fs, env, dataTableName, rootDir, "data");
-    bulkImport(fs, env, indexTableName, rootDir, "index");
+    bulkImport(env, dataTableName, rootDir, "data");
+    bulkImport(env, indexTableName, rootDir, "index");
 
     fs.delete(new Path(rootDir), true);
   }
 
-  @SuppressWarnings("deprecation")
-  private void bulkImport(FileSystem fs, RandWalkEnv env, String tableName, String rootDir,
-      String prefix) throws Exception {
-    while (true) {
-      String bulkDir = rootDir + "/" + prefix + "_bulk";
-      String failDir = rootDir + "/" + prefix + "_failure";
-      Path failPath = new Path(failDir);
-      fs.delete(failPath, true);
-      fs.mkdirs(failPath);
-      env.getAccumuloClient().tableOperations().importDirectory(tableName, bulkDir, failDir, true);
-
-      FileStatus[] failures = fs.listStatus(failPath);
-
-      if (failures == null || failures.length == 0)
-        break;
-
-      log.warn("Failed to bulk import some files, retrying ");
-
-      for (FileStatus failure : failures) {
-        if (!failure.getPath().getName().endsWith(".seq"))
-          fs.rename(failure.getPath(), new Path(new Path(bulkDir), failure.getPath().getName()));
-        else
-          log.debug("Ignoring " + failure.getPath());
-      }
-      sleepUninterruptibly(3, TimeUnit.SECONDS);
-
-    }
+  private void bulkImport(RandWalkEnv env, String tableName, String rootDir, String prefix)
+      throws Exception {
+    String bulkDir = rootDir + "/" + prefix + "_bulk";
+    env.getAccumuloClient().tableOperations().importDirectory(bulkDir).to(tableName).tableTime(true)
+        .load();
   }
 
   private void sort(RandWalkEnv env, FileSystem fs, String tableName, String seqFile,
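
The removed retry loop moved failed files back into the bulk directory and
slept between attempts, which only made sense when the legacy API reported
per-file failures. With the fluent API, load() either commits every file or
throws, so the helper collapses to a single call. If transient failures still
need handling, a caller-side wrapper is one option; the sketch below is
illustrative only, and its retry count and backoff are assumptions rather
than anything in this commit:

    // Illustrative retry wrapper, not part of this commit; the retry count
    // and backoff are assumptions. load() commits all files or throws, so
    // the old per-file requeueing has no equivalent here.
    import org.apache.accumulo.core.client.AccumuloClient;
    import org.apache.accumulo.core.client.AccumuloException;

    public class RetryingBulkImport {
      static void importWithRetry(AccumuloClient client, String dir, String table)
          throws Exception {
        for (int attempt = 1; attempt <= 3; attempt++) {
          try {
            client.tableOperations().importDirectory(dir).to(table).tableTime(true)
                .load();
            return; // every file committed
          } catch (AccumuloException e) {
            if (attempt == 3) {
              throw e;
            }
            Thread.sleep(3_000L); // mirrors the old three-second pause
          }
        }
      }
    }
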