You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by mm...@apache.org on 2019/09/17 15:45:30 UTC
[accumulo-testing] branch master updated: Move performance tests to
accumulo-testing #1200 (#90)
This is an automated email from the ASF dual-hosted git repository.
mmiller pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo-testing.git
The following commit(s) were added to refs/heads/master by this push:
new 3579908 Move performance tests to accumulo-testing #1200 (#90)
3579908 is described below
commit 3579908e459b957059d40447425b91887122c2b6
Author: Laura Schanno <lb...@gmail.com>
AuthorDate: Tue Sep 17 11:45:25 2019 -0400
Move performance tests to accumulo-testing #1200 (#90)
* Move ManySplitIT as HighSplitCreationPT.
* Move BalanceFasterIT as SplitBalancingPT.
* Move DeleteTableDuringSplitIT as TableDeletionDuringSplitPT.
* Move RollWALPerformanceIT as RollWALPT.
* Use Report.Builder.result() instead of Report.Builder.parameter() when
reporting resulting values so that they are placed in the appropriate
section when the JSON files are written.
Related: #1200
---
.../performance/tests/HighSplitCreationPT.java | 69 +++++++++
.../testing/performance/tests/RollWALPT.java | 155 +++++++++++++++++++
.../performance/tests/SplitBalancingPT.java | 101 +++++++++++++
.../tests/TableDeletionDuringSplitPT.java | 168 +++++++++++++++++++++
4 files changed, 493 insertions(+)
diff --git a/src/main/java/org/apache/accumulo/testing/performance/tests/HighSplitCreationPT.java b/src/main/java/org/apache/accumulo/testing/performance/tests/HighSplitCreationPT.java
new file mode 100644
index 0000000..fe15fba
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/testing/performance/tests/HighSplitCreationPT.java
@@ -0,0 +1,69 @@
+package org.apache.accumulo.testing.performance.tests;
+
+import java.nio.charset.StandardCharsets;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.testing.performance.Environment;
+import org.apache.accumulo.testing.performance.PerformanceTest;
+import org.apache.accumulo.testing.performance.Report;
+import org.apache.accumulo.testing.performance.SystemConfiguration;
+import org.apache.hadoop.io.Text;
+
+/**
+ * Performance test that measures how quickly {@value #NUM_SPLITS} splits can be added to a single
+ * table in one {@code addSplits} call.
+ *
+ * <p>Before timing, the metadata table is pre-split at rows {@code 1;1} through {@code 1;e}
+ * (presumably so the metadata writes generated by the new splits are spread over several
+ * tablets).
+ */
+public class HighSplitCreationPT implements PerformanceTest {
+
+  private static final int NUM_SPLITS = 10_000;
+  private static final int MIN_REQUIRED_SPLITS_PER_SECOND = 100;
+  private static final double NANOS_PER_SECOND = 1_000_000_000.0;
+  private static final String TABLE_NAME = "highSplitCreation";
+  private static final String METADATA_TABLE_SPLITS = "123456789abcde";
+
+  @Override
+  public SystemConfiguration getSystemConfig() {
+    return new SystemConfiguration();
+  }
+
+  /**
+   * Creates the test table, pre-splits the metadata table, then times adding all test-table
+   * splits.
+   *
+   * @param env supplies the Accumulo client used for all table operations
+   * @return a report whose {@code splits_per_second} result is the measured average split rate
+   */
+  @Override
+  public Report runTest(final Environment env) throws Exception {
+    Report.Builder reportBuilder = Report.builder().id("high_split_creation")
+        .description("Evaluate the speed of creating many splits.")
+        .parameter("table_name", TABLE_NAME, "The name of the test table")
+        .parameter("num_splits", NUM_SPLITS, "The high number of splits to add.")
+        .parameter("min_required_splits_per_second", MIN_REQUIRED_SPLITS_PER_SECOND,
+            "The minimum average number of splits that must be created per second before performance is considered too slow.");
+
+    AccumuloClient client = env.getClient();
+    client.tableOperations().create(TABLE_NAME);
+    client.tableOperations().addSplits(MetadataTable.NAME, getMetadataTableSplits());
+
+    SortedSet<Text> splits = getTestTableSplits();
+
+    // nanoTime is monotonic, unlike currentTimeMillis, so clock adjustments cannot skew the result.
+    long start = System.nanoTime();
+    client.tableOperations().addSplits(TABLE_NAME, splits);
+    long elapsedNanos = System.nanoTime() - start;
+    // Floating-point math: integer division would truncate to whole seconds and throw
+    // ArithmeticException whenever the splits were created in under one second.
+    double splitsPerSecond = NUM_SPLITS / (Math.max(elapsedNanos, 1L) / NANOS_PER_SECOND);
+
+    reportBuilder.result("splits_per_second", splitsPerSecond,
+        "The average number of splits created per second.");
+
+    return reportBuilder.build();
+  }
+
+  /**
+   * Returns one metadata-table split row {@code 1;<c>} for each character {@code c} of
+   * {@code METADATA_TABLE_SPLITS}.
+   */
+  private SortedSet<Text> getMetadataTableSplits() {
+    SortedSet<Text> splits = new TreeSet<>();
+    for (byte b : METADATA_TABLE_SPLITS.getBytes(StandardCharsets.UTF_8)) {
+      splits.add(new Text(new byte[] {'1', ';', b}));
+    }
+    return splits;
+  }
+
+  /** Returns {@value #NUM_SPLITS} splits: the hex strings of 0 through NUM_SPLITS - 1. */
+  private SortedSet<Text> getTestTableSplits() {
+    SortedSet<Text> splits = new TreeSet<>();
+    for (int i = 0; i < NUM_SPLITS; i++) {
+      splits.add(new Text(Integer.toHexString(i)));
+    }
+    return splits;
+  }
+}
diff --git a/src/main/java/org/apache/accumulo/testing/performance/tests/RollWALPT.java b/src/main/java/org/apache/accumulo/testing/performance/tests/RollWALPT.java
new file mode 100644
index 0000000..bae6cf5
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/testing/performance/tests/RollWALPT.java
@@ -0,0 +1,155 @@
+package org.apache.accumulo.testing.performance.tests;
+
+import java.security.SecureRandom;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.UUID;
+
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.TableExistsException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.accumulo.testing.performance.Environment;
+import org.apache.accumulo.testing.performance.PerformanceTest;
+import org.apache.accumulo.testing.performance.Report;
+import org.apache.accumulo.testing.performance.SystemConfiguration;
+import org.apache.hadoop.io.Text;
+
+public class RollWALPT implements PerformanceTest {
+
+ private static final String TABLE_SMALL_WAL = "SmallRollWAL";
+ private static final String TABLE_LARGE_WAL = "LargeRollWAL";
+ private static final String SIZE_SMALL_WAL = "5M";
+ private static final String SIZE_LARGE_WAL = "1G";
+ private static final long NUM_SPLITS = 100L;
+ private static final long SPLIT_DISTANCE = Long.MAX_VALUE / NUM_SPLITS;
+ private static final int NUM_ENTRIES = 50_000;
+
+ private final SecureRandom random = new SecureRandom();
+
+ @Override
+ public SystemConfiguration getSystemConfig() {
+ Map<String,String> config = new HashMap<>();
+
+ config.put(Property.TSERV_WAL_REPLICATION.getKey(), "1");
+ config.put(Property.TSERV_WALOG_MAX_REFERENCED.getKey(), "100");
+ config.put(Property.GC_CYCLE_START.getKey(), "1s");
+ config.put(Property.GC_CYCLE_DELAY.getKey(), "1s");
+
+ return new SystemConfiguration().setAccumuloConfig(config);
+ }
+
+ @Override
+ public Report runTest(final Environment env) throws Exception {
+ Report.Builder reportBuilder = Report.builder().id("rollWAL").description(
+ "Evaluate the performance of ingesting a large number of entries across numerous splits given both a small and large maximum WAL size.")
+ .parameter("small_wal_table", TABLE_SMALL_WAL,
+ "The name of the table used for evaluating performance with a small WAL.")
+ .parameter("large_wal_table", TABLE_LARGE_WAL,
+ "The name of the table used for evaluating performance with a large WAL.")
+ .parameter("num_splits", NUM_SPLITS,
+ "The number of splits that will be added to the tables.")
+ .parameter("split_distance", SPLIT_DISTANCE, "The distance between each split.")
+ .parameter("num_entries", NUM_ENTRIES,
+ "The number of entries that will be written to the tables.")
+ .parameter("small_wal_size", SIZE_SMALL_WAL,
+ "The size of the small WAL used to force many rollovers.")
+ .parameter("large_wal_size", SIZE_LARGE_WAL,
+ "The size of the large WAL used to avoid many rollovers");
+
+ AccumuloClient client = env.getClient();
+ final long smallWALTime = evalSmallWAL(client);
+ reportBuilder.result("small_wal_write_time", smallWALTime,
+ "The time (in ns) it took to write entries to the table with a small WAL of "
+ + SIZE_SMALL_WAL);
+
+ final long largeWALTime = evalLargeWAL(client);
+ reportBuilder.result("large_wal_write_time", largeWALTime,
+ "The time (in ns) it took to write entries to the table with a large WAL of "
+ + SIZE_LARGE_WAL);
+ return reportBuilder.build();
+ }
+
+ private long evalSmallWAL(final AccumuloClient client) throws AccumuloSecurityException,
+ AccumuloException, TableExistsException, TableNotFoundException {
+ setMaxWALSize(SIZE_SMALL_WAL, client);
+ initTable(TABLE_SMALL_WAL, client);
+ return getTimeToWriteEntries(TABLE_SMALL_WAL, client);
+ }
+
+ private long evalLargeWAL(final AccumuloClient client) throws AccumuloSecurityException,
+ AccumuloException, TableExistsException, TableNotFoundException {
+ setMaxWALSize(SIZE_LARGE_WAL, client);
+ initTable(TABLE_LARGE_WAL, client);
+ return getTimeToWriteEntries(TABLE_LARGE_WAL, client);
+ }
+
+ private void setMaxWALSize(final String size, final AccumuloClient client)
+ throws AccumuloSecurityException, AccumuloException {
+ client.instanceOperations().setProperty(Property.TSERV_WALOG_MAX_SIZE.getKey(), size);
+ }
+
+ private void initTable(final String tableName, final AccumuloClient client)
+ throws AccumuloSecurityException, TableNotFoundException, AccumuloException,
+ TableExistsException {
+ client.tableOperations().create(tableName);
+ client.tableOperations().addSplits(tableName, getSplits());
+ client.instanceOperations().waitForBalance();
+ }
+
+ private SortedSet<Text> getSplits() {
+ SortedSet<Text> splits = new TreeSet<>();
+ for (long i = 0L; i < NUM_SPLITS; i++) {
+ splits.add(new Text(String.format("%016x", i + SPLIT_DISTANCE)));
+ }
+ return splits;
+ }
+
+ private long getTimeToWriteEntries(final String tableName, final AccumuloClient client)
+ throws TableNotFoundException, MutationsRejectedException {
+ long start = System.nanoTime();
+ writeEntries(tableName, client);
+ return System.nanoTime() - start;
+ }
+
+ private void writeEntries(final String tableName, final AccumuloClient client)
+ throws TableNotFoundException, MutationsRejectedException {
+ BatchWriter bw = client.createBatchWriter(tableName);
+
+ String instanceId = UUID.randomUUID().toString();
+ final ColumnVisibility cv = new ColumnVisibility();
+ for (int i = 0; i < NUM_ENTRIES; i++) {
+ String value = instanceId + i;
+ Mutation m = genMutation(cv, value);
+ bw.addMutation(m);
+ }
+
+ bw.close();
+ }
+
+ private Mutation genMutation(final ColumnVisibility colVis, final String value) {
+ byte[] rowStr = toZeroPaddedString(getRandomLong(), 16);
+ byte[] colFamStr = toZeroPaddedString(random.nextInt(Short.MAX_VALUE), 4);
+ byte[] colQualStr = toZeroPaddedString(random.nextInt(Short.MAX_VALUE), 4);
+ Mutation mutation = new Mutation(new Text(rowStr));
+ mutation.put(new Text(colFamStr), new Text(colQualStr), colVis, new Value(value));
+ return mutation;
+ }
+
+ private long getRandomLong() {
+ return ((random.nextLong() & 0x7fffffffffffffffL) % (Long.MAX_VALUE));
+ }
+
+ private byte[] toZeroPaddedString(long num, int width) {
+ return new byte[Math.max(Long.toString(num, 16).length(), width)];
+ }
+}
diff --git a/src/main/java/org/apache/accumulo/testing/performance/tests/SplitBalancingPT.java b/src/main/java/org/apache/accumulo/testing/performance/tests/SplitBalancingPT.java
new file mode 100644
index 0000000..22a3dbd
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/testing/performance/tests/SplitBalancingPT.java
@@ -0,0 +1,101 @@
+package org.apache.accumulo.testing.performance.tests;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.testing.performance.Environment;
+import org.apache.accumulo.testing.performance.PerformanceTest;
+import org.apache.accumulo.testing.performance.Report;
+import org.apache.accumulo.testing.performance.SystemConfiguration;
+import org.apache.hadoop.io.Text;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Performance test that splits one table {@value #NUM_SPLITS} times, waits for the balancer, and
+ * reports how many of that table's tablets each tablet server hosts and whether every server is
+ * within {@value #MARGIN} tablets of an even share.
+ */
+public class SplitBalancingPT implements PerformanceTest {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SplitBalancingPT.class);
+
+  private static final String TABLE_NAME = "splitBalancing";
+  private static final int NUM_SPLITS = 1_000;
+  private static final int MARGIN = 3;
+  private static final Text TSERVER_ASSIGNED_TABLETS_COL_FAM = new Text("loc");
+
+  @Override
+  public SystemConfiguration getSystemConfig() {
+    return new SystemConfiguration();
+  }
+
+  /**
+   * Creates and splits the test table, waits for balancing, then reports per-server tablet counts
+   * plus an {@code all_servers_balanced} result (1 if every server is within bounds, else 0).
+   *
+   * @param env supplies the Accumulo client
+   * @return the populated report
+   * @throws IllegalStateException if no tablet servers are available
+   */
+  @Override
+  public Report runTest(final Environment env) throws Exception {
+    AccumuloClient client = env.getClient();
+    client.tableOperations().create(TABLE_NAME);
+    client.tableOperations().addSplits(TABLE_NAME, getSplits());
+    client.instanceOperations().waitForBalance();
+
+    // Seed every known tablet server with zero so a server hosting none of the tablets is still
+    // reported and counted as unbalanced. This relies on getTabletServers() names matching the
+    // values stored in the metadata "loc" column -- confirm if the location format changes.
+    Map<String,Integer> tablets = new HashMap<>();
+    for (String tabletServer : client.instanceOperations().getTabletServers()) {
+      tablets.put(tabletServer, 0);
+    }
+    int totalTabletServers = tablets.size();
+    if (totalTabletServers == 0) {
+      throw new IllegalStateException("No tablet servers are available");
+    }
+    // N split points produce N + 1 tablets.
+    int expectedAllocation = (NUM_SPLITS + 1) / totalTabletServers;
+    int min = expectedAllocation - MARGIN;
+    int max = expectedAllocation + MARGIN;
+
+    Report.Builder reportBuilder = Report.builder().id("split_balancing").description(
+        "Evaluate and verify that when a high number of splits are created, that the tablets are balanced equally among tablet servers.")
+        .parameter("num_splits", NUM_SPLITS, "The number of splits")
+        .parameter("num_tservers", totalTabletServers, "The number of tablet servers")
+        .parameter("tserver_min", min,
+            "The minimum number of tablets that should be assigned to a tablet server.")
+        .parameter("tserver_max", max,
+            "The maximum number of tablets that should be assigned to a tablet server.");
+
+    countTablets(client, tablets);
+
+    boolean allServersBalanced = true;
+    for (Map.Entry<String,Integer> entry : tablets.entrySet()) {
+      String tabletServer = entry.getKey();
+      int count = entry.getValue();
+      allServersBalanced &= count >= min && count <= max;
+
+      reportBuilder.result("size_tserver_" + tabletServer, count,
+          "Total tablets assigned to tablet server " + tabletServer);
+    }
+    // Report the verdict; previously it was computed and then discarded.
+    reportBuilder.result("all_servers_balanced", allServersBalanced ? 1 : 0,
+        "1 if every tablet server hosts between tserver_min and tserver_max tablets, 0 otherwise.");
+
+    return reportBuilder.build();
+  }
+
+  /** Returns {@value #NUM_SPLITS} split rows: the decimal strings of 0 through NUM_SPLITS - 1. */
+  private SortedSet<Text> getSplits() {
+    SortedSet<Text> splits = new TreeSet<>();
+    for (int i = 0; i < NUM_SPLITS; i++) {
+      splits.add(new Text(String.valueOf(i)));
+    }
+    return splits;
+  }
+
+  /**
+   * Scans the metadata "loc" column for the test table's tablets only and increments
+   * {@code tablets} once per tablet, keyed by the stored location value. Best effort: a scan error
+   * is logged and leaves the counts as far as they got.
+   */
+  private void countTablets(final AccumuloClient client, final Map<String,Integer> tablets) {
+    try (Scanner scanner = client.createScanner(MetadataTable.NAME, Authorizations.EMPTY)) {
+      String tableId = client.tableOperations().tableIdMap().get(TABLE_NAME);
+      scanner.fetchColumnFamily(TSERVER_ASSIGNED_TABLETS_COL_FAM);
+      // Metadata rows for a table are "<id>;<endRow>" plus the default tablet "<id><", so this
+      // range restricts the scan to the test table instead of every table in the instance.
+      scanner.setRange(new Range(tableId + ";", true, tableId + "<", true));
+
+      for (Map.Entry<Key,Value> entry : scanner) {
+        tablets.merge(entry.getValue().toString(), 1, Integer::sum);
+      }
+    } catch (Exception e) {
+      LOG.error("Error occurred during scan:", e);
+    }
+  }
+}
diff --git a/src/main/java/org/apache/accumulo/testing/performance/tests/TableDeletionDuringSplitPT.java b/src/main/java/org/apache/accumulo/testing/performance/tests/TableDeletionDuringSplitPT.java
new file mode 100644
index 0000000..cfe943e
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/testing/performance/tests/TableDeletionDuringSplitPT.java
@@ -0,0 +1,168 @@
+package org.apache.accumulo.testing.performance.tests;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.LongAdder;
+
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.TableExistsException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.testing.performance.Environment;
+import org.apache.accumulo.testing.performance.PerformanceTest;
+import org.apache.accumulo.testing.performance.Report;
+import org.apache.accumulo.testing.performance.SystemConfiguration;
+import org.apache.hadoop.io.Text;
+
+/**
+ * Performance test that creates {@value #NUM_TABLES} tables and then, in {@value #NUM_BATCHES}
+ * batches, concurrently adds splits to each table while deleting it. It reports the average
+ * deletion time and how many tables and tasks were left behind.
+ */
+public class TableDeletionDuringSplitPT implements PerformanceTest {
+
+  private static final int NUM_BATCHES = 12;
+  private static final int BATCH_SIZE = 8;
+  private static final int MAX_THREADS = BATCH_SIZE + 2;
+  private static final int NUM_TABLES = NUM_BATCHES * BATCH_SIZE;
+  private static final int NUM_SPLITS = 100;
+  private static final int HALF_SECOND = 500;
+  private static final double NANOS_PER_MILLI = 1_000_000.0;
+  private static final String BASE_TABLE_NAME = "tableDeletionDuringSplit";
+  private static final String THREAD_NAME = "concurrent-api-requests";
+
+  @Override
+  public SystemConfiguration getSystemConfig() {
+    return new SystemConfiguration();
+  }
+
+  /**
+   * Creates the tables, runs the concurrent split/delete batches, and returns the report.
+   *
+   * @param env supplies the Accumulo client
+   * @return the populated report
+   */
+  @Override
+  public Report runTest(final Environment env) throws Exception {
+    Report.Builder reportBuilder = Report.builder().id("TableDeletionDuringSplit")
+        .description("Evaluates the performance of deleting tables during split operations.")
+        .parameter("num_tables", NUM_TABLES, "The number of tables that will be created/deleted.")
+        .parameter("base_table_name", BASE_TABLE_NAME, "The base table name.")
+        .parameter("num_splits", NUM_SPLITS,
+            "The number of splits that will be added to each table.")
+        .parameter("base_thread_name", THREAD_NAME, "The thread name used for the thread pool");
+
+    AccumuloClient client = env.getClient();
+    String[] tableNames = getTableNames();
+    createTables(tableNames, client);
+    splitAndDeleteTables(tableNames, client, reportBuilder);
+
+    return reportBuilder.build();
+  }
+
+  /** Returns {@code BASE_TABLE_NAME + i} for i in {@code [0, NUM_TABLES)}. */
+  private String[] getTableNames() {
+    String[] names = new String[NUM_TABLES];
+    for (int i = 0; i < NUM_TABLES; i++) {
+      names[i] = BASE_TABLE_NAME + i;
+    }
+    return names;
+  }
+
+  /** Creates every table in {@code tableNames}. */
+  private void createTables(final String[] tableNames, final AccumuloClient client)
+      throws TableExistsException, AccumuloSecurityException, AccumuloException {
+    for (String tableName : tableNames) {
+      client.tableOperations().create(tableName);
+    }
+  }
+
+  /**
+   * Submits the (split, delete) task pairs in batches of {@code BATCH_SIZE} tables, waiting for
+   * each batch to finish, then records the leftover-task, leftover-table and average-deletion-time
+   * results.
+   *
+   * @throws ExecutionException if any task failed (the pool is still shut down)
+   */
+  private void splitAndDeleteTables(final String[] tableNames, final AccumuloClient client,
+      final Report.Builder reportBuilder) throws ExecutionException, InterruptedException {
+    final LongAdder deletionTimes = new LongAdder();
+    final AtomicInteger deletedTables = new AtomicInteger(0);
+    Iterator<Runnable> iter = getTasks(tableNames, client, deletionTimes, deletedTables).iterator();
+    // Name the pool threads after THREAD_NAME so the reported "base_thread_name" is accurate.
+    final AtomicInteger threadCount = new AtomicInteger(0);
+    ExecutorService pool = Executors.newFixedThreadPool(MAX_THREADS,
+        r -> new Thread(r, THREAD_NAME + "-" + threadCount.incrementAndGet()));
+
+    List<Runnable> queued;
+    try {
+      List<Future<?>> results = new ArrayList<>();
+      for (int batch = 0; batch < NUM_BATCHES; batch++) {
+        // getTasks() yields a (split, delete) pair per table, so each step submits both.
+        for (int i = 0; i < BATCH_SIZE; i++) {
+          results.add(pool.submit(iter.next()));
+          results.add(pool.submit(iter.next()));
+        }
+
+        // Wait for the whole batch; a failed task surfaces here as an ExecutionException.
+        for (Future<?> future : results) {
+          future.get();
+        }
+        results.clear();
+      }
+    } finally {
+      // Always release the pool threads, even when a task failed.
+      queued = pool.shutdownNow();
+    }
+
+    reportBuilder.result("remaining_pending_tasks", countRemaining(iter),
+        "The number of remaining pending tasks.");
+    reportBuilder.result("remaining_submitted_tasks", queued.size(),
+        "The number of remaining submitted tasks.");
+
+    long totalRemainingTables = Arrays.stream(tableNames)
+        .filter((name) -> client.tableOperations().exists(name)).count();
+    reportBuilder.result("total_remaining_tables", totalRemainingTables,
+        "The total number of unsuccessfully deleted tables.");
+    int deleted = deletedTables.get();
+    // Guard the division in case no deletion completed; deletion times are summed in nanoseconds.
+    double avgDeletionMillis =
+        deleted == 0 ? 0.0 : deletionTimes.sum() / NANOS_PER_MILLI / deleted;
+    reportBuilder.result("avg_deletion_time", avgDeletionMillis,
+        "The average deletion time (in ms) to delete a table.");
+  }
+
+  /** Returns, for each table in order, its split task followed by its deletion task. */
+  private List<Runnable> getTasks(final String[] tableNames, final AccumuloClient client,
+      final LongAdder deletionTime, final AtomicInteger deletedTables) {
+    List<Runnable> tasks = new ArrayList<>();
+    final SortedSet<Text> splits = getSplits();
+    for (String tableName : tableNames) {
+      tasks.add(getSplitTask(tableName, client, splits));
+      tasks.add(getDeletionTask(tableName, client, deletionTime, deletedTables));
+    }
+    return tasks;
+  }
+
+  /** Returns {@value #NUM_SPLITS} three-byte split rows {@code {0, 0, i}}. */
+  private SortedSet<Text> getSplits() {
+    SortedSet<Text> splits = new TreeSet<>();
+    // An int counter avoids the infinite loop a byte counter would hit if NUM_SPLITS exceeded 127.
+    for (int i = 0; i < NUM_SPLITS; i++) {
+      splits.add(new Text(new byte[] {0, 0, (byte) i}));
+    }
+    return splits;
+  }
+
+  /**
+   * Returns a task that adds {@code splits} to {@code tableName}. A TableNotFoundException is
+   * ignored because the paired deletion task may remove the table first.
+   */
+  private Runnable getSplitTask(final String tableName, final AccumuloClient client,
+      final SortedSet<Text> splits) {
+    return () -> {
+      try {
+        client.tableOperations().addSplits(tableName, splits);
+      } catch (TableNotFoundException ex) {
+        // Expected, ignore.
+      } catch (Exception e) {
+        throw new RuntimeException(tableName, e);
+      }
+    };
+  }
+
+  /**
+   * Returns a task that waits {@code HALF_SECOND} ms, deletes {@code tableName}, adds the deletion
+   * time (in ns) to {@code timeAdder}, and increments {@code deletedTables}.
+   */
+  private Runnable getDeletionTask(final String tableName, final AccumuloClient client,
+      final LongAdder timeAdder, final AtomicInteger deletedTables) {
+    return () -> {
+      try {
+        // Delay so the delete is (presumably) issued while the paired split is still running.
+        Thread.sleep(HALF_SECOND);
+        long start = System.nanoTime();
+        client.tableOperations().delete(tableName);
+        timeAdder.add(System.nanoTime() - start);
+        deletedTables.incrementAndGet();
+      } catch (InterruptedException e) {
+        // Restore the interrupt flag before failing the task.
+        Thread.currentThread().interrupt();
+        throw new RuntimeException(tableName, e);
+      } catch (Exception e) {
+        throw new RuntimeException(tableName, e);
+      }
+    };
+  }
+
+  /** Drains {@code i} and returns how many elements it still had. */
+  private int countRemaining(final Iterator<?> i) {
+    int count = 0;
+    while (i.hasNext()) {
+      i.next();
+      count++;
+    }
+    return count;
+  }
+}