You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by dl...@apache.org on 2023/05/15 14:06:53 UTC

[accumulo] branch elasticity updated: Added ScanServer tests for different tablet hosting goals (#3388)

This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/elasticity by this push:
     new 770d949bfd Added ScanServer tests for different tablet hosting goals (#3388)
770d949bfd is described below

commit 770d949bfdc592a641f406cf3e73460ff4d34474
Author: Dave Marion <dl...@apache.org>
AuthorDate: Mon May 15 10:06:47 2023 -0400

    Added ScanServer tests for different tablet hosting goals (#3388)
    
    
    Co-authored-by: DomGarguilo <do...@gmail.com>
---
 .../org/apache/accumulo/test/ScanServerIT.java     | 113 +++++++++++++++++++++
 .../accumulo/test/ScanServerIT_NoServers.java      |  63 ++++++++++++
 2 files changed, 176 insertions(+)

diff --git a/test/src/main/java/org/apache/accumulo/test/ScanServerIT.java b/test/src/main/java/org/apache/accumulo/test/ScanServerIT.java
index 7576413129..b68f311871 100644
--- a/test/src/main/java/org/apache/accumulo/test/ScanServerIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/ScanServerIT.java
@@ -24,10 +24,16 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Objects;
 import java.util.Properties;
+import java.util.SortedSet;
+import java.util.TreeSet;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.Accumulo;
@@ -38,10 +44,13 @@ import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel;
 import org.apache.accumulo.core.client.TimedOutException;
 import org.apache.accumulo.core.client.admin.NewTableConfiguration;
+import org.apache.accumulo.core.client.admin.TabletHostingGoal;
 import org.apache.accumulo.core.conf.ClientProperty;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CurrentLocationColumnFamily;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.harness.MiniClusterConfigurationCallback;
 import org.apache.accumulo.harness.SharedMiniClusterBase;
@@ -49,6 +58,8 @@ import org.apache.accumulo.minicluster.ServerType;
 import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
 import org.apache.accumulo.test.functional.ReadWriteIT;
 import org.apache.accumulo.test.functional.SlowIterator;
+import org.apache.accumulo.test.util.Wait;
+import org.apache.hadoop.io.Text;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Disabled;
@@ -74,6 +85,10 @@ public class ScanServerIT extends SharedMiniClusterBase {
       // Configure the scan server to only have 1 scan executor thread. This means
       // that the scan server will run scans serially, not concurrently.
       cfg.setProperty(Property.SSERV_SCAN_EXECUTORS_DEFAULT_THREADS, "1");
+
+      cfg.setProperty(Property.MANAGER_TABLET_GROUP_WATCHER_INTERVAL, "5");
+      cfg.setProperty(Property.TSERV_ONDEMAND_UNLOADER_INTERVAL, "10");
+      cfg.setProperty("table.custom.ondemand.unloader.inactivity.threshold.seconds", "15");
     }
   }
 
@@ -205,6 +220,96 @@ public class ScanServerIT extends SharedMiniClusterBase {
     }
   }
 
+  @Test
+  public void testScanWithTabletHostingMix() throws Exception {
+    try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
+      String tableName = getUniqueNames(1)[0];
+      // Table whose tablets carry a mix of ALWAYS, NEVER and ONDEMAND hosting goals
+      final int ingestedEntryCount = setupTableWithHostingMix(client, tableName);
+      // Full-table EVENTUAL scan first: it must return every entry, NEVER tablets included
+      try (Scanner scanner = client.createScanner(tableName, Authorizations.EMPTY)) {
+        scanner.setRange(new Range());
+        scanner.setConsistencyLevel(ConsistencyLevel.EVENTUAL);
+        assertEquals(ingestedEntryCount, Iterables.size(scanner),
+            "The scan server scanner should have seen all ingested and flushed entries");
+        // Same scan with IMMEDIATE consistency must fail because the NEVER tablets are not hosted
+        scanner.setConsistencyLevel(ConsistencyLevel.IMMEDIATE);
+        assertThrows(RuntimeException.class, () -> Iterables.size(scanner));
+
+        // Still IMMEDIATE: ranges that avoid the NEVER tablets must succeed
+        scanner.setRange(new Range(null, "row_0000000003"));
+        assertEquals(40, Iterables.size(scanner));
+        // Rows 8 and up are ONDEMAND; presumably this scan brings those tablets back online
+        scanner.setRange(new Range("row_0000000008", null));
+        assertEquals(20, Iterables.size(scanner));
+      } // when the scanner is closed, all open sessions should be closed
+    }
+  }
+
+  @Test
+  public void testBatchScanWithTabletHostingMix() throws Exception {
+    try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
+      String tableName = getUniqueNames(1)[0];
+      // Table whose tablets carry a mix of ALWAYS, NEVER and ONDEMAND hosting goals
+      final int ingestedEntryCount = setupTableWithHostingMix(client, tableName);
+      // Batch-scanner variant: full-table EVENTUAL scan first, expected to return every entry
+      try (BatchScanner scanner = client.createBatchScanner(tableName, Authorizations.EMPTY)) {
+        scanner.setRanges(Collections.singleton(new Range()));
+        scanner.setConsistencyLevel(ConsistencyLevel.EVENTUAL);
+        assertEquals(ingestedEntryCount, Iterables.size(scanner),
+            "The scan server scanner should have seen all ingested and flushed entries");
+        // Same scan with IMMEDIATE consistency must fail because the NEVER tablets are not hosted
+        scanner.setConsistencyLevel(ConsistencyLevel.IMMEDIATE);
+        assertThrows(RuntimeException.class, () -> Iterables.size(scanner));
+
+        // Still IMMEDIATE: ranges on either side of the NEVER tablets must succeed together
+        Collection<Range> ranges = new ArrayList<>();
+        ranges.add(new Range(null, "row_0000000003"));
+        ranges.add(new Range("row_0000000008", null));
+        scanner.setRanges(ranges);
+        assertEquals(60, Iterables.size(scanner));
+      } // when the scanner is closed, all open sessions should be closed
+    }
+  }
+
+  /**
+   * Creates a pre-split table, ingests into it, then assigns ALWAYS, NEVER and ONDEMAND hosting
+   * goals to three row ranges and waits until exactly three of its tablets are still hosted.
+   *
+   * @param client The AccumuloClient to use for the operation
+   * @param tableName The name of the table to be created and set up
+   * @return The count of ingested entries
+   * @throws Exception propagated from table creation, ingest, the goal changes or the wait
+   */
+  protected static int setupTableWithHostingMix(AccumuloClient client, String tableName)
+      throws Exception {
+    SortedSet<Text> splits =
+        IntStream.rangeClosed(1, 9).mapToObj(i -> new Text("row_000000000" + i))
+            .collect(Collectors.toCollection(TreeSet::new));
+    // split points are row_0000000001 through row_0000000009
+    NewTableConfiguration ntc = new NewTableConfiguration();
+    ntc.withSplits(splits);
+    ntc.withInitialHostingGoal(TabletHostingGoal.ALWAYS); // speed up ingest
+    final int ingestedEntryCount = createTableAndIngest(client, tableName, ntc, 10, 10, "colf");
+
+    String tableId = client.tableOperations().tableIdMap().get(tableName);
+
+    // start of the table through row 3 (inclusive) stays ALWAYS
+    client.tableOperations().setTabletHostingGoal(tableName,
+        new Range(null, true, "row_0000000003", true), TabletHostingGoal.ALWAYS);
+    // rows 4 through 7 (both inclusive) become NEVER
+    client.tableOperations().setTabletHostingGoal(tableName,
+        new Range("row_0000000004", true, "row_0000000007", true), TabletHostingGoal.NEVER);
+    // row 8 (inclusive) through the end of the table becomes ONDEMAND
+    client.tableOperations().setTabletHostingGoal(tableName,
+        new Range("row_0000000008", true, null, true), TabletHostingGoal.ONDEMAND);
+    // NOTE(review): confirm Wait.waitFor fails the test on timeout; its result is not checked here
+    // Wait until only 3 tablets report a location: the NEVER and ONDEMAND ones must be unloaded
+    Wait.waitFor(() -> ScanServerIT.getNumHostedTablets(client, tableId) == 3, 30_000, 1_000);
+
+    return ingestedEntryCount;
+  }
+
   /**
    * Create a table with the given name and the given client. Then, ingest into the table using
    * {@link #ingest(AccumuloClient, String, int, int, int, String, boolean)}
@@ -253,4 +358,12 @@ public class ScanServerIT extends SharedMiniClusterBase {
 
     return ingestedEntriesCount;
   }
+  /** Counts current-location entries in the metadata table for rows of exactly this table id. */
+  protected static int getNumHostedTablets(AccumuloClient client, String tableId) throws Exception {
+    try (Scanner scanner = client.createScanner(MetadataTable.NAME)) {
+      scanner.setRange(new Range(tableId + ";", tableId + "<")); // skips ids such as tableId + "0"
+      scanner.fetchColumnFamily(CurrentLocationColumnFamily.NAME);
+      return Iterables.size(scanner);
+    }
+  }
 }
diff --git a/test/src/main/java/org/apache/accumulo/test/ScanServerIT_NoServers.java b/test/src/main/java/org/apache/accumulo/test/ScanServerIT_NoServers.java
index 782cb56739..3e3bd7a183 100644
--- a/test/src/main/java/org/apache/accumulo/test/ScanServerIT_NoServers.java
+++ b/test/src/main/java/org/apache/accumulo/test/ScanServerIT_NoServers.java
@@ -21,9 +21,12 @@ package org.apache.accumulo.test;
 import static org.apache.accumulo.harness.AccumuloITBase.MINI_CLUSTER_ONLY;
 import static org.apache.accumulo.test.ScanServerIT.createTableAndIngest;
 import static org.apache.accumulo.test.ScanServerIT.ingest;
+import static org.apache.accumulo.test.ScanServerIT.setupTableWithHostingMix;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
@@ -68,6 +71,10 @@ public class ScanServerIT_NoServers extends SharedMiniClusterBase {
       // Configure the scan server to only have 1 scan executor thread. This means
       // that the scan server will run scans serially, not concurrently.
       cfg.setProperty(Property.SSERV_SCAN_EXECUTORS_DEFAULT_THREADS, "1");
+
+      cfg.setProperty(Property.MANAGER_TABLET_GROUP_WATCHER_INTERVAL, "5");
+      cfg.setProperty(Property.TSERV_ONDEMAND_UNLOADER_INTERVAL, "10");
+      cfg.setProperty("table.custom.ondemand.unloader.inactivity.threshold.seconds", "15");
     }
   }
 
@@ -181,4 +188,60 @@ public class ScanServerIT_NoServers extends SharedMiniClusterBase {
       });
     }
   }
+
+  @Test
+  public void testScanWithTabletHostingMix() throws Exception {
+    try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
+      String tableName = getUniqueNames(1)[0];
+      // Table whose tablets carry a mix of ALWAYS, NEVER and ONDEMAND hosting goals
+      setupTableWithHostingMix(client, tableName);
+
+      try (Scanner scanner = client.createScanner(tableName, Authorizations.EMPTY)) {
+        scanner.setRange(new Range());
+        scanner.setConsistencyLevel(ConsistencyLevel.EVENTUAL);
+        // With no scan servers available the EVENTUAL scan falls back to the tablet servers, so it
+        // fails on the tablets that have the NEVER hosting goal
+        assertThrows(RuntimeException.class, () -> Iterables.size(scanner));
+        // The IMMEDIATE scan fails for the same reason: the NEVER tablets are not hosted
+        scanner.setConsistencyLevel(ConsistencyLevel.IMMEDIATE);
+        assertThrows(RuntimeException.class, () -> Iterables.size(scanner));
+
+        // Still IMMEDIATE: ranges that avoid the NEVER tablets must succeed
+        scanner.setRange(new Range(null, "row_0000000003"));
+        assertEquals(40, Iterables.size(scanner));
+        // Rows 8 and up are ONDEMAND; presumably this scan brings those tablets back online
+        scanner.setRange(new Range("row_0000000008", null));
+        assertEquals(20, Iterables.size(scanner));
+
+      } // when the scanner is closed, all open sessions should be closed
+    }
+  }
+
+  @Test
+  public void testBatchScanWithTabletHostingMix() throws Exception {
+    try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
+      final String tableName = getUniqueNames(1)[0];
+      // Table whose tablets carry a mix of ALWAYS, NEVER and ONDEMAND hosting goals
+      setupTableWithHostingMix(client, tableName);
+
+      try (BatchScanner scanner = client.createBatchScanner(tableName, Authorizations.EMPTY)) {
+        scanner.setRanges(Collections.singleton(new Range()));
+        scanner.setConsistencyLevel(ConsistencyLevel.EVENTUAL);
+        // With no scan servers available the EVENTUAL scan falls back to the tablet servers, so it
+        // fails on the tablets that have the NEVER hosting goal
+        assertThrows(RuntimeException.class, () -> Iterables.size(scanner));
+        // The IMMEDIATE scan fails for the same reason: the NEVER tablets are not hosted
+        scanner.setConsistencyLevel(ConsistencyLevel.IMMEDIATE);
+        assertThrows(RuntimeException.class, () -> Iterables.size(scanner));
+
+        // Still IMMEDIATE: ranges on either side of the NEVER tablets must succeed together
+        Collection<Range> ranges = new ArrayList<>();
+        ranges.add(new Range(null, "row_0000000003"));
+        ranges.add(new Range("row_0000000008", null));
+        scanner.setRanges(ranges);
+        assertEquals(60, Iterables.size(scanner));
+      } // when the scanner is closed, all open sessions should be closed
+    }
+  }
+
 }