You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by mm...@apache.org on 2021/02/09 21:49:01 UTC

[accumulo-testing] branch main updated: Add a Bulk import to randomwalk MultiTable (#134)

This is an automated email from the ASF dual-hosted git repository.

mmiller pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo-testing.git


The following commit(s) were added to refs/heads/main by this push:
     new 689d6d4  Add a Bulk import to randomwalk MultiTable (#134)
689d6d4 is described below

commit 689d6d411e5fda3aeca292656049de1718e949d5
Author: Mike Miller <mm...@apache.org>
AuthorDate: Tue Feb 9 16:48:55 2021 -0500

    Add a Bulk import to randomwalk MultiTable (#134)
    
    * Create new BulkImport test in MultiTable for more realistic case
    * Tweak MultiTable.xml to drop 20 tables, same as create. This will keep
    the number of tables down when running for long periods.
    * Also set log4j testing logging to DEBUG
---
 conf/log4j.properties                              |   2 +-
 .../testing/randomwalk/multitable/BulkImport.java  | 119 +++++++++++++++++++++
 .../randomwalk/multitable/MultiTableFixture.java   |   3 +
 .../resources/randomwalk/modules/MultiTable.xml    |  10 +-
 4 files changed, 130 insertions(+), 4 deletions(-)

diff --git a/conf/log4j.properties b/conf/log4j.properties
index 726fb2c..525a6e4 100644
--- a/conf/log4j.properties
+++ b/conf/log4j.properties
@@ -19,7 +19,7 @@ log4j.appender.CA.layout=org.apache.log4j.PatternLayout
 log4j.appender.CA.layout.ConversionPattern=%d{ISO8601} [%c{3}] %-5p: %m%n
 
 log4j.logger.org.apache.accumulo=WARN
-log4j.logger.org.apache.accumulo.testing=INFO
+log4j.logger.org.apache.accumulo.testing=DEBUG
 log4j.logger.org.apache.curator=ERROR
 log4j.logger.org.apache.hadoop=WARN
 log4j.logger.org.apache.hadoop.mapreduce=ERROR
diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/multitable/BulkImport.java b/src/main/java/org/apache/accumulo/testing/randomwalk/multitable/BulkImport.java
new file mode 100644
index 0000000..4c43b0a
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/testing/randomwalk/multitable/BulkImport.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.testing.randomwalk.multitable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.Random;
+import java.util.TreeSet;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.accumulo.core.client.IteratorSetting.Column;
+import org.apache.accumulo.core.client.rfile.RFile;
+import org.apache.accumulo.core.client.rfile.RFileWriter;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.testing.randomwalk.RandWalkEnv;
+import org.apache.accumulo.testing.randomwalk.State;
+import org.apache.accumulo.testing.randomwalk.Test;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+
+public class BulkImport extends Test {
+
+  public static final int LOTS = 100000; // number of rows written to every generated RFile
+  public static final int COLS = 10; // number of qualifiers created under CHECK_COLUMN_FAMILY
+  public static final List<Column> COLNAMES = new ArrayList<>();
+  public static final Text CHECK_COLUMN_FAMILY = new Text("cf");
+  static {
+    for (int i = 0; i < COLS; i++) {
+      COLNAMES.add(new Column(CHECK_COLUMN_FAMILY, new Text(String.format("%03d", i))));
+    }
+  }
+  public static final Text MARKER_CF = new Text("marker");
+  static final AtomicLong counter = new AtomicLong(); // incremented once per visit for the marker
+
+  private static final Value ONE = new Value("1".getBytes());
+
+  /**
+   * Bulk imports into one random table per visit, randomly using the legacy or the new method.
+   */
+  @SuppressWarnings({"deprecation", "unchecked"})
+  public void visit(final State state, final RandWalkEnv env, Properties props) throws Exception {
+    List<String> tables = (List<String>) state.get("tableList");
+
+    if (tables.isEmpty()) {
+      log.debug("No tables to ingest into");
+      return;
+    }
+
+    Random rand = new Random();
+    String tableName = tables.get(rand.nextInt(tables.size()));
+
+    String uuid = UUID.randomUUID().toString();
+    final Path dir = new Path("/tmp/bulk", uuid);
+    final Path fail = new Path(dir.toString() + "_fail");
+    final FileSystem fs = (FileSystem) state.get("fs");
+    fs.mkdirs(fail); // created up front; only the legacy branch below reads it
+    final int parts = rand.nextInt(10) + 1; // 1 to 10 RFiles
+    final boolean useLegacyBulk = rand.nextBoolean();
+
+    TreeSet<String> rows = new TreeSet<>();
+    for (int i = 0; i < LOTS; i++)
+      rows.add(uuid + String.format("__%05d", i)); // rows are prefixed with this visit's uuid
+
+    String markerColumnQualifier = String.format("%07d", counter.incrementAndGet());
+    log.debug("Preparing {} bulk import to {}", useLegacyBulk ? "legacy" : "new", tableName);
+
+    for (int i = 0; i < parts; i++) { // note: every part file receives the full row set
+      String fileName = dir + "/" + String.format("part_%d.rf", i);
+      RFileWriter f = RFile.newWriter().to(fileName).withFileSystem(fs).build();
+      f.startDefaultLocalityGroup();
+      for (String r : rows) {
+        Text row = new Text(r);
+        for (Column col : COLNAMES) {
+          f.append(new Key(row, col.getColumnFamily(), col.getColumnQualifier()), ONE);
+        }
+        f.append(new Key(row, MARKER_CF, new Text(markerColumnQualifier)), ONE);
+      }
+      f.close(); // NOTE(review): not reached if an append above throws
+    }
+    if (useLegacyBulk) {
+      env.getAccumuloClient().tableOperations().importDirectory(tableName, dir.toString(),
+          fail.toString(), true);
+      FileStatus[] failures = fs.listStatus(fail);
+      if (failures != null && failures.length > 0) { // any file in the failure dir fails the test
+        state.set("bulkImportSuccess", "false");
+        throw new Exception(failures.length + " failure files found importing files from " + dir);
+      }
+    } else {
+      env.getAccumuloClient().tableOperations().importDirectory(dir.toString()).to(tableName)
+          .tableTime(true).load();
+    }
+
+    fs.delete(dir, true); // NOTE(review): cleanup is skipped when anything above throws
+    fs.delete(fail, true);
+    log.debug("Finished {} bulk import to {} start: {} last: {} marker: {}",
+        useLegacyBulk ? "legacy" : "new", tableName, rows.first(), rows.last(),
+        markerColumnQualifier);
+  }
+
+}
diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/multitable/MultiTableFixture.java b/src/main/java/org/apache/accumulo/testing/randomwalk/multitable/MultiTableFixture.java
index 1c65118..0549bca 100644
--- a/src/main/java/org/apache/accumulo/testing/randomwalk/multitable/MultiTableFixture.java
+++ b/src/main/java/org/apache/accumulo/testing/randomwalk/multitable/MultiTableFixture.java
@@ -27,6 +27,8 @@ import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.testing.randomwalk.Fixture;
 import org.apache.accumulo.testing.randomwalk.RandWalkEnv;
 import org.apache.accumulo.testing.randomwalk.State;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 
 public class MultiTableFixture extends Fixture {
 
@@ -41,6 +43,7 @@ public class MultiTableFixture extends Fixture {
     state.set("numWrites", Long.valueOf(0));
     state.set("totalWrites", Long.valueOf(0));
     state.set("tableList", new CopyOnWriteArrayList<String>());
+    state.set("fs", FileSystem.get(new Configuration()));
   }
 
   @Override
diff --git a/src/main/resources/randomwalk/modules/MultiTable.xml b/src/main/resources/randomwalk/modules/MultiTable.xml
index a28d75e..55b7f0b 100644
--- a/src/main/resources/randomwalk/modules/MultiTable.xml
+++ b/src/main/resources/randomwalk/modules/MultiTable.xml
@@ -29,11 +29,11 @@
 
 <node id="dummy.ToAll">
   <edge id="mt.CreateTable" weight="20"/>
-  <edge id="mt.Write" weight="100"/>
+  <edge id="mt.Write" weight="10"/>
   <edge id="mt.CopyTable" weight="5"/>
+  <edge id="mt.BulkImport" weight="100"/>
   <edge id="mt.OfflineTable" weight="10"/>
-  <edge id="mt.DropTable" weight="3"/>
-  <edge id="END" weight="1"/>
+  <edge id="mt.DropTable" weight="20"/>
 </node>
 
 <node id="mt.Write">
@@ -57,4 +57,8 @@
   <edge id="dummy.ToAll" weight="1"/>
 </node>
 
+<node id="mt.BulkImport">
+  <edge id="dummy.ToAll" weight="1"/>
+</node>
+
 </module>