You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by do...@apache.org on 2023/05/30 15:52:54 UTC

[accumulo] branch elasticity updated: Add a concurrent splits test to SplitIT (#3415)

This is an automated email from the ASF dual-hosted git repository.

domgarguilo pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/elasticity by this push:
     new a432a81999 Add a concurrent splits test to SplitIT (#3415)
a432a81999 is described below

commit a432a8199923acf0eaaaf56dc893c235537c3729
Author: Dom G <do...@apache.org>
AuthorDate: Tue May 30 11:52:49 2023 -0400

    Add a concurrent splits test to SplitIT (#3415)
    
    * add split IT with concurrent splits
    * fix potential bug in TestIngest.getSplitPoints
    
    ---------
    
    Co-authored-by: Keith Turner <kt...@apache.org>
---
 .../java/org/apache/accumulo/test/TestIngest.java  |  8 +-
 .../apache/accumulo/test/functional/SplitIT.java   | 99 ++++++++++++++++++++++
 2 files changed, 105 insertions(+), 2 deletions(-)

diff --git a/test/src/main/java/org/apache/accumulo/test/TestIngest.java b/test/src/main/java/org/apache/accumulo/test/TestIngest.java
index 99551e411c..101cbbf073 100644
--- a/test/src/main/java/org/apache/accumulo/test/TestIngest.java
+++ b/test/src/main/java/org/apache/accumulo/test/TestIngest.java
@@ -193,10 +193,14 @@ public class TestIngest {
   public static TreeSet<Text> getSplitPoints(long start, long end, long numsplits) {
     long splitSize = (end - start) / numsplits;
 
-    long pos = start + splitSize;
-
     TreeSet<Text> splits = new TreeSet<>();
 
+    if (splitSize < 1) {
+      return splits;
+    }
+
+    long pos = start + splitSize;
+
     while (pos < end) {
       splits.add(new Text(String.format("row_%010d", pos)));
       pos += splitSize;
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/SplitIT.java b/test/src/main/java/org/apache/accumulo/test/functional/SplitIT.java
index 3a342b78e8..33a93f40b6 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/SplitIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/SplitIT.java
@@ -19,17 +19,26 @@
 package org.apache.accumulo.test.functional;
 
 import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.accumulo.test.VerifyIngest.verifyIngest;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assumptions.assumeTrue;
 
 import java.time.Duration;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
 import java.util.TreeSet;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.stream.Collectors;
 
 import org.apache.accumulo.core.client.Accumulo;
 import org.apache.accumulo.core.client.AccumuloClient;
@@ -37,15 +46,18 @@ import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.admin.InstanceOperations;
 import org.apache.accumulo.core.client.admin.NewTableConfiguration;
+import org.apache.accumulo.core.client.admin.TabletHostingGoal;
 import org.apache.accumulo.core.conf.ConfigurationTypeHelper;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.TableId;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
 import org.apache.accumulo.core.metadata.MetadataTable;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily;
 import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.Pair;
 import org.apache.accumulo.harness.AccumuloClusterHarness;
 import org.apache.accumulo.minicluster.ServerType;
 import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
@@ -61,6 +73,8 @@ import org.junit.jupiter.api.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Preconditions;
+
 public class SplitIT extends AccumuloClusterHarness {
   private static final Logger log = LoggerFactory.getLogger(SplitIT.class);
 
@@ -242,4 +256,89 @@ public class SplitIT extends AccumuloClusterHarness {
       assertEquals(splits1, new TreeSet<>(c.tableOperations().listSplits(tableName)));
     }
   }
+
+  @Test
+  public void concurrentSplit() throws Exception {
+    try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) {
+      // Ingest data, add splits from many concurrent tasks, then verify splits and data.
+      final String tableName = getUniqueNames(1)[0];
+
+      log.debug("Creating table {}", tableName);
+      c.tableOperations().create(tableName);
+
+      final int numRows = 100_000;
+      log.debug("Ingesting {} rows into {}", numRows, tableName);
+      VerifyParams params = new VerifyParams(getClientProps(), tableName, numRows);
+      TestIngest.ingest(c, params);
+
+      log.debug("Verifying {} rows ingested into {}", numRows, tableName);
+      VerifyIngest.verifyIngest(c, params);
+      // Expected splits are gathered on this thread while building tasks, before any task runs.
+      log.debug("Creating futures that add random splits to the table");
+      ExecutorService es = Executors.newFixedThreadPool(10);
+      final int totalFutures = 100;
+      final int splitsPerFuture = 4;
+      final Set<Text> totalSplits = new HashSet<>();
+      List<Callable<Void>> tasks = new ArrayList<>(totalFutures);
+      for (int i = 0; i < totalFutures; i++) {
+        final Pair<Integer,Integer> splitBounds = getRandomSplitBounds(numRows);
+        final TreeSet<Text> splits = TestIngest.getSplitPoints(splitBounds.getFirst().longValue(),
+            splitBounds.getSecond().longValue(), splitsPerFuture);
+        totalSplits.addAll(splits);
+        tasks.add(() -> {
+          c.tableOperations().addSplits(tableName, splits);
+          return null;
+        });
+      }
+      // NOTE(review): parallelStream affects only submission; the 10-thread pool runs the tasks.
+      log.debug("Submitting futures");
+      List<Future<Void>> futures =
+          tasks.parallelStream().map(es::submit).collect(Collectors.toList());
+      // get() throws if a task failed; NOTE(review): es.shutdown() is then skipped (no finally).
+      log.debug("Waiting for futures to complete");
+      for (Future<?> f : futures) {
+        f.get();
+      }
+      es.shutdown();
+
+      log.debug("Checking that {} splits were created ", totalSplits.size());
+
+      assertEquals(totalSplits, new HashSet<>(c.tableOperations().listSplits(tableName)),
+          "Did not see expected splits");
+
+      // ELASTICITY_TODO the following could be removed after #3309. Currently scanning an ondemand
+      // table with lots of tablets will cause the test to timeout.
+      c.tableOperations().setTabletHostingGoal(tableName, new Range(), TabletHostingGoal.ALWAYS);
+
+      log.debug("Verifying {} rows ingested into {}", numRows, tableName);
+      VerifyIngest.verifyIngest(c, params);
+    }
+  }
+
+  /**
+   * Picks two distinct random integers in the half-open range [0, upperBound) and returns them as
+   * a (start, end) pair with start strictly less than end. The caller uses the pair as the bounds
+   * between which split points are generated.
+   *
+   * @param upperBound exclusive upper limit for both values; must be greater than 1
+   * @return a pair whose first element is the start and whose second is the larger end
+   */
+  private Pair<Integer,Integer> getRandomSplitBounds(int upperBound) {
+    Preconditions.checkArgument(upperBound > 1, "upperBound must be greater than 1");
+    // NOTE(review): 'random' is declared outside this hunk; assumed nextInt(n) yields [0, n)
+    int start = random.nextInt(upperBound);
+    int end = random.nextInt(upperBound - 1);
+
+    // end came from a range one smaller: bump it past start to avoid a tie, otherwise swap
+    if (end >= start) {
+      end += 1;
+    } else {
+      int tmp = start;
+      start = end;
+      end = tmp;
+    }
+
+    return new Pair<>(start, end);
+  }
+
 }