You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by do...@apache.org on 2023/06/01 19:28:52 UTC
[accumulo-testing] branch main updated: Resolve build issues with accumulo 3.0 (#261)
This is an automated email from the ASF dual-hosted git repository.
domgarguilo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo-testing.git
The following commit(s) were added to refs/heads/main by this push:
new 9a55b3b Resolve build issues with accumulo 3.0 (#261)
9a55b3b is described below
commit 9a55b3b13566fff33435f30c11d4d86cf76bf153
Author: Dom G <do...@apache.org>
AuthorDate: Thu Jun 1 15:28:47 2023 -0400
Resolve build issues with accumulo 3.0 (#261)
* Remove 'legacy' bulk import usage and resolve other build errors
---
docs/bulk-test.md | 2 +-
.../accumulo/testing/continuous/CreateTable.java | 4 +--
.../testing/ingest/BulkImportDirectory.java | 15 ++-------
.../testing/randomwalk/concurrent/Config.java | 6 ----
.../testing/randomwalk/multitable/BulkImport.java | 28 ++++------------
.../testing/randomwalk/shard/BulkInsert.java | 39 ++++------------------
6 files changed, 18 insertions(+), 76 deletions(-)
diff --git a/docs/bulk-test.md b/docs/bulk-test.md
index 2a77f95..f203df1 100644
--- a/docs/bulk-test.md
+++ b/docs/bulk-test.md
@@ -21,7 +21,7 @@
# Running a bulk ingest test
-Continous ingest supports bulk ingest in addition to live ingest. A map reduce
+Continuous ingest supports bulk ingest in addition to live ingest. A map reduce
job that generates rfiles using the tables splits can be run. This can be run
in a loop like the following to continually bulk import data.
diff --git a/src/main/java/org/apache/accumulo/testing/continuous/CreateTable.java b/src/main/java/org/apache/accumulo/testing/continuous/CreateTable.java
index 511cd45..061c9a0 100644
--- a/src/main/java/org/apache/accumulo/testing/continuous/CreateTable.java
+++ b/src/main/java/org/apache/accumulo/testing/continuous/CreateTable.java
@@ -62,9 +62,7 @@ public class CreateTable {
// retrieve and set tserver props
Map<String,String> props = getProps(env, TestProps.CI_COMMON_ACCUMULO_SERVER_PROPS);
try {
- for (Map.Entry<String,String> entry : props.entrySet()) {
- client.instanceOperations().setProperty(entry.getKey(), entry.getValue());
- }
+ client.instanceOperations().modifyProperties(properties -> properties.putAll(props));
} catch (AccumuloException | AccumuloSecurityException e) {
log.error("Failed to set tserver props");
throw new Exception(e);
diff --git a/src/main/java/org/apache/accumulo/testing/ingest/BulkImportDirectory.java b/src/main/java/org/apache/accumulo/testing/ingest/BulkImportDirectory.java
index c125fcb..92dc5e4 100644
--- a/src/main/java/org/apache/accumulo/testing/ingest/BulkImportDirectory.java
+++ b/src/main/java/org/apache/accumulo/testing/ingest/BulkImportDirectory.java
@@ -26,9 +26,6 @@ import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.testing.cli.ClientOpts;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import com.beust.jcommander.Parameter;
@@ -38,23 +35,15 @@ public class BulkImportDirectory {
String tableName;
@Parameter(names = {"-s", "--source"}, description = "directory to import from")
String source = null;
- @Parameter(names = {"-f", "--failures"},
- description = "directory to copy failures into: will be deleted before the bulk import")
- String failures = null;
}
- @SuppressWarnings("deprecation")
public static void main(String[] args)
throws IOException, AccumuloException, AccumuloSecurityException, TableNotFoundException {
- final FileSystem fs = FileSystem.get(new Configuration());
Opts opts = new Opts();
- System.err.println(
- "Deprecated syntax for BulkImportDirectory, please use the new style (see --help)");
opts.parseArgs(BulkImportDirectory.class.getName(), args);
- fs.delete(new Path(opts.failures), true);
- fs.mkdirs(new Path(opts.failures));
try (AccumuloClient client = Accumulo.newClient().from(opts.getClientProps()).build()) {
- client.tableOperations().importDirectory(opts.tableName, opts.source, opts.failures, false);
+ client.tableOperations().importDirectory(opts.source).to(opts.tableName).tableTime(false)
+ .load();
}
}
}
diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/concurrent/Config.java b/src/main/java/org/apache/accumulo/testing/randomwalk/concurrent/Config.java
index bea6fd9..fb2937d 100644
--- a/src/main/java/org/apache/accumulo/testing/randomwalk/concurrent/Config.java
+++ b/src/main/java/org/apache/accumulo/testing/randomwalk/concurrent/Config.java
@@ -58,10 +58,6 @@ public class Config extends Test {
// @formatter:off
final Setting[] settings = {
s(Property.TSERV_BLOOM_LOAD_MAXCONCURRENT, 1, 10),
- s(Property.TSERV_BULK_PROCESS_THREADS, 1, 10),
- s(Property.TSERV_BULK_RETRY, 1, 10),
- s(Property.TSERV_BULK_TIMEOUT, 10, 600),
- s(Property.TSERV_BULK_ASSIGNMENT_THREADS, 1, 10),
s(Property.TSERV_DATACACHE_SIZE, 0, 1000000000L),
s(Property.TSERV_INDEXCACHE_SIZE, 0, 1000000000L),
s(Property.TSERV_CLIENT_TIMEOUT, 100, 10000),
@@ -84,8 +80,6 @@ public class Config extends Test {
s(Property.TSERV_TABLET_SPLIT_FINDMIDPOINT_MAXOPEN, 5, 100),
s(Property.TSERV_WAL_BLOCKSIZE, 1024 * 1024,1024 * 1024 * 1024 * 10L),
s(Property.TSERV_WORKQ_THREADS, 1, 10),
- s(Property.MANAGER_BULK_THREADPOOL_SIZE, 1, 10),
- s(Property.MANAGER_BULK_RETRIES, 1, 10),
s(Property.MANAGER_BULK_TIMEOUT, 10, 600),
s(Property.MANAGER_FATE_THREADPOOL_SIZE, 1, 100),
s(Property.MANAGER_RECOVERY_DELAY, 0, 100),
diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/multitable/BulkImport.java b/src/main/java/org/apache/accumulo/testing/randomwalk/multitable/BulkImport.java
index b9525ac..8f79447 100644
--- a/src/main/java/org/apache/accumulo/testing/randomwalk/multitable/BulkImport.java
+++ b/src/main/java/org/apache/accumulo/testing/randomwalk/multitable/BulkImport.java
@@ -35,7 +35,6 @@ import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.testing.randomwalk.RandWalkEnv;
import org.apache.accumulo.testing.randomwalk.State;
import org.apache.accumulo.testing.randomwalk.Test;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
@@ -55,9 +54,8 @@ public class BulkImport extends Test {
private static final Value ONE = new Value("1".getBytes());
/**
- * Tests both the legacy (deprecated) and new bulk import methods.
+ * Tests both the bulk import methods.
*/
- @SuppressWarnings({"deprecation", "unchecked"})
public void visit(final State state, final RandWalkEnv env, Properties props) throws Exception {
List<String> tables = (List<String>) state.get("tableList");
@@ -74,14 +72,13 @@ public class BulkImport extends Test {
final FileSystem fs = (FileSystem) state.get("fs");
fs.mkdirs(fail);
final int parts = env.getRandom().nextInt(10) + 1;
- final boolean useLegacyBulk = env.getRandom().nextBoolean();
TreeSet<String> rows = new TreeSet<>();
for (int i = 0; i < ROWS; i++)
rows.add(uuid + String.format("__%06d", i));
String markerColumnQualifier = String.format("%07d", counter.incrementAndGet());
- log.debug("Preparing {} bulk import to {}", useLegacyBulk ? "legacy" : "new", tableName);
+ log.debug("Preparing bulk import to {}", tableName);
for (int i = 0; i < parts; i++) {
String fileName = dir + "/" + String.format("part_%d.rf", i);
@@ -96,26 +93,15 @@ public class BulkImport extends Test {
}
}
}
- log.debug("Starting {} bulk import to {}", useLegacyBulk ? "legacy" : "new", tableName);
+ log.debug("Starting bulk import to {}", tableName);
try {
- if (useLegacyBulk) {
- env.getAccumuloClient().tableOperations().importDirectory(tableName, dir.toString(),
- fail.toString(), true);
- FileStatus[] failures = fs.listStatus(fail);
- if (failures != null && failures.length > 0) {
- state.set("bulkImportSuccess", "false");
- throw new Exception(failures.length + " failure files found importing files from " + dir);
- }
- } else {
- env.getAccumuloClient().tableOperations().importDirectory(dir.toString()).to(tableName)
- .tableTime(true).load();
- }
+ env.getAccumuloClient().tableOperations().importDirectory(dir.toString()).to(tableName)
+ .tableTime(true).load();
fs.delete(dir, true);
fs.delete(fail, true);
- log.debug("Finished {} bulk import to {} start: {} last: {} marker: {}",
- useLegacyBulk ? "legacy" : "new", tableName, rows.first(), rows.last(),
- markerColumnQualifier);
+ log.debug("Finished bulk import to {} start: {} last: {} marker: {}", tableName, rows.first(),
+ rows.last(), markerColumnQualifier);
} catch (TableNotFoundException tnfe) {
log.debug("Table {} was deleted", tableName);
tables.remove(tableName);
diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/shard/BulkInsert.java b/src/main/java/org/apache/accumulo/testing/randomwalk/shard/BulkInsert.java
index 4a92447..d6a9dc9 100644
--- a/src/main/java/org/apache/accumulo/testing/randomwalk/shard/BulkInsert.java
+++ b/src/main/java/org/apache/accumulo/testing/randomwalk/shard/BulkInsert.java
@@ -18,7 +18,6 @@
*/
package org.apache.accumulo.testing.randomwalk.shard;
-import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
import static java.nio.charset.StandardCharsets.UTF_8;
import java.io.BufferedOutputStream;
@@ -29,7 +28,6 @@ import java.util.Collection;
import java.util.List;
import java.util.Properties;
import java.util.Random;
-import java.util.concurrent.TimeUnit;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.BatchWriter;
@@ -43,7 +41,6 @@ import org.apache.accumulo.testing.randomwalk.State;
import org.apache.accumulo.testing.randomwalk.Test;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
@@ -137,39 +134,17 @@ public class BulkInsert extends Test {
sort(env, fs, indexTableName, rootDir + "/index.seq", rootDir + "/index_bulk",
rootDir + "/index_work", maxSplits);
- bulkImport(fs, env, dataTableName, rootDir, "data");
- bulkImport(fs, env, indexTableName, rootDir, "index");
+ bulkImport(env, dataTableName, rootDir, "data");
+ bulkImport(env, indexTableName, rootDir, "index");
fs.delete(new Path(rootDir), true);
}
- @SuppressWarnings("deprecation")
- private void bulkImport(FileSystem fs, RandWalkEnv env, String tableName, String rootDir,
- String prefix) throws Exception {
- while (true) {
- String bulkDir = rootDir + "/" + prefix + "_bulk";
- String failDir = rootDir + "/" + prefix + "_failure";
- Path failPath = new Path(failDir);
- fs.delete(failPath, true);
- fs.mkdirs(failPath);
- env.getAccumuloClient().tableOperations().importDirectory(tableName, bulkDir, failDir, true);
-
- FileStatus[] failures = fs.listStatus(failPath);
-
- if (failures == null || failures.length == 0)
- break;
-
- log.warn("Failed to bulk import some files, retrying ");
-
- for (FileStatus failure : failures) {
- if (!failure.getPath().getName().endsWith(".seq"))
- fs.rename(failure.getPath(), new Path(new Path(bulkDir), failure.getPath().getName()));
- else
- log.debug("Ignoring " + failure.getPath());
- }
- sleepUninterruptibly(3, TimeUnit.SECONDS);
-
- }
+ private void bulkImport(RandWalkEnv env, String tableName, String rootDir, String prefix)
+ throws Exception {
+ String bulkDir = rootDir + "/" + prefix + "_bulk";
+ env.getAccumuloClient().tableOperations().importDirectory(bulkDir).to(tableName).tableTime(true)
+ .load();
}
private void sort(RandWalkEnv env, FileSystem fs, String tableName, String seqFile,