Posted to commits@accumulo.apache.org by mm...@apache.org on 2019/09/17 15:45:30 UTC

[accumulo-testing] branch master updated: Move performance tests to accumulo-testing #1200 (#90)

This is an automated email from the ASF dual-hosted git repository.

mmiller pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo-testing.git


The following commit(s) were added to refs/heads/master by this push:
     new 3579908  Move performance tests to accumulo-testing #1200 (#90)
3579908 is described below

commit 3579908e459b957059d40447425b91887122c2b6
Author: Laura Schanno <lb...@gmail.com>
AuthorDate: Tue Sep 17 11:45:25 2019 -0400

    Move performance tests to accumulo-testing #1200 (#90)
    
    * Move ManySplitIT as HighSplitCreationPT.
    * Move BalanceFasterIT as SplitBalancingPT.
    * Move DeleteTableDuringSplitIT as TableDeletionDuringSplitPT.
    * Move RollWALPerformanceIT as RollWALPT.
    * Use Result.result() instead of Result.parameter() when reporting
    resulting values so that they are placed in the appropriate section
    when the JSON files are written (see the example below).
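    
    For illustration (a minimal sketch, assuming only the Report.Builder
    calls visible in the diff below), parameter() records a test input
    while result() records a measured outcome:
    
        double rate = 1234.5; // hypothetical measured value
        Report report = Report.builder().id("example")
            .parameter("num_splits", 100, "A test input.")
            .result("splits_per_second", rate, "A measured outcome.")
            .build();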
    
    Related: #1200
---
 .../performance/tests/HighSplitCreationPT.java     |  71 +++++++++
 .../testing/performance/tests/RollWALPT.java       | 159 +++++++++++++++++++
 .../performance/tests/SplitBalancingPT.java        | 102 +++++++++++++
 .../tests/TableDeletionDuringSplitPT.java          | 171 ++++++++++++++++++++
 4 files changed, 503 insertions(+)

diff --git a/src/main/java/org/apache/accumulo/testing/performance/tests/HighSplitCreationPT.java b/src/main/java/org/apache/accumulo/testing/performance/tests/HighSplitCreationPT.java
new file mode 100644
index 0000000..fe15fba
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/testing/performance/tests/HighSplitCreationPT.java
@@ -0,0 +1,71 @@
+package org.apache.accumulo.testing.performance.tests;
+
+import java.nio.charset.StandardCharsets;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.testing.performance.Environment;
+import org.apache.accumulo.testing.performance.PerformanceTest;
+import org.apache.accumulo.testing.performance.Report;
+import org.apache.accumulo.testing.performance.SystemConfiguration;
+import org.apache.hadoop.io.Text;
+
+public class HighSplitCreationPT implements PerformanceTest {
+
+  private static final int NUM_SPLITS = 10_000;
+  private static final int MIN_REQUIRED_SPLITS_PER_SECOND = 100;
+  private static final int ONE_SECOND = 1000;
+  private static final String TABLE_NAME = "highSplitCreation";
+  private static final String METADATA_TABLE_SPLITS = "123456789abcde";
+
+  @Override
+  public SystemConfiguration getSystemConfig() {
+    return new SystemConfiguration();
+  }
+
+  @Override
+  public Report runTest(final Environment env) throws Exception {
+    Report.Builder reportBuilder = Report.builder().id("high_split_creation")
+        .description("Evaluate the speed of creating many splits.")
+        .parameter("table_name", TABLE_NAME, "The name of the test table")
+        .parameter("num_splits", NUM_SPLITS, "The high number of splits to add.")
+        .parameter("min_required_splits_per_second", MIN_REQUIRED_SPLITS_PER_SECOND,
+            "The minimum average number of splits that must be created per second before performance is considered too slow.");
+
+    AccumuloClient client = env.getClient();
+    client.tableOperations().create(TABLE_NAME);
+    client.tableOperations().addSplits(MetadataTable.NAME, getMetadataTableSplits());
+
+    SortedSet<Text> splits = getTestTableSplits();
+
+    long start = System.currentTimeMillis();
+    client.tableOperations().addSplits(TABLE_NAME, splits);
+    long totalTime = System.currentTimeMillis() - start;
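+    // Divide as doubles: integer division would truncate to zero (or throw) for sub-second runs.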
+    double splitsPerSecond = NUM_SPLITS / (totalTime / (double) ONE_SECOND);
+
+    reportBuilder.result("splits_per_second", splitsPerSecond,
+        "The average number of splits created per second.");
+
+    return reportBuilder.build();
+  }
+
+  private SortedSet<Text> getMetadataTableSplits() {
+    SortedSet<Text> splits = new TreeSet<>();
+    for (byte b : METADATA_TABLE_SPLITS.getBytes(StandardCharsets.UTF_8)) {
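+      // Metadata table rows have the form <tableId>;<endRow>, so these splits fall within table id 1.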
+      splits.add(new Text(new byte[] {'1', ';', b}));
+    }
+    return splits;
+  }
+
+  private SortedSet<Text> getTestTableSplits() {
+    SortedSet<Text> splits = new TreeSet<>();
+    for (int i = 0; i < NUM_SPLITS; i++) {
+      splits.add(new Text(Integer.toHexString(i)));
+    }
+    return splits;
+  }
+}
diff --git a/src/main/java/org/apache/accumulo/testing/performance/tests/RollWALPT.java b/src/main/java/org/apache/accumulo/testing/performance/tests/RollWALPT.java
new file mode 100644
index 0000000..bae6cf5
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/testing/performance/tests/RollWALPT.java
@@ -0,0 +1,159 @@
+package org.apache.accumulo.testing.performance.tests;
+
+import java.nio.charset.StandardCharsets;
+import java.security.SecureRandom;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.UUID;
+
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.TableExistsException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.accumulo.testing.performance.Environment;
+import org.apache.accumulo.testing.performance.PerformanceTest;
+import org.apache.accumulo.testing.performance.Report;
+import org.apache.accumulo.testing.performance.SystemConfiguration;
+import org.apache.hadoop.io.Text;
+
+public class RollWALPT implements PerformanceTest {
+
+  private static final String TABLE_SMALL_WAL = "SmallRollWAL";
+  private static final String TABLE_LARGE_WAL = "LargeRollWAL";
+  private static final String SIZE_SMALL_WAL = "5M";
+  private static final String SIZE_LARGE_WAL = "1G";
+  private static final long NUM_SPLITS = 100L;
+  private static final long SPLIT_DISTANCE = Long.MAX_VALUE / NUM_SPLITS;
+  private static final int NUM_ENTRIES = 50_000;
+
+  private final SecureRandom random = new SecureRandom();
+
+  @Override
+  public SystemConfiguration getSystemConfig() {
+    Map<String,String> config = new HashMap<>();
+
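+    // Single-copy WALs and frequent GC cycles keep the measurement focused on WAL rollover cost.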
+    config.put(Property.TSERV_WAL_REPLICATION.getKey(), "1");
+    config.put(Property.TSERV_WALOG_MAX_REFERENCED.getKey(), "100");
+    config.put(Property.GC_CYCLE_START.getKey(), "1s");
+    config.put(Property.GC_CYCLE_DELAY.getKey(), "1s");
+
+    return new SystemConfiguration().setAccumuloConfig(config);
+  }
+
+  @Override
+  public Report runTest(final Environment env) throws Exception {
+    Report.Builder reportBuilder = Report.builder().id("rollWAL").description(
+        "Evaluate the performance of ingesting a large number of entries across numerous splits given both a small and large maximum WAL size.")
+        .parameter("small_wal_table", TABLE_SMALL_WAL,
+            "The name of the table used for evaluating performance with a small WAL.")
+        .parameter("large_wal_table", TABLE_LARGE_WAL,
+            "The name of the table used for evaluating performance with a large WAL.")
+        .parameter("num_splits", NUM_SPLITS,
+            "The number of splits that will be added to the tables.")
+        .parameter("split_distance", SPLIT_DISTANCE, "The distance between each split.")
+        .parameter("num_entries", NUM_ENTRIES,
+            "The number of entries that will be written to the tables.")
+        .parameter("small_wal_size", SIZE_SMALL_WAL,
+            "The size of the small WAL used to force many rollovers.")
+        .parameter("large_wal_size", SIZE_LARGE_WAL,
+            "The size of the large WAL used to avoid many rollovers");
+
+    AccumuloClient client = env.getClient();
+    final long smallWALTime = evalSmallWAL(client);
+    reportBuilder.result("small_wal_write_time", smallWALTime,
+        "The time (in ns) it took to write entries to the table with a small WAL of "
+            + SIZE_SMALL_WAL);
+
+    final long largeWALTime = evalLargeWAL(client);
+    reportBuilder.result("large_wal_write_time", largeWALTime,
+        "The time (in ns) it took to write entries to the table with a large WAL of "
+            + SIZE_LARGE_WAL);
+    return reportBuilder.build();
+  }
+
+  private long evalSmallWAL(final AccumuloClient client) throws AccumuloSecurityException,
+      AccumuloException, TableExistsException, TableNotFoundException {
+    setMaxWALSize(SIZE_SMALL_WAL, client);
+    initTable(TABLE_SMALL_WAL, client);
+    return getTimeToWriteEntries(TABLE_SMALL_WAL, client);
+  }
+
+  private long evalLargeWAL(final AccumuloClient client) throws AccumuloSecurityException,
+      AccumuloException, TableExistsException, TableNotFoundException {
+    setMaxWALSize(SIZE_LARGE_WAL, client);
+    initTable(TABLE_LARGE_WAL, client);
+    return getTimeToWriteEntries(TABLE_LARGE_WAL, client);
+  }
+
+  private void setMaxWALSize(final String size, final AccumuloClient client)
+      throws AccumuloSecurityException, AccumuloException {
+    client.instanceOperations().setProperty(Property.TSERV_WALOG_MAX_SIZE.getKey(), size);
+  }
+
+  private void initTable(final String tableName, final AccumuloClient client)
+      throws AccumuloSecurityException, TableNotFoundException, AccumuloException,
+      TableExistsException {
+    client.tableOperations().create(tableName);
+    client.tableOperations().addSplits(tableName, getSplits());
+    client.instanceOperations().waitForBalance();
+  }
+
+  private SortedSet<Text> getSplits() {
+    SortedSet<Text> splits = new TreeSet<>();
+    for (long i = 0L; i < NUM_SPLITS; i++) {
+      splits.add(new Text(String.format("%016x", i * SPLIT_DISTANCE)));
+    }
+    return splits;
+  }
+
+  private long getTimeToWriteEntries(final String tableName, final AccumuloClient client)
+      throws TableNotFoundException, MutationsRejectedException {
+    long start = System.nanoTime();
+    writeEntries(tableName, client);
+    return System.nanoTime() - start;
+  }
+
+  private void writeEntries(final String tableName, final AccumuloClient client)
+      throws TableNotFoundException, MutationsRejectedException {
+    BatchWriter bw = client.createBatchWriter(tableName);
+
+    String instanceId = UUID.randomUUID().toString();
+    final ColumnVisibility cv = new ColumnVisibility();
+    for (int i = 0; i < NUM_ENTRIES; i++) {
+      String value = instanceId + i;
+      Mutation m = genMutation(cv, value);
+      bw.addMutation(m);
+    }
+
+    bw.close();
+  }
+
+  private Mutation genMutation(final ColumnVisibility colVis, final String value) {
+    byte[] rowStr = toZeroPaddedString(getRandomLong(), 16);
+    byte[] colFamStr = toZeroPaddedString(random.nextInt(Short.MAX_VALUE), 4);
+    byte[] colQualStr = toZeroPaddedString(random.nextInt(Short.MAX_VALUE), 4);
+    Mutation mutation = new Mutation(new Text(rowStr));
+    mutation.put(new Text(colFamStr), new Text(colQualStr), colVis, new Value(value));
+    return mutation;
+  }
+
+  private long getRandomLong() {
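+    // Mask the sign bit so the value is non-negative; the modulo keeps it below Long.MAX_VALUE.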
+    return ((random.nextLong() & 0x7fffffffffffffffL) % (Long.MAX_VALUE));
+  }
+
+  private byte[] toZeroPaddedString(long num, int width) {
+    // Format as lowercase hex, left-padded with zeros to the requested width.
+    return String.format("%0" + width + "x", num).getBytes(StandardCharsets.UTF_8);
+  }
+}
diff --git a/src/main/java/org/apache/accumulo/testing/performance/tests/SplitBalancingPT.java b/src/main/java/org/apache/accumulo/testing/performance/tests/SplitBalancingPT.java
new file mode 100644
index 0000000..22a3dbd
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/testing/performance/tests/SplitBalancingPT.java
@@ -0,0 +1,102 @@
+package org.apache.accumulo.testing.performance.tests;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.testing.performance.Environment;
+import org.apache.accumulo.testing.performance.PerformanceTest;
+import org.apache.accumulo.testing.performance.Report;
+import org.apache.accumulo.testing.performance.SystemConfiguration;
+import org.apache.hadoop.io.Text;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SplitBalancingPT implements PerformanceTest {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SplitBalancingPT.class);
+
+  private static final String TABLE_NAME = "splitBalancing";
+  private static final String RESERVED_PREFIX = "~";
+  private static final int NUM_SPLITS = 1_000;
+  private static final int MARGIN = 3;
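+  // "loc" is the metadata column family that records each tablet's assigned tablet server.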
+  private static final Text TSERVER_ASSIGNED_TABLETS_COL_FAM = new Text("loc");
+
+  @Override
+  public SystemConfiguration getSystemConfig() {
+    return new SystemConfiguration();
+  }
+
+  @Override
+  public Report runTest(final Environment env) throws Exception {
+    AccumuloClient client = env.getClient();
+    client.tableOperations().create(TABLE_NAME);
+    client.tableOperations().addSplits(TABLE_NAME, getSplits());
+    client.instanceOperations().waitForBalance();
+
+    int totalTabletServers = client.instanceOperations().getTabletServers().size();
+    int expectedAllocation = NUM_SPLITS / totalTabletServers;
+    int min = expectedAllocation - MARGIN;
+    int max = expectedAllocation + MARGIN;
+
+    Report.Builder reportBuilder = Report.builder().id("split_balancing").description(
+        "Verify that when a large number of splits is created, the resulting tablets are distributed evenly among the tablet servers.")
+        .parameter("num_splits", NUM_SPLITS, "The number of splits")
+        .parameter("num_tservers", totalTabletServers, "The number of tablet servers")
+        .parameter("tserver_min", min,
+            "The minimum number of tablets that should be assigned to a tablet server.")
+        .parameter("tserver_max", max,
+            "The maximum number of tablets that should be assigned to a tablet server.");
+
+    boolean allServersBalanced = true;
+    Map<String,Integer> tablets = getTablets(client);
+    for (String tabletServer : tablets.keySet()) {
+      int count = tablets.get(tabletServer);
+      boolean balanced = count >= min && count <= max;
+      allServersBalanced &= balanced;
+
+      reportBuilder.result("size_tserver_" + tabletServer, count,
+          "Total tablets assigned to tablet server " + tabletServer);
+    }
+
+    reportBuilder.result("balanced", allServersBalanced,
+        "True if every tablet server's tablet count is within the expected margin.");
+
+    return reportBuilder.build();
+  }
+
+  private SortedSet<Text> getSplits() {
+    SortedSet<Text> splits = new TreeSet<>();
+    for (int i = 0; i < NUM_SPLITS; i++) {
+      splits.add(new Text(String.valueOf(i)));
+    }
+    return splits;
+  }
+
+  private Map<String,Integer> getTablets(final AccumuloClient client) {
+    Map<String,Integer> tablets = new HashMap<>();
+    try (Scanner scanner = client.createScanner(MetadataTable.NAME, Authorizations.EMPTY)) {
+      scanner.fetchColumnFamily(TSERVER_ASSIGNED_TABLETS_COL_FAM);
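+      // Scan all rows before the reserved "~" section, covering every table's tablet entries.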
+      Range range = new Range(null, false, RESERVED_PREFIX, false);
+      scanner.setRange(range);
+
+      for (Map.Entry<Key,Value> entry : scanner) {
+        String host = entry.getValue().toString();
+        tablets.merge(host, 1, Integer::sum);
+      }
+    } catch (Exception e) {
+      LOG.error("Error occurred during scan:", e);
+    }
+    return tablets;
+  }
+}
diff --git a/src/main/java/org/apache/accumulo/testing/performance/tests/TableDeletionDuringSplitPT.java b/src/main/java/org/apache/accumulo/testing/performance/tests/TableDeletionDuringSplitPT.java
new file mode 100644
index 0000000..cfe943e
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/testing/performance/tests/TableDeletionDuringSplitPT.java
@@ -0,0 +1,171 @@
+package org.apache.accumulo.testing.performance.tests;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.LongAdder;
+
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.TableExistsException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.testing.performance.Environment;
+import org.apache.accumulo.testing.performance.PerformanceTest;
+import org.apache.accumulo.testing.performance.Report;
+import org.apache.accumulo.testing.performance.SystemConfiguration;
+import org.apache.hadoop.io.Text;
+
+public class TableDeletionDuringSplitPT implements PerformanceTest {
+
+  private static final int NUM_BATCHES = 12;
+  private static final int BATCH_SIZE = 8;
+  private static final int MAX_THREADS = BATCH_SIZE + 2;
+  private static final int NUM_TABLES = NUM_BATCHES * BATCH_SIZE;
+  private static final int NUM_SPLITS = 100;
+  private static final int HALF_SECOND = 500;
+  private static final String BASE_TABLE_NAME = "tableDeletionDuringSplit";
+  private static final String THREAD_NAME = "concurrent-api-requests";
+
+  @Override
+  public SystemConfiguration getSystemConfig() {
+    return new SystemConfiguration();
+  }
+
+  @Override
+  public Report runTest(final Environment env) throws Exception {
+    Report.Builder reportBuilder = Report.builder().id("TableDeletionDuringSplit")
+        .description("Evaluates the performance of deleting tables during split operations.")
+        .parameter("num_tables", NUM_TABLES, "The number of tables that will be created/deleted.")
+        .parameter("base_table_name", BASE_TABLE_NAME, "The base table name.")
+        .parameter("num_splits", NUM_SPLITS,
+            "The number of splits that will be added to each table.")
+        .parameter("base_thread_name", THREAD_NAME, "The thread name used for the thread pool");
+
+    AccumuloClient client = env.getClient();
+    String[] tableNames = getTableNames();
+    createTables(tableNames, client);
+    splitAndDeleteTables(tableNames, client, reportBuilder);
+
+    return reportBuilder.build();
+  }
+
+  private String[] getTableNames() {
+    String[] names = new String[NUM_TABLES];
+    for (int i = 0; i < NUM_TABLES; i++) {
+      names[i] = BASE_TABLE_NAME + i;
+    }
+    return names;
+  }
+
+  private void createTables(final String[] tableNames, final AccumuloClient client)
+      throws TableExistsException, AccumuloSecurityException, AccumuloException {
+    for (String tableName : tableNames) {
+      client.tableOperations().create(tableName);
+    }
+  }
+
+  private void splitAndDeleteTables(final String[] tableNames, final AccumuloClient client,
+      final Report.Builder reportBuilder) throws ExecutionException, InterruptedException {
+    final LongAdder deletionTimes = new LongAdder();
+    final AtomicInteger deletedTables = new AtomicInteger(0);
+    Iterator<Runnable> iter = getTasks(tableNames, client, deletionTimes, deletedTables).iterator();
+    ExecutorService pool = Executors.newFixedThreadPool(MAX_THREADS);
+
+    List<Future<?>> results = new ArrayList<>();
+    for (int batch = 0; batch < NUM_BATCHES; batch++) {
+      for (int i = 0; i < BATCH_SIZE; i++) {
+        results.add(pool.submit(iter.next()));
+        results.add(pool.submit(iter.next()));
+      }
+
+      for (Future<?> future : results) {
+        future.get();
+      }
+      results.clear();
+    }
+
+    List<Runnable> queued = pool.shutdownNow();
+
+    reportBuilder.result("remaining_pending_tasks", countRemaining(iter),
+        "The number of remaining pending tasks.");
+    reportBuilder.result("remaining_submitted_tasks", queued.size(),
+        "The number of remaining submitted tasks.");
+
+    long totalRemainingTables = Arrays.stream(tableNames)
+        .filter((name) -> client.tableOperations().exists(name)).count();
+    reportBuilder.result("total_remaining_tables", totalRemainingTables,
+        "The total number of unsuccessfully deleted tables.");
+    // Guard against dividing by zero if no table was successfully deleted.
+    long avgDeletionTime = deletedTables.get() == 0 ? 0 : deletionTimes.sum() / deletedTables.get();
+    reportBuilder.result("avg_deletion_time", avgDeletionTime,
+        "The average deletion time (in ms) to delete a table.");
+  }
+
+  private List<Runnable> getTasks(final String[] tableNames, final AccumuloClient client,
+      final LongAdder deletionTime, final AtomicInteger deletedTables) {
+    List<Runnable> tasks = new ArrayList<>();
+    final SortedSet<Text> splits = getSplits();
+    for (String tableName : tableNames) {
+      tasks.add(getSplitTask(tableName, client, splits));
+      tasks.add(getDeletionTask(tableName, client, deletionTime, deletedTables));
+    }
+    return tasks;
+  }
+
+  private SortedSet<Text> getSplits() {
+    SortedSet<Text> splits = new TreeSet<>();
+    for (byte i = 0; i < NUM_SPLITS; i++) {
+      splits.add(new Text(new byte[] {0, 0, i}));
+    }
+    return splits;
+  }
+
+  private Runnable getSplitTask(final String tableName, final AccumuloClient client,
+      final SortedSet<Text> splits) {
+    return () -> {
+      try {
+        client.tableOperations().addSplits(tableName, splits);
+      } catch (TableNotFoundException ex) {
+        // Expected, ignore.
+      } catch (Exception e) {
+        throw new RuntimeException(tableName, e);
+      }
+    };
+  }
+
+  private Runnable getDeletionTask(final String tableName, final AccumuloClient client,
+      final LongAdder timeAdder, final AtomicInteger deletedTables) {
+    return () -> {
+      try {
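+        // Delay briefly so the deletion overlaps the in-progress split of the same table.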
+        Thread.sleep(HALF_SECOND);
+        long start = System.currentTimeMillis();
+        client.tableOperations().delete(tableName);
+        long time = System.currentTimeMillis() - start;
+        timeAdder.add(time);
+        deletedTables.getAndIncrement();
+      } catch (Exception e) {
+        throw new RuntimeException(tableName, e);
+      }
+    };
+  }
+
+  private int countRemaining(final Iterator<?> i) {
+    int count = 0;
+    while (i.hasNext()) {
+      i.next();
+      count++;
+    }
+    return count;
+  }
+}