You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by dl...@apache.org on 2023/05/15 14:06:53 UTC
[accumulo] branch elasticity updated: Added ScanServer tests for different tablet hosting goals (#3388)
This is an automated email from the ASF dual-hosted git repository.
dlmarion pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push:
new 770d949bfd Added ScanServer tests for different tablet hosting goals (#3388)
770d949bfd is described below
commit 770d949bfdc592a641f406cf3e73460ff4d34474
Author: Dave Marion <dl...@apache.org>
AuthorDate: Mon May 15 10:06:47 2023 -0400
Added ScanServer tests for different tablet hosting goals (#3388)
Co-authored-by: DomGarguilo <do...@gmail.com>
---
.../org/apache/accumulo/test/ScanServerIT.java | 113 +++++++++++++++++++++
.../accumulo/test/ScanServerIT_NoServers.java | 63 ++++++++++++
2 files changed, 176 insertions(+)
diff --git a/test/src/main/java/org/apache/accumulo/test/ScanServerIT.java b/test/src/main/java/org/apache/accumulo/test/ScanServerIT.java
index 7576413129..b68f311871 100644
--- a/test/src/main/java/org/apache/accumulo/test/ScanServerIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/ScanServerIT.java
@@ -24,10 +24,16 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.Objects;
import java.util.Properties;
+import java.util.SortedSet;
+import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.Accumulo;
@@ -38,10 +44,13 @@ import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel;
import org.apache.accumulo.core.client.TimedOutException;
import org.apache.accumulo.core.client.admin.NewTableConfiguration;
+import org.apache.accumulo.core.client.admin.TabletHostingGoal;
import org.apache.accumulo.core.conf.ClientProperty;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CurrentLocationColumnFamily;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.harness.MiniClusterConfigurationCallback;
import org.apache.accumulo.harness.SharedMiniClusterBase;
@@ -49,6 +58,8 @@ import org.apache.accumulo.minicluster.ServerType;
import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
import org.apache.accumulo.test.functional.ReadWriteIT;
import org.apache.accumulo.test.functional.SlowIterator;
+import org.apache.accumulo.test.util.Wait;
+import org.apache.hadoop.io.Text;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
@@ -74,6 +85,10 @@ public class ScanServerIT extends SharedMiniClusterBase {
// Configure the scan server to only have 1 scan executor thread. This means
// that the scan server will run scans serially, not concurrently.
cfg.setProperty(Property.SSERV_SCAN_EXECUTORS_DEFAULT_THREADS, "1");
+
+ cfg.setProperty(Property.MANAGER_TABLET_GROUP_WATCHER_INTERVAL, "5");
+ cfg.setProperty(Property.TSERV_ONDEMAND_UNLOADER_INTERVAL, "10");
+ cfg.setProperty("table.custom.ondemand.unloader.inactivity.threshold.seconds", "15");
}
}
@@ -205,6 +220,96 @@ public class ScanServerIT extends SharedMiniClusterBase {
}
}
+ @Test
+ public void testScanWithTabletHostingMix() throws Exception {
+ try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
+ String tableName = getUniqueNames(1)[0];
+
+ final int ingestedEntryCount = setupTableWithHostingMix(client, tableName);
+
+ try (Scanner scanner = client.createScanner(tableName, Authorizations.EMPTY)) {
+ scanner.setRange(new Range());
+ scanner.setConsistencyLevel(ConsistencyLevel.EVENTUAL);
+ assertEquals(ingestedEntryCount, Iterables.size(scanner),
+ "The scan server scanner should have seen all ingested and flushed entries");
+ // Throws an exception because of the tablets with the NEVER hosting goal
+ scanner.setConsistencyLevel(ConsistencyLevel.IMMEDIATE);
+ assertThrows(RuntimeException.class, () -> Iterables.size(scanner));
+
+ // Test that hosted ranges work
+ scanner.setRange(new Range(null, "row_0000000003"));
+ assertEquals(40, Iterables.size(scanner));
+
+ scanner.setRange(new Range("row_0000000008", null));
+ assertEquals(20, Iterables.size(scanner));
+ } // when the scanner is closed, all open sessions should be closed
+ }
+ }
+
+ @Test
+ public void testBatchScanWithTabletHostingMix() throws Exception {
+ try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
+ String tableName = getUniqueNames(1)[0];
+
+ final int ingestedEntryCount = setupTableWithHostingMix(client, tableName);
+
+ try (BatchScanner scanner = client.createBatchScanner(tableName, Authorizations.EMPTY)) {
+ scanner.setRanges(Collections.singleton(new Range()));
+ scanner.setConsistencyLevel(ConsistencyLevel.EVENTUAL);
+ assertEquals(ingestedEntryCount, Iterables.size(scanner),
+ "The scan server scanner should have seen all ingested and flushed entries");
+ // Throws an exception because of the tablets with the NEVER hosting goal
+ scanner.setConsistencyLevel(ConsistencyLevel.IMMEDIATE);
+ assertThrows(RuntimeException.class, () -> Iterables.size(scanner));
+
+ // Test that hosted ranges work
+ Collection<Range> ranges = new ArrayList<>();
+ ranges.add(new Range(null, "row_0000000003"));
+ ranges.add(new Range("row_0000000008", null));
+ scanner.setRanges(ranges);
+ assertEquals(60, Iterables.size(scanner));
+ } // when the scanner is closed, all open sessions should be closed
+ }
+ }
+
+ /**
+ * Sets up a table with a mix of tablet hosting goals. Specific ranges of rows are set to ALWAYS,
+ * NEVER, and ONDEMAND hosting goals. The method waits for the NEVER and ONDEMAND tablets to be
+ * unloaded due to inactivity before returning.
+ *
+ * @param client The AccumuloClient to use for the operation
+ * @param tableName The name of the table to be created and set up
+ * @return The count of ingested entries
+ */
+ protected static int setupTableWithHostingMix(AccumuloClient client, String tableName)
+ throws Exception {
+ SortedSet<Text> splits =
+ IntStream.rangeClosed(1, 9).mapToObj(i -> new Text("row_000000000" + i))
+ .collect(Collectors.toCollection(TreeSet::new));
+
+ NewTableConfiguration ntc = new NewTableConfiguration();
+ ntc.withSplits(splits);
+ ntc.withInitialHostingGoal(TabletHostingGoal.ALWAYS); // speed up ingest
+ final int ingestedEntryCount = createTableAndIngest(client, tableName, ntc, 10, 10, "colf");
+
+ String tableId = client.tableOperations().tableIdMap().get(tableName);
+
+ // row 1 -> 3 are always
+ client.tableOperations().setTabletHostingGoal(tableName,
+ new Range(null, true, "row_0000000003", true), TabletHostingGoal.ALWAYS);
+ // row 4 -> 7 are never
+ client.tableOperations().setTabletHostingGoal(tableName,
+ new Range("row_0000000004", true, "row_0000000007", true), TabletHostingGoal.NEVER);
+ // row 8 and 9 are ondemand
+ client.tableOperations().setTabletHostingGoal(tableName,
+ new Range("row_0000000008", true, null, true), TabletHostingGoal.ONDEMAND);
+
+ // Wait for the NEVER and ONDEMAND tablets to be unloaded due to inactivity
+ Wait.waitFor(() -> ScanServerIT.getNumHostedTablets(client, tableId) == 3, 30_000, 1_000);
+
+ return ingestedEntryCount;
+ }
+
/**
* Create a table with the given name and the given client. Then, ingest into the table using
* {@link #ingest(AccumuloClient, String, int, int, int, String, boolean)}
@@ -253,4 +358,12 @@ public class ScanServerIT extends SharedMiniClusterBase {
return ingestedEntriesCount;
}
+
+ protected static int getNumHostedTablets(AccumuloClient client, String tableId) throws Exception {
+ try (Scanner scanner = client.createScanner(MetadataTable.NAME)) {
+ scanner.setRange(new Range(tableId, tableId + "<"));
+ scanner.fetchColumnFamily(CurrentLocationColumnFamily.NAME);
+ return Iterables.size(scanner);
+ }
+ }
}
diff --git a/test/src/main/java/org/apache/accumulo/test/ScanServerIT_NoServers.java b/test/src/main/java/org/apache/accumulo/test/ScanServerIT_NoServers.java
index 782cb56739..3e3bd7a183 100644
--- a/test/src/main/java/org/apache/accumulo/test/ScanServerIT_NoServers.java
+++ b/test/src/main/java/org/apache/accumulo/test/ScanServerIT_NoServers.java
@@ -21,9 +21,12 @@ package org.apache.accumulo.test;
import static org.apache.accumulo.harness.AccumuloITBase.MINI_CLUSTER_ONLY;
import static org.apache.accumulo.test.ScanServerIT.createTableAndIngest;
import static org.apache.accumulo.test.ScanServerIT.ingest;
+import static org.apache.accumulo.test.ScanServerIT.setupTableWithHostingMix;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
@@ -68,6 +71,10 @@ public class ScanServerIT_NoServers extends SharedMiniClusterBase {
// Configure the scan server to only have 1 scan executor thread. This means
// that the scan server will run scans serially, not concurrently.
cfg.setProperty(Property.SSERV_SCAN_EXECUTORS_DEFAULT_THREADS, "1");
+
+ cfg.setProperty(Property.MANAGER_TABLET_GROUP_WATCHER_INTERVAL, "5");
+ cfg.setProperty(Property.TSERV_ONDEMAND_UNLOADER_INTERVAL, "10");
+ cfg.setProperty("table.custom.ondemand.unloader.inactivity.threshold.seconds", "15");
}
}
@@ -181,4 +188,60 @@ public class ScanServerIT_NoServers extends SharedMiniClusterBase {
});
}
}
+
+ @Test
+ public void testScanWithTabletHostingMix() throws Exception {
+ try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
+ String tableName = getUniqueNames(1)[0];
+
+ setupTableWithHostingMix(client, tableName);
+
+ try (Scanner scanner = client.createScanner(tableName, Authorizations.EMPTY)) {
+ scanner.setRange(new Range());
+ scanner.setConsistencyLevel(ConsistencyLevel.EVENTUAL);
+ // Throws an exception because no scan servers and falls back to tablet server with tablets
+ // with the NEVER hosting goal
+ assertThrows(RuntimeException.class, () -> Iterables.size(scanner));
+ // Throws an exception because of the tablets with the NEVER hosting goal
+ scanner.setConsistencyLevel(ConsistencyLevel.IMMEDIATE);
+ assertThrows(RuntimeException.class, () -> Iterables.size(scanner));
+
+ // Test that hosted ranges work
+ scanner.setRange(new Range(null, "row_0000000003"));
+ assertEquals(40, Iterables.size(scanner));
+
+ scanner.setRange(new Range("row_0000000008", null));
+ assertEquals(20, Iterables.size(scanner));
+
+ } // when the scanner is closed, all open sessions should be closed
+ }
+ }
+
+ @Test
+ public void testBatchScanWithTabletHostingMix() throws Exception {
+ try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
+ final String tableName = getUniqueNames(1)[0];
+
+ setupTableWithHostingMix(client, tableName);
+
+ try (BatchScanner scanner = client.createBatchScanner(tableName, Authorizations.EMPTY)) {
+ scanner.setRanges(Collections.singleton(new Range()));
+ scanner.setConsistencyLevel(ConsistencyLevel.EVENTUAL);
+ // Throws an exception because no scan servers and falls back to tablet server with tablets
+ // with the NEVER hosting goal
+ assertThrows(RuntimeException.class, () -> Iterables.size(scanner));
+ // Throws an exception because of the tablets with the NEVER hosting goal
+ scanner.setConsistencyLevel(ConsistencyLevel.IMMEDIATE);
+ assertThrows(RuntimeException.class, () -> Iterables.size(scanner));
+
+ // Test that hosted ranges work
+ Collection<Range> ranges = new ArrayList<>();
+ ranges.add(new Range(null, "row_0000000003"));
+ ranges.add(new Range("row_0000000008", null));
+ scanner.setRanges(ranges);
+ assertEquals(60, Iterables.size(scanner));
+ } // when the scanner is closed, all open sessions should be closed
+ }
+ }
+
}