You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by do...@apache.org on 2023/05/30 15:52:54 UTC
[accumulo] branch elasticity updated: Add a concurrent splits test to SplitIT (#3415)
This is an automated email from the ASF dual-hosted git repository.
domgarguilo pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push:
new a432a81999 Add a concurrent splits test to SplitIT (#3415)
a432a81999 is described below
commit a432a8199923acf0eaaaf56dc893c235537c3729
Author: Dom G <do...@apache.org>
AuthorDate: Tue May 30 11:52:49 2023 -0400
Add a concurrent splits test to SplitIT (#3415)
* add split IT with concurrent splits
* fix potential bug in TestIngest.getSplitPoints
---------
Co-authored-by: Keith Turner <kt...@apache.org>
---
.../java/org/apache/accumulo/test/TestIngest.java | 8 +-
.../apache/accumulo/test/functional/SplitIT.java | 99 ++++++++++++++++++++++
2 files changed, 105 insertions(+), 2 deletions(-)
diff --git a/test/src/main/java/org/apache/accumulo/test/TestIngest.java b/test/src/main/java/org/apache/accumulo/test/TestIngest.java
index 99551e411c..101cbbf073 100644
--- a/test/src/main/java/org/apache/accumulo/test/TestIngest.java
+++ b/test/src/main/java/org/apache/accumulo/test/TestIngest.java
@@ -193,10 +193,14 @@ public class TestIngest {
public static TreeSet<Text> getSplitPoints(long start, long end, long numsplits) {
long splitSize = (end - start) / numsplits;
- long pos = start + splitSize;
-
TreeSet<Text> splits = new TreeSet<>();
+ if (splitSize < 1) {
+ return splits;
+ }
+
+ long pos = start + splitSize;
+
while (pos < end) {
splits.add(new Text(String.format("row_%010d", pos)));
pos += splitSize;
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/SplitIT.java b/test/src/main/java/org/apache/accumulo/test/functional/SplitIT.java
index 3a342b78e8..33a93f40b6 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/SplitIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/SplitIT.java
@@ -19,17 +19,26 @@
package org.apache.accumulo.test.functional;
import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.accumulo.test.VerifyIngest.verifyIngest;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assumptions.assumeTrue;
import java.time.Duration;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Set;
import java.util.TreeSet;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.stream.Collectors;
import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
@@ -37,15 +46,18 @@ import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.admin.InstanceOperations;
import org.apache.accumulo.core.client.admin.NewTableConfiguration;
+import org.apache.accumulo.core.client.admin.TabletHostingGoal;
import org.apache.accumulo.core.conf.ConfigurationTypeHelper;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily;
import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.harness.AccumuloClusterHarness;
import org.apache.accumulo.minicluster.ServerType;
import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
@@ -61,6 +73,8 @@ import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Preconditions;
+
public class SplitIT extends AccumuloClusterHarness {
private static final Logger log = LoggerFactory.getLogger(SplitIT.class);
@@ -242,4 +256,89 @@ public class SplitIT extends AccumuloClusterHarness {
assertEquals(splits1, new TreeSet<>(c.tableOperations().listSplits(tableName)));
}
}
+
+ /**
+  * Ingests data, adds many overlapping sets of random splits from concurrent threads, and then
+  * verifies that every requested split exists and that the ingested data is still readable.
+  */
+ @Test
+ public void concurrentSplit() throws Exception {
+   try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) {
+     final String tableName = getUniqueNames(1)[0];
+     log.debug("Creating table {}", tableName);
+     c.tableOperations().create(tableName);
+     final int numRows = 100_000;
+     log.debug("Ingesting {} rows into {}", numRows, tableName);
+     VerifyParams params = new VerifyParams(getClientProps(), tableName, numRows);
+     TestIngest.ingest(c, params);
+
+     log.debug("Verifying {} rows ingested into {}", numRows, tableName);
+     VerifyIngest.verifyIngest(c, params);
+
+     log.debug("Creating futures that add random splits to the table");
+     final int totalFutures = 100;
+     final int splitsPerFuture = 4;
+     final Set<Text> totalSplits = new HashSet<>();
+     List<Callable<Void>> tasks = new ArrayList<>(totalFutures);
+     for (int i = 0; i < totalFutures; i++) {
+       final Pair<Integer,Integer> splitBounds = getRandomSplitBounds(numRows);
+       final TreeSet<Text> splits = TestIngest.getSplitPoints(splitBounds.getFirst().longValue(),
+           splitBounds.getSecond().longValue(), splitsPerFuture);
+       totalSplits.addAll(splits);
+       tasks.add(() -> {
+         c.tableOperations().addSplits(tableName, splits);
+         return null;
+       });
+     }
+
+     ExecutorService es = Executors.newFixedThreadPool(10);
+     try {
+       log.debug("Submitting futures");
+       List<Future<Void>> futures = tasks.stream().map(es::submit).collect(Collectors.toList());
+       log.debug("Waiting for futures to complete");
+       for (Future<?> f : futures) {
+         f.get();
+       }
+     } finally {
+       // always release the pool threads, even when one of the split tasks fails
+       es.shutdownNow();
+     }
+
+     log.debug("Checking that {} splits were created ", totalSplits.size());
+     assertEquals(totalSplits, new HashSet<>(c.tableOperations().listSplits(tableName)),
+         "Did not see expected splits");
+     // ELASTICITY_TODO the following could be removed after #3309. Currently scanning an ondemand
+     // table with lots of tablets will cause the test to timeout.
+     c.tableOperations().setTabletHostingGoal(tableName, new Range(), TabletHostingGoal.ALWAYS);
+     log.debug("Verifying {} rows ingested into {}", numRows, tableName);
+     VerifyIngest.verifyIngest(c, params);
+   }
+ }
+
+ /**
+  * Picks two distinct random integers in the range [0, upperBound) and returns them ordered so
+  * that the first is strictly smaller than the second. The pair is intended to be used as the
+  * start and end of a range from which split points are generated.
+  *
+  * @param upperBound exclusive upper limit for both values; must be greater than 1
+  * @return a pair whose first element is the lower bound and whose second is the higher bound
+  * @throws IllegalArgumentException if upperBound is not greater than 1
+  */
+ private Pair<Integer,Integer> getRandomSplitBounds(int upperBound) {
+   Preconditions.checkArgument(upperBound > 1, "upperBound must be greater than 1");
+
+   final int first = random.nextInt(upperBound);
+   final int second = random.nextInt(upperBound - 1);
+
+   // second is drawn from a range one smaller than first's; shifting it up by one when it
+   // lands at or above first guarantees the two values never collide
+   if (second >= first) {
+     return new Pair<>(first, second + 1);
+   }
+
+   // otherwise second is already strictly below first, so it simply becomes the lower bound
+   // and first becomes the upper bound
+   return new Pair<>(second, first);
+ }
+
}