You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by mm...@apache.org on 2021/02/09 21:49:01 UTC
[accumulo-testing] branch main updated: Add a Bulk import to
randomwalk MultiTable (#134)
This is an automated email from the ASF dual-hosted git repository.
mmiller pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo-testing.git
The following commit(s) were added to refs/heads/main by this push:
new 689d6d4 Add a Bulk import to randomwalk MultiTable (#134)
689d6d4 is described below
commit 689d6d411e5fda3aeca292656049de1718e949d5
Author: Mike Miller <mm...@apache.org>
AuthorDate: Tue Feb 9 16:48:55 2021 -0500
Add a Bulk import to randomwalk MultiTable (#134)
* Create new BulkImport test in MultiTable for more realistic case
* Tweak MultiTable.xml to set the DropTable weight to 20, same as CreateTable. This will
keep the number of tables down when running for long periods.
* Also set log4j testing logging to DEBUG
---
conf/log4j.properties | 2 +-
.../testing/randomwalk/multitable/BulkImport.java | 119 +++++++++++++++++++++
.../randomwalk/multitable/MultiTableFixture.java | 3 +
.../resources/randomwalk/modules/MultiTable.xml | 10 +-
4 files changed, 130 insertions(+), 4 deletions(-)
diff --git a/conf/log4j.properties b/conf/log4j.properties
index 726fb2c..525a6e4 100644
--- a/conf/log4j.properties
+++ b/conf/log4j.properties
@@ -19,7 +19,7 @@ log4j.appender.CA.layout=org.apache.log4j.PatternLayout
log4j.appender.CA.layout.ConversionPattern=%d{ISO8601} [%c{3}] %-5p: %m%n
log4j.logger.org.apache.accumulo=WARN
-log4j.logger.org.apache.accumulo.testing=INFO
+log4j.logger.org.apache.accumulo.testing=DEBUG
log4j.logger.org.apache.curator=ERROR
log4j.logger.org.apache.hadoop=WARN
log4j.logger.org.apache.hadoop.mapreduce=ERROR
diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/multitable/BulkImport.java b/src/main/java/org/apache/accumulo/testing/randomwalk/multitable/BulkImport.java
new file mode 100644
index 0000000..4c43b0a
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/testing/randomwalk/multitable/BulkImport.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.testing.randomwalk.multitable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.Random;
+import java.util.TreeSet;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.accumulo.core.client.IteratorSetting.Column;
+import org.apache.accumulo.core.client.rfile.RFile;
+import org.apache.accumulo.core.client.rfile.RFileWriter;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.testing.randomwalk.RandWalkEnv;
+import org.apache.accumulo.testing.randomwalk.State;
+import org.apache.accumulo.testing.randomwalk.Test;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+
+public class BulkImport extends Test {
+  public static final int LOTS = 100000;
+  public static final int COLS = 10;
+  public static final List<Column> COLNAMES = new ArrayList<>();
+  public static final Text CHECK_COLUMN_FAMILY = new Text("cf");
+  static {
+    for (int i = 0; i < COLS; i++) {
+      COLNAMES.add(new Column(CHECK_COLUMN_FAMILY, new Text(String.format("%03d", i))));
+    }
+  }
+  public static final Text MARKER_CF = new Text("marker");
+  static final AtomicLong counter = new AtomicLong();
+  private static final Value ONE = new Value("1");
+
+  /**
+   * Bulk imports {@link #LOTS} generated rows into a random table from the state's "tableList",
+   * randomly using either the legacy (deprecated) or the new bulk import API; does nothing when
+   * no tables exist. Throws if the legacy import leaves files in its failure directory.
+   */
+  @Override
+  @SuppressWarnings({"deprecation", "unchecked"})
+  public void visit(final State state, final RandWalkEnv env, Properties props) throws Exception {
+    List<String> tables = (List<String>) state.get("tableList");
+    if (tables.isEmpty()) {
+      log.debug("No tables to ingest into");
+      return;
+    }
+
+    Random rand = new Random();
+    String tableName = tables.get(rand.nextInt(tables.size()));
+    String uuid = UUID.randomUUID().toString();
+    final Path dir = new Path("/tmp/bulk", uuid);
+    final Path fail = new Path(dir.toString() + "_fail");
+    final FileSystem fs = (FileSystem) state.get("fs");
+    fs.mkdirs(fail);
+    final int parts = rand.nextInt(10) + 1;
+    final boolean useLegacyBulk = rand.nextBoolean();
+
+    TreeSet<String> rows = new TreeSet<>();
+    for (int i = 0; i < LOTS; i++) {
+      rows.add(uuid + String.format("__%05d", i));
+    }
+
+    String markerColumnQualifier = String.format("%07d", counter.incrementAndGet());
+    log.debug("Preparing {} bulk import to {}", useLegacyBulk ? "legacy" : "new", tableName);
+
+    for (int i = 0; i < parts; i++) {
+      String fileName = dir + "/" + String.format("part_%d.rf", i);
+      // try-with-resources closes the writer even when an append fails part way through a file
+      try (RFileWriter f = RFile.newWriter().to(fileName).withFileSystem(fs).build()) {
+        f.startDefaultLocalityGroup();
+        for (String r : rows) {
+          Text row = new Text(r);
+          for (Column col : COLNAMES) {
+            f.append(new Key(row, col.getColumnFamily(), col.getColumnQualifier()), ONE);
+          }
+          f.append(new Key(row, MARKER_CF, new Text(markerColumnQualifier)), ONE);
+        }
+      }
+    }
+    if (useLegacyBulk) {
+      env.getAccumuloClient().tableOperations().importDirectory(tableName, dir.toString(),
+          fail.toString(), true);
+      FileStatus[] failures = fs.listStatus(fail);
+      if (failures != null && failures.length > 0) {
+        state.set("bulkImportSuccess", "false");
+        throw new Exception(failures.length + " failure files found importing files from " + dir);
+      }
+    } else {
+      env.getAccumuloClient().tableOperations().importDirectory(dir.toString()).to(tableName)
+          .tableTime(true).load();
+    }
+
+    fs.delete(dir, true);
+    fs.delete(fail, true);
+    log.debug("Finished {} bulk import to {} start: {} last: {} marker: {}",
+        useLegacyBulk ? "legacy" : "new", tableName, rows.first(), rows.last(),
+        markerColumnQualifier);
+  }
+}
diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/multitable/MultiTableFixture.java b/src/main/java/org/apache/accumulo/testing/randomwalk/multitable/MultiTableFixture.java
index 1c65118..0549bca 100644
--- a/src/main/java/org/apache/accumulo/testing/randomwalk/multitable/MultiTableFixture.java
+++ b/src/main/java/org/apache/accumulo/testing/randomwalk/multitable/MultiTableFixture.java
@@ -27,6 +27,8 @@ import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.testing.randomwalk.Fixture;
import org.apache.accumulo.testing.randomwalk.RandWalkEnv;
import org.apache.accumulo.testing.randomwalk.State;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
public class MultiTableFixture extends Fixture {
@@ -41,6 +43,7 @@ public class MultiTableFixture extends Fixture {
state.set("numWrites", Long.valueOf(0));
state.set("totalWrites", Long.valueOf(0));
state.set("tableList", new CopyOnWriteArrayList<String>());
+ state.set("fs", FileSystem.get(new Configuration()));
}
@Override
diff --git a/src/main/resources/randomwalk/modules/MultiTable.xml b/src/main/resources/randomwalk/modules/MultiTable.xml
index a28d75e..55b7f0b 100644
--- a/src/main/resources/randomwalk/modules/MultiTable.xml
+++ b/src/main/resources/randomwalk/modules/MultiTable.xml
@@ -29,11 +29,11 @@
<node id="dummy.ToAll">
<edge id="mt.CreateTable" weight="20"/>
- <edge id="mt.Write" weight="100"/>
+ <edge id="mt.Write" weight="10"/>
<edge id="mt.CopyTable" weight="5"/>
+ <edge id="mt.BulkImport" weight="100"/>
<edge id="mt.OfflineTable" weight="10"/>
- <edge id="mt.DropTable" weight="3"/>
- <edge id="END" weight="1"/>
+ <edge id="mt.DropTable" weight="20"/>
</node>
<node id="mt.Write">
@@ -57,4 +57,8 @@
<edge id="dummy.ToAll" weight="1"/>
</node>
+<node id="mt.BulkImport">
+ <edge id="dummy.ToAll" weight="1"/>
+</node>
+
</module>