You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by dl...@apache.org on 2023/07/03 12:25:34 UTC
[accumulo] 01/01: Merge branch 'main' into elasticity
This is an automated email from the ASF dual-hosted git repository.
dlmarion pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit a4a3e53baa55bb6a68792ec8eeddb4bd3d7565a0
Merge: 8fddfb6b63 4b1c1b17eb
Author: Dave Marion <dl...@apache.org>
AuthorDate: Mon Jul 3 12:25:00 2023 +0000
Merge branch 'main' into elasticity
.github/ISSUE_TEMPLATE/post_vote_checklist.md | 3 +-
.github/workflows/maven-full-its.yaml | 2 +-
.github/workflows/maven.yaml | 6 +-
.github/workflows/scripts.yaml | 12 +-
README.md | 2 +-
contrib/accumulo-logo.png | Bin 11427 -> 0 bytes
contrib/javadoc11.patch | 91 ----------
contrib/scripts/check-tservers.py | 202 ---------------------
core/pom.xml | 2 +-
.../core/client/admin/NewTableConfiguration.java | 6 +
.../core/clientImpl/TableOperationsImpl.java | 7 +-
.../accumulo/core/clientImpl/ThriftScanner.java | 5 +-
.../core/clientImpl/ThriftTransportPool.java | 7 +-
.../org/apache/accumulo/core/conf/Property.java | 8 +-
.../org/apache/accumulo/core/fate/ZooStore.java | 5 +-
.../accumulo/core/fate/zookeeper/ZooCache.java | 5 +-
.../accumulo/core/fate/zookeeper/ZooSession.java | 6 +-
.../accumulo/core/file/BloomFilterLayer.java | 7 +-
.../core/metadata/ScanServerRefTabletFile.java | 5 +-
.../accumulo/core/metadata/StoredTabletFile.java | 10 +-
.../accumulo/core/metadata/ValidationUtil.java | 23 ++-
.../core/metadata/schema/MetadataSchema.java | 6 +-
.../org/apache/accumulo/core/rpc/ThriftUtil.java | 6 +-
.../spi/balancer/HostRegexTableLoadBalancer.java | 5 +-
.../accumulo/core/spi/crypto/AESCryptoService.java | 10 +-
.../accumulo/core/spi/fs/RandomVolumeChooser.java | 6 +-
.../core/spi/fs/SpaceAwareVolumeChooser.java | 6 +-
.../spi/scan/ConfigurableScanServerSelector.java | 7 +-
.../apache/accumulo/core/util/LazySingletons.java | 6 +
.../java/org/apache/accumulo/core/util/Retry.java | 5 +-
.../accumulo/core/cli/PasswordConverterTest.java | 6 +-
.../core/client/rfile/RFileClientTest.java | 6 +-
.../bulk/ConcurrentKeyExtentCacheTest.java | 12 +-
.../apache/accumulo/core/crypto/CryptoTest.java | 14 +-
.../core/file/BloomFilterLayerLookupTest.java | 7 +-
.../file/blockfile/cache/TestLruBlockCache.java | 7 +-
.../core/file/rfile/MultiLevelIndexTest.java | 5 +-
.../core/file/rfile/MultiThreadedRFileTest.java | 5 +-
.../apache/accumulo/core/file/rfile/RFileTest.java | 28 ++-
.../accumulo/core/file/rfile/RollingStatsTest.java | 17 +-
.../file/streams/RateLimitedInputStreamTest.java | 8 +-
.../file/streams/RateLimitedOutputStreamTest.java | 6 +-
.../iterators/user/IndexedDocIteratorTest.java | 7 +-
.../iterators/user/IntersectingIteratorTest.java | 7 +-
.../core/iterators/user/TestCfCqSlice.java | 5 +-
.../accumulo/core/metadata/ValidationUtilTest.java | 50 +++++
.../core/security/AuthenticationTokenTest.java | 6 +-
.../core/spi/balancer/GroupBalancerTest.java | 14 +-
.../scan/ConfigurableScanServerSelectorTest.java | 7 +-
.../hadoopImpl/mapred/AccumuloRecordReader.java | 7 +-
.../hadoopImpl/mapreduce/AccumuloRecordReader.java | 7 +-
.../testcases/IsolatedDeepCopiesTestCase.java | 9 +-
.../testcases/MultipleHasTopCalls.java | 9 +-
.../iteratortest/testcases/ReSeekTestCase.java | 6 +-
pom.xml | 175 +++++++-----------
.../server/constraints/MetadataConstraints.java | 25 ++-
.../accumulo/server/fs/VolumeManagerImpl.java | 17 +-
.../server/tablets/UniqueNameAllocator.java | 6 +-
.../org/apache/accumulo/server/util/FileUtil.java | 7 +-
.../apache/accumulo/server/util/RandomWriter.java | 8 +-
.../server/zookeeper/DistributedWorkQueue.java | 7 +-
.../conf/codec/VersionedPropEncryptCodec.java | 7 +-
.../constraints/MetadataConstraintsTest.java | 122 ++++++++++---
.../org/apache/accumulo/compactor/Compactor.java | 6 +-
.../main/java/org/apache/accumulo/gc/GCRun.java | 8 +-
.../java/org/apache/accumulo/manager/Manager.java | 10 +-
.../accumulo/manager/TabletGroupWatcher.java | 11 +-
server/monitor/pom.xml | 2 +-
server/tserver/pom.xml | 2 +-
.../org/apache/accumulo/tserver/TabletServer.java | 5 +-
.../compactions/InternalCompactionExecutor.java | 11 +-
.../metrics/CompactionExecutorsMetrics.java | 34 ++--
.../accumulo/tserver/session/SessionManager.java | 8 +-
.../accumulo/tserver/tablet/MinorCompactor.java | 5 +-
.../shell/commands/CreateTableCommand.java | 26 ++-
.../accumulo/shell/commands/HiddenCommand.java | 5 +-
{contrib => src/build}/ci/find-unapproved-chars.sh | 0
{contrib => src/build}/ci/find-unapproved-junit.sh | 0
{contrib => src/build}/ci/install-shfmt.sh | 0
{contrib => src/build}/ci/install-thrift.sh | 2 +-
{contrib => src/build}/ci/it-matrix.sh | 0
{contrib => src/build}/ci/run-shellcheck.sh | 0
{contrib => src/build}/ci/run-shfmt.sh | 0
{contrib => src/build}/ci/run-thrift.sh | 0
.../build/eclipse-codestyle.xml | 0
{contrib => src/build}/license-header.txt | 0
src/site/site.xml | 44 -----
.../apache/accumulo/harness/AccumuloITBase.java | 4 +-
.../accumulo/harness/SharedMiniClusterBase.java | 3 +-
.../apache/accumulo/test/ChaoticLoadBalancer.java | 8 +-
.../apache/accumulo/test/ConditionalWriterIT.java | 15 +-
.../org/apache/accumulo/test/ImportExportIT.java | 5 +-
.../apache/accumulo/test/MetaGetsReadersIT.java | 3 +-
.../apache/accumulo/test/MultiTableRecoveryIT.java | 3 +-
.../accumulo/test/TableConfigurationUpdateIT.java | 3 +-
.../org/apache/accumulo/test/TestBinaryRows.java | 4 +-
.../org/apache/accumulo/test/TotalQueuedIT.java | 5 +-
.../accumulo/test/VerifySerialRecoveryIT.java | 3 +-
.../test/compaction/CompactionExecutorIT.java | 6 +-
.../test/compaction/CompactionRateLimitingIT.java | 7 +-
.../accumulo/test/conf/PropStoreConfigIT.java | 5 +-
.../accumulo/test/functional/BatchScanSplitIT.java | 4 +-
.../test/functional/BatchWriterFlushIT.java | 3 +-
.../accumulo/test/functional/BloomFilterIT.java | 4 +-
.../accumulo/test/functional/CacheTestWriter.java | 14 +-
.../accumulo/test/functional/CompactionIT.java | 5 +-
.../test/functional/ConcurrentDeleteTableIT.java | 3 +-
.../test/functional/CreateInitialSplitsIT.java | 3 +-
.../accumulo/test/functional/FateStarvationIT.java | 6 +-
.../test/functional/ManyWriteAheadLogsIT.java | 5 +-
.../test/functional/MemoryConsumingIterator.java | 9 +-
.../accumulo/test/functional/MonitorSslIT.java | 3 +-
.../accumulo/test/functional/NativeMapIT.java | 10 +-
.../accumulo/test/functional/ReadWriteIT.java | 3 +-
.../apache/accumulo/test/functional/ScanIdIT.java | 3 +-
.../apache/accumulo/test/functional/SplitIT.java | 5 +-
.../apache/accumulo/test/functional/SummaryIT.java | 5 +-
.../accumulo/test/functional/WALSunnyDayIT.java | 7 +-
.../accumulo/test/functional/ZombieTServer.java | 4 +-
.../accumulo/test/manager/SuspendedTabletsIT.java | 5 +-
.../test/performance/scan/CollectTabletStats.java | 4 +-
.../accumulo/test/shell/ShellCreateTableIT.java | 3 +-
.../apache/accumulo/test/shell/ShellServerIT.java | 3 +-
123 files changed, 622 insertions(+), 842 deletions(-)
diff --cc core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java
index c837d23a77,9ecc79e0aa..4eea077cbe
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java
@@@ -25,15 -25,12 +25,15 @@@ import static java.util.Objects.require
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.SECONDS;
- import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toSet;
+import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.HOSTING_GOAL;
+import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.HOSTING_REQUESTED;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOCATION;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW;
+ import static org.apache.accumulo.core.util.LazySingletons.RANDOM;
import static org.apache.accumulo.core.util.Validators.EXISTING_TABLE_NAME;
import static org.apache.accumulo.core.util.Validators.NEW_TABLE_NAME;
+import static org.apache.accumulo.core.util.Validators.NOT_BUILTIN_TABLE;
import java.io.BufferedReader;
import java.io.FileNotFoundException;
@@@ -1185,23 -1225,20 +1182,23 @@@ public class TableOperationsImpl extend
// group key extents to get <= maxSplits
LinkedList<KeyExtent> unmergedExtents = new LinkedList<>();
- List<KeyExtent> mergedExtents = new ArrayList<>();
- for (Map<KeyExtent,List<Range>> map : binnedRanges.values()) {
- unmergedExtents.addAll(map.keySet());
+ try {
+ while (!tl.findTablets(context, Collections.singletonList(range),
+ (cachedTablet, range1) -> unmergedExtents.add(cachedTablet.getExtent()),
+ LocationNeed.NOT_REQUIRED).isEmpty()) {
+ context.requireNotDeleted(tableId);
+ context.requireNotOffline(tableId, tableName);
+
+ log.warn("Unable to locate bins for specified range. Retrying.");
+ // sleep randomly between 100 and 200ms
- sleepUninterruptibly(100 + random.nextInt(100), MILLISECONDS);
++ sleepUninterruptibly(100 + RANDOM.get().nextInt(100), MILLISECONDS);
+ unmergedExtents.clear();
+ tl.invalidateCache();
+ }
+ } catch (InvalidTabletHostingRequestException e) {
+ throw new AccumuloException("findTablets requested tablet hosting when it should not have",
+ e);
}
// the sort method is efficient for linked list
diff --cc core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java
index b9ecdfe0e7,87726fd639..f7b2aeb9f2
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java
@@@ -18,12 -18,10 +18,12 @@@
*/
package org.apache.accumulo.core.clientImpl;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.SECONDS;
+ import static org.apache.accumulo.core.util.LazySingletons.RANDOM;
import java.io.IOException;
- import java.security.SecureRandom;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
@@@ -362,197 -276,9 +361,197 @@@ public class ThriftScanner
Thread.sleep(millis);
}
// wait 2 * last time, with +-10% random jitter
- return (long) (Math.min(millis * 2, maxSleep) * (.9 + random.nextDouble() / 5));
+ return (long) (Math.min(millis * 2, maxSleep) * (.9 + RANDOM.get().nextDouble() / 5));
}
+ private static Optional<ScanAddress> getScanServerAddress(ClientContext context,
+ ScanState scanState, CachedTablet loc, long timeOut, long startTime) {
+ Preconditions.checkArgument(scanState.runOnScanServer);
+
+ ScanAddress addr = null;
+
+ if (scanState.scanID != null && scanState.prevLoc != null
+ && scanState.prevLoc.serverType == ServerType.SSERVER
+ && scanState.prevLoc.getExtent().equals(loc.getExtent())) {
+ // this is the case of continuing a scan on a scan server for the same tablet, so lets not
+ // call the scan server selector and just go back to the previous scan server
+ addr = scanState.prevLoc;
+ log.trace(
+ "For tablet {} continuing scan on scan server {} without consulting scan server selector, using busyTimeout {}",
+ loc.getExtent(), addr.serverAddress, scanState.busyTimeout);
+ } else {
+ var tabletId = new TabletIdImpl(loc.getExtent());
+ // obtain a snapshot once and only expose this snapshot to the plugin for consistency
+ var attempts = scanState.scanAttempts.snapshot();
+
+ Duration timeoutLeft = Duration.ofSeconds(timeOut)
+ .minus(Duration.ofMillis(System.currentTimeMillis() - startTime));
+
+ var params = new ScanServerSelector.SelectorParameters() {
+
+ @Override
+ public List<TabletId> getTablets() {
+ return List.of(tabletId);
+ }
+
+ @Override
+ public Collection<? extends ScanServerAttempt> getAttempts(TabletId tabletId) {
+ return attempts.getOrDefault(tabletId, Set.of());
+ }
+
+ @Override
+ public Map<String,String> getHints() {
+ if (scanState.executionHints == null) {
+ return Map.of();
+ }
+ return scanState.executionHints;
+ }
+
+ @Override
+ public <T> Optional<T> waitUntil(Supplier<Optional<T>> condition, Duration maxWaitTime,
+ String description) {
+ return ThriftScanner.waitUntil(condition, maxWaitTime, description, timeoutLeft, context,
+ loc.getExtent().tableId(), log);
+ }
+ };
+
+ ScanServerSelections actions = context.getScanServerSelector().selectServers(params);
+
+ Duration delay = null;
+
+ String scanServer = actions.getScanServer(tabletId);
+ if (scanServer != null) {
+ addr = new ScanAddress(scanServer, ServerType.SSERVER, loc);
+ delay = actions.getDelay();
+ scanState.busyTimeout = actions.getBusyTimeout();
+ log.trace("For tablet {} scan server selector chose scan_server:{} delay:{} busyTimeout:{}",
+ loc.getExtent(), scanServer, delay, scanState.busyTimeout);
+ } else {
+ Optional<String> tserverLoc = loc.getTserverLocation();
+
+ if (tserverLoc.isPresent()) {
+ addr = new ScanAddress(loc.getTserverLocation().orElseThrow(), ServerType.TSERVER, loc);
+ delay = actions.getDelay();
+ scanState.busyTimeout = Duration.ZERO;
+ log.trace("For tablet {} scan server selector chose tablet_server: {}", loc.getExtent(),
+ addr);
+ } else {
+ log.trace(
+ "For tablet {} scan server selector chose tablet_server, but the tablet is not currently hosted",
+ loc.getExtent());
+ return Optional.empty();
+ }
+ }
+
+ if (!delay.isZero()) {
+ try {
+ Thread.sleep(delay.toMillis());
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ return Optional.of(addr);
+ }
+
+ static ScanAddress getNextScanAddress(ClientContext context, ScanState scanState, long timeOut,
+ long startTime, long maxSleepTime)
+ throws TableNotFoundException, AccumuloSecurityException, AccumuloServerException,
+ InterruptedException, ScanTimedOutException, InvalidTabletHostingRequestException {
+
+ String lastError = null;
+ String error = null;
+ long sleepMillis = 100;
+
+ ScanAddress addr = null;
+
+ var hostingNeed = scanState.runOnScanServer ? ClientTabletCache.LocationNeed.NOT_REQUIRED
+ : ClientTabletCache.LocationNeed.REQUIRED;
+
+ while (addr == null) {
+ long currentTime = System.currentTimeMillis();
+ if ((currentTime - startTime) / 1000.0 > timeOut) {
+ throw new ScanTimedOutException("Failed to locate next server to scan before timeout");
+ }
+
+ CachedTablet loc = null;
+
+ Span child1 = TraceUtil.startSpan(ThriftScanner.class, "scan::locateTablet");
+ try (Scope locateSpan = child1.makeCurrent()) {
+
+ loc = ClientTabletCache.getInstance(context, scanState.tableId).findTablet(context,
+ scanState.startRow, scanState.skipStartRow, hostingNeed);
+
+ if (loc == null) {
+ context.requireNotDeleted(scanState.tableId);
+ context.requireNotOffline(scanState.tableId, null);
+
+ error = "Failed to locate tablet for table : " + scanState.tableId + " row : "
+ + scanState.startRow;
+ if (!error.equals(lastError)) {
+ log.debug("{}", error);
+ } else if (log.isTraceEnabled()) {
+ log.trace("{}", error);
+ }
+ lastError = error;
+ sleepMillis = pause(sleepMillis, maxSleepTime, scanState.runOnScanServer);
+ } else {
+ // when a tablet splits we do want to continue scanning the low child
+ // of the split if we are already passed it
+ Range dataRange = loc.getExtent().toDataRange();
+
+ if (scanState.range.getStartKey() != null
+ && dataRange.afterEndKey(scanState.range.getStartKey())) {
+ // go to the next tablet
+ scanState.startRow = loc.getExtent().endRow();
+ scanState.skipStartRow = true;
+ // force another lookup
+ loc = null;
+ } else if (scanState.range.getEndKey() != null
+ && dataRange.beforeStartKey(scanState.range.getEndKey())) {
+ // should not happen
+ throw new IllegalStateException("Unexpected tablet, extent : " + loc.getExtent()
+ + " range : " + scanState.range + " startRow : " + scanState.startRow);
+ }
+ }
+ } catch (AccumuloServerException e) {
+ TraceUtil.setException(child1, e, true);
+ log.debug("Scan failed, server side exception : {}", e.getMessage());
+ throw e;
+ } catch (AccumuloException e) {
+ error = "exception from tablet loc " + e.getMessage();
+ if (!error.equals(lastError)) {
+ log.debug("{}", error);
+ } else if (log.isTraceEnabled()) {
+ log.trace("{}", error);
+ }
+
+ TraceUtil.setException(child1, e, false);
+
+ lastError = error;
+ sleepMillis = pause(sleepMillis, maxSleepTime, scanState.runOnScanServer);
+ } finally {
+ child1.end();
+ }
+
+ if (loc != null) {
+ if (scanState.runOnScanServer) {
+ addr = getScanServerAddress(context, scanState, loc, timeOut, startTime).orElse(null);
+ if (addr == null && loc.getTserverLocation().isEmpty()) {
+ // wanted to fall back to tserver but tablet was not hosted so make another loop
+ hostingNeed = ClientTabletCache.LocationNeed.REQUIRED;
+ }
+ } else {
+ addr = new ScanAddress(loc.getTserverLocation().orElseThrow(), ServerType.TSERVER, loc);
+ }
+ }
+ }
+
+ return addr;
+ }
+
public static List<KeyValue> scan(ClientContext context, ScanState scanState, long timeOut)
throws ScanTimedOutException, AccumuloException, AccumuloSecurityException,
TableNotFoundException {
diff --cc core/src/test/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerSelectorTest.java
index 96a7eaa6e1,59bfd40b94..5680ccddf7
--- a/core/src/test/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerSelectorTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerSelectorTest.java
@@@ -18,8 -18,8 +18,9 @@@
*/
package org.apache.accumulo.core.spi.scan;
+ import static org.apache.accumulo.core.util.LazySingletons.RANDOM;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
diff --cc hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapred/AccumuloRecordReader.java
index 1ea193ffe7,09bb4539a6..53a65676e2
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapred/AccumuloRecordReader.java
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapred/AccumuloRecordReader.java
@@@ -19,9 -19,7 +19,10 @@@
package org.apache.accumulo.hadoopImpl.mapred;
import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static java.util.concurrent.TimeUnit.SECONDS;
+ import static org.apache.accumulo.core.util.LazySingletons.RANDOM;
import java.io.IOException;
import java.net.InetAddress;
@@@ -355,51 -346,18 +354,51 @@@ public abstract class AccumuloRecordRea
// tablets... so clear it
tl.invalidateCache();
- while (!tl.binRanges(context, ranges, binnedRanges).isEmpty()) {
- context.requireNotDeleted(tableId);
- context.requireNotOffline(tableId, tableName);
- binnedRanges.clear();
- log.warn("Unable to locate bins for specified ranges. Retrying.");
- // sleep randomly between 100 and 200 ms
- sleepUninterruptibly(100 + RANDOM.get().nextInt(100), TimeUnit.MILLISECONDS);
- tl.invalidateCache();
+ if (InputConfigurator.getConsistencyLevel(callingClass, job)
+ == ConsistencyLevel.IMMEDIATE) {
+ while (!tl.binRanges(context, ranges, binnedRanges).isEmpty()) {
+ context.requireNotDeleted(tableId);
+ context.requireNotOffline(tableId, tableName);
+ binnedRanges.clear();
+ log.warn("Unable to locate bins for specified ranges. Retrying.");
+ // sleep randomly between 100 and 200 ms
- sleepUninterruptibly(100 + random.nextInt(100), TimeUnit.MILLISECONDS);
++ sleepUninterruptibly(100 + RANDOM.get().nextInt(100), TimeUnit.MILLISECONDS);
+ tl.invalidateCache();
+ }
+ } else {
+ Map<String,Map<KeyExtent,List<Range>>> unhostedRanges = new HashMap<>();
+ unhostedRanges.put("", new HashMap<>());
+ BiConsumer<CachedTablet,Range> consumer = (ct, r) -> {
+ unhostedRanges.get("").computeIfAbsent(ct.getExtent(), k -> new ArrayList<>())
+ .add(r);
+ };
+ List<Range> failures =
+ tl.findTablets(context, ranges, consumer, LocationNeed.NOT_REQUIRED);
+
+ Retry retry = Retry.builder().infiniteRetries().retryAfter(100, MILLISECONDS)
+ .incrementBy(100, MILLISECONDS).maxWait(2, SECONDS).backOffFactor(1.5)
+ .logInterval(3, MINUTES).createRetry();
+
+ while (!failures.isEmpty()) {
+
+ context.requireNotDeleted(tableId);
+
+ try {
+ retry.waitForNextAttempt(log,
+ String.format("locating tablets in table %s(%s) for %d ranges", tableName,
+ tableId, ranges.size()));
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ unhostedRanges.get("").clear();
+ tl.invalidateCache();
+ failures = tl.findTablets(context, ranges, consumer, LocationNeed.NOT_REQUIRED);
+ }
+ binnedRanges = unhostedRanges;
}
}
- } catch (TableOfflineException | TableNotFoundException | AccumuloException
- | AccumuloSecurityException e) {
+ } catch (InvalidTabletHostingRequestException | TableOfflineException
+ | TableNotFoundException | AccumuloException | AccumuloSecurityException e) {
throw new IOException(e);
}
diff --cc hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/AccumuloRecordReader.java
index b4a90aa151,3b60ca798a..b088d8530f
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/AccumuloRecordReader.java
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/AccumuloRecordReader.java
@@@ -19,9 -19,7 +19,10 @@@
package org.apache.accumulo.hadoopImpl.mapreduce;
import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static java.util.concurrent.TimeUnit.SECONDS;
+ import static org.apache.accumulo.core.util.LazySingletons.RANDOM;
import java.io.IOException;
import java.net.InetAddress;
@@@ -388,52 -379,18 +387,52 @@@ public abstract class AccumuloRecordRea
// tables tablets... so clear it
tl.invalidateCache();
- while (!tl.binRanges(clientContext, ranges, binnedRanges).isEmpty()) {
- clientContext.requireNotDeleted(tableId);
- clientContext.requireNotOffline(tableId, tableName);
- binnedRanges.clear();
- log.warn("Unable to locate bins for specified ranges. Retrying.");
- // sleep randomly between 100 and 200 ms
- sleepUninterruptibly(100 + RANDOM.get().nextInt(100), TimeUnit.MILLISECONDS);
- tl.invalidateCache();
+ if (InputConfigurator.getConsistencyLevel(callingClass, context.getConfiguration())
+ == ConsistencyLevel.IMMEDIATE) {
+ while (!tl.binRanges(clientContext, ranges, binnedRanges).isEmpty()) {
+ clientContext.requireNotDeleted(tableId);
+ clientContext.requireNotOffline(tableId, tableName);
+ binnedRanges.clear();
+ log.warn("Unable to locate bins for specified ranges. Retrying.");
+ // sleep randomly between 100 and 200 ms
- sleepUninterruptibly(100 + random.nextInt(100), TimeUnit.MILLISECONDS);
++ sleepUninterruptibly(100 + RANDOM.get().nextInt(100), TimeUnit.MILLISECONDS);
+ tl.invalidateCache();
+ }
+ } else {
+ Map<String,Map<KeyExtent,List<Range>>> unhostedRanges = new HashMap<>();
+ unhostedRanges.put("", new HashMap<>());
+ BiConsumer<CachedTablet,Range> consumer = (ct, r) -> {
+ unhostedRanges.get("").computeIfAbsent(ct.getExtent(), k -> new ArrayList<>())
+ .add(r);
+ };
+ List<Range> failures =
+ tl.findTablets(clientContext, ranges, consumer, LocationNeed.NOT_REQUIRED);
+
+ Retry retry = Retry.builder().infiniteRetries().retryAfter(100, MILLISECONDS)
+ .incrementBy(100, MILLISECONDS).maxWait(2, SECONDS).backOffFactor(1.5)
+ .logInterval(3, MINUTES).createRetry();
+
+ while (!failures.isEmpty()) {
+
+ clientContext.requireNotDeleted(tableId);
+
+ try {
+ retry.waitForNextAttempt(log,
+ String.format("locating tablets in table %s(%s) for %d ranges", tableName,
+ tableId, ranges.size()));
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ unhostedRanges.get("").clear();
+ tl.invalidateCache();
+ failures =
+ tl.findTablets(clientContext, ranges, consumer, LocationNeed.NOT_REQUIRED);
+ }
+ binnedRanges = unhostedRanges;
}
}
- } catch (TableOfflineException | TableNotFoundException | AccumuloException
- | AccumuloSecurityException e) {
+ } catch (InvalidTabletHostingRequestException | TableOfflineException
+ | TableNotFoundException | AccumuloException | AccumuloSecurityException e) {
throw new IOException(e);
}
diff --cc server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java
index 91d15e0b98,921639bda8..e4dd238c58
--- a/server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java
@@@ -131,6 -129,19 +132,19 @@@ public class MetadataConstraints implem
return lst;
}
+ /*
+ * Validates the data file metadata is valid for a StoredDataFile.
+ */
+ private static ArrayList<Short> validateDataFilePath(ArrayList<Short> violations,
+ String metadata) {
+ try {
+ StoredTabletFile.validate(metadata);
+ } catch (RuntimeException e) {
- violations = addViolation(violations, 9);
++ violations = addViolation(violations, 12);
+ }
+ return violations;
+ }
+
@Override
public List<Short> check(Environment env, Mutation mutation) {
final ServerContext context = ((SystemEnvironment) env).getServerContext();
@@@ -217,27 -229,13 +234,31 @@@
violations = addViolation(violations, 1);
}
} else if (columnFamily.equals(ScanFileColumnFamily.NAME)) {
-
+ violations =
+ validateDataFilePath(violations, new String(columnUpdate.getColumnQualifier(), UTF_8));
+ } else if (HostingColumnFamily.GOAL_COLUMN.equals(columnFamily, columnQualifier)) {
+ try {
+ TabletHostingGoalUtil.fromValue(new Value(columnUpdate.getValue()));
+ } catch (IllegalArgumentException e) {
+ violations = addViolation(violations, 10);
+ }
+ } else if (ServerColumnFamily.OPID_COLUMN.equals(columnFamily, columnQualifier)) {
+ try {
+ TabletOperationId.validate(new String(columnUpdate.getValue(), UTF_8));
+ } catch (IllegalArgumentException e) {
+ violations = addViolation(violations, 9);
+ }
+ } else if (ServerColumnFamily.SELECTED_COLUMN.equals(columnFamily, columnQualifier)) {
+ try {
+ SelectedFiles.from(new String(columnUpdate.getValue(), UTF_8));
+ } catch (RuntimeException e) {
+ violations = addViolation(violations, 11);
+ }
} else if (columnFamily.equals(BulkFileColumnFamily.NAME)) {
if (!columnUpdate.isDeleted() && !checkedBulk) {
+ violations = validateDataFilePath(violations,
+ new String(columnUpdate.getColumnQualifier(), UTF_8));
+
// splits, which also write the time reference, are allowed to write this reference even
// when
// the transaction is not running because the other half of the tablet is holding a
@@@ -363,11 -366,7 +384,13 @@@
case 8:
return "Bulk load transaction no longer running";
case 9:
+ return "Malformed operation id";
+ case 10:
+ return "Malformed hosting goal";
+ case 11:
+ return "Malformed file selection value";
++ case 12:
+ return "Invalid data file metadata format";
}
return null;
}
diff --cc server/base/src/main/java/org/apache/accumulo/server/util/FileUtil.java
index edc3973642,31b7dd746f..2ca3afbc64
--- a/server/base/src/main/java/org/apache/accumulo/server/util/FileUtil.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/FileUtil.java
@@@ -59,15 -60,13 +60,13 @@@ import org.slf4j.LoggerFactory
public class FileUtil {
- private static final SecureRandom random = new SecureRandom();
-
public static class FileInfo {
- Key firstKey = new Key();
- Key lastKey = new Key();
+ private final Text firstKey;
+ private final Text lastKey;
public FileInfo(Key firstKey, Key lastKey) {
- this.firstKey = firstKey;
- this.lastKey = lastKey;
+ this.firstKey = firstKey.getRow();
+ this.lastKey = lastKey.getRow();
}
public Text getFirstRow() {
diff --cc server/base/src/test/java/org/apache/accumulo/server/constraints/MetadataConstraintsTest.java
index d41d8aff9b,be66aa1d66..e51d282178
--- a/server/base/src/test/java/org/apache/accumulo/server/constraints/MetadataConstraintsTest.java
+++ b/server/base/src/test/java/org/apache/accumulo/server/constraints/MetadataConstraintsTest.java
@@@ -33,10 -31,11 +33,11 @@@ import org.apache.accumulo.core.metadat
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.BulkFileColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CurrentLocationColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
+ import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ScanFileColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily;
+import org.apache.accumulo.core.metadata.schema.SelectedFiles;
import org.apache.accumulo.server.ServerContext;
-import org.apache.accumulo.server.zookeeper.TransactionWatcher.Arbitrator;
import org.apache.hadoop.io.Text;
import org.easymock.EasyMock;
import org.junit.jupiter.api.Test;
@@@ -132,10 -152,22 +133,11 @@@ public class MetadataConstraintsTest
Mutation m;
List<Short> violations;
- // inactive txid
- m = new Mutation(new Text("0;foo"));
- m.put(BulkFileColumnFamily.NAME, new Text("hdfs://nn1/a/accumulo/tables/2b/t-001/someFile"),
- new Value("12345"));
- m.put(DataFileColumnFamily.NAME, new Text("hdfs://nn1/a/accumulo/tables/2b/t-001/someFile"),
- new DataFileValue(1, 1).encodeAsValue());
- violations = mc.check(createEnv(), m);
- assertNotNull(violations);
- assertEquals(1, violations.size());
- assertEquals(Short.valueOf((short) 8), violations.get(0));
-
// txid that throws exception
m = new Mutation(new Text("0;foo"));
- m.put(BulkFileColumnFamily.NAME, new Text("/someFile"), new Value("bad value"));
- m.put(DataFileColumnFamily.NAME, new Text("/someFile"),
+ m.put(BulkFileColumnFamily.NAME, new Text("hdfs://nn1/a/accumulo/tables/2b/t-001/someFile"),
- new Value("9"));
++ new Value("bad value"));
+ m.put(DataFileColumnFamily.NAME, new Text("hdfs://nn1/a/accumulo/tables/2b/t-001/someFile"),
new DataFileValue(1, 1).encodeAsValue());
violations = mc.check(createEnv(), m);
assertNotNull(violations);
@@@ -223,51 -267,76 +237,117 @@@
// deleting a load flag
m = new Mutation(new Text("0;foo"));
- m.putDelete(BulkFileColumnFamily.NAME, new Text("/someFile"));
+ m.putDelete(BulkFileColumnFamily.NAME,
+ new Text("hdfs://nn1/a/accumulo/tables/2b/t-001/someFile"));
+ violations = mc.check(createEnv(), m);
+ assertNull(violations);
+
+ // Missing beginning of path
+ m = new Mutation(new Text("0;foo"));
+ m.put(BulkFileColumnFamily.NAME, new Text("/someFile"), new Value("5"));
+ violations = mc.check(createEnv(), m);
+ assertNotNull(violations);
+ assertEquals(2, violations.size());
- assertEquals(Short.valueOf((short) 9), violations.get(0));
++ assertEquals(Short.valueOf((short) 12), violations.get(0));
+ assertNotNull(mc.getViolationDescription(violations.get(0)));
+ // No DataFileColumnFamily included
+ assertEquals(Short.valueOf((short) 8), violations.get(1));
+
+ // Missing tables directory in path
+ m = new Mutation(new Text("0;foo"));
+ m.put(BulkFileColumnFamily.NAME, new Text("hdfs://nn1/a/accumulo/2b/t-001/C00.rf"),
+ new Value("5"));
+ violations = mc.check(createEnv(), m);
+ assertNotNull(violations);
+ assertEquals(2, violations.size());
- assertEquals(Short.valueOf((short) 9), violations.get(0));
++ assertEquals(Short.valueOf((short) 12), violations.get(0));
+ // No DataFileColumnFamily included
+ assertEquals(Short.valueOf((short) 8), violations.get(1));
+
+ }
+
+ @Test
+ public void testDataFileCheck() {
+ testFileMetadataValidation(DataFileColumnFamily.NAME, new DataFileValue(1, 1).encodeAsValue());
+ }
+
+ @Test
+ public void testScanFileCheck() {
+ testFileMetadataValidation(ScanFileColumnFamily.NAME, new Value());
+ }
+
+ private void testFileMetadataValidation(Text columnFamily, Value value) {
- MetadataConstraints mc = new TestMetadataConstraints();
++ MetadataConstraints mc = new MetadataConstraints();
+ Mutation m;
+ List<Short> violations;
+
+ // Missing beginning of path
+ m = new Mutation(new Text("0;foo"));
+ m.put(columnFamily, new Text("/someFile"), value);
+ violations = mc.check(createEnv(), m);
+ assertNotNull(violations);
+ assertEquals(1, violations.size());
- assertEquals(Short.valueOf((short) 9), violations.get(0));
++ assertEquals(Short.valueOf((short) 12), violations.get(0));
+ assertNotNull(mc.getViolationDescription(violations.get(0)));
+
+ // Missing tables directory in path
+ m = new Mutation(new Text("0;foo"));
+ m.put(columnFamily, new Text("hdfs://nn1/a/accumulo/2b/t-001/C00.rf"),
+ new DataFileValue(1, 1).encodeAsValue());
+ violations = mc.check(createEnv(), m);
+ assertNotNull(violations);
+ assertEquals(1, violations.size());
- assertEquals(Short.valueOf((short) 9), violations.get(0));
++ assertEquals(Short.valueOf((short) 12), violations.get(0));
+
+ // Should pass validation
+ m = new Mutation(new Text("0;foo"));
+ m.put(columnFamily, new Text("hdfs://nn1/a/accumulo/tables/2b/t-001/C00.rf"),
+ new DataFileValue(1, 1).encodeAsValue());
violations = mc.check(createEnv(), m);
assertNull(violations);
- assertNotNull(mc.getViolationDescription((short) 9));
++ assertNotNull(mc.getViolationDescription((short) 12));
+ }
+
+ @Test
+ public void testOperationId() {
+ MetadataConstraints mc = new MetadataConstraints();
+ Mutation m;
+ List<Short> violations;
+
+ m = new Mutation(new Text("0;foo"));
+ ServerColumnFamily.OPID_COLUMN.put(m, new Value("bad id"));
+ violations = mc.check(createEnv(), m);
+ assertNotNull(violations);
+ assertEquals(1, violations.size());
+ assertEquals(Short.valueOf((short) 9), violations.get(0));
+
+ m = new Mutation(new Text("0;foo"));
+ ServerColumnFamily.OPID_COLUMN.put(m, new Value("MERGING:FATE[123abc]"));
+ violations = mc.check(createEnv(), m);
+ assertNull(violations);
}
+ @Test
+ public void testSelectedFiles() {
+ MetadataConstraints mc = new MetadataConstraints();
+ Mutation m;
+ List<Short> violations;
+
+ m = new Mutation(new Text("0;foo"));
+ ServerColumnFamily.SELECTED_COLUMN.put(m, new Value("bad id"));
+ violations = mc.check(createEnv(), m);
+ assertNotNull(violations);
+ assertEquals(1, violations.size());
+ assertEquals(Short.valueOf((short) 11), violations.get(0));
+
+ m = new Mutation(new Text("0;foo"));
+ ServerColumnFamily.SELECTED_COLUMN.put(m,
+ new Value(new SelectedFiles(
+ Set.of(StoredTabletFile
+ .of("hdfs://nn.somewhere.com:86753/accumulo/tables/42/t-0000/F00001.rf")),
+ true, 42L).getMetadataValue()));
+ violations = mc.check(createEnv(), m);
+ assertNull(violations);
+ }
}
diff --cc server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
index 1d81fee284,7e8efbf963..e29637b2a5
--- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
@@@ -228,9 -222,8 +228,8 @@@ public class Manager extends AbstractSe
private final AtomicBoolean managerInitialized = new AtomicBoolean(false);
private final AtomicBoolean managerUpgrading = new AtomicBoolean(false);
- private final long timeToCacheRecoveryWalExistence;
- private final long waitTimeBetweenScans;
+ private final long timeToCacheRecoveryWalExistence;
private ExecutorService tableInformationStatusPool = null;
@Override
diff --cc server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
index 413e19c855,713c034f08..5f1f5f3f5a
--- a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
@@@ -206,9 -178,12 +206,12 @@@ abstract class TabletGroupWatcher exten
// slow things down a little, otherwise we spam the logs when there are many wake-up events
sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+ final long waitTimeBetweenScans = manager.getConfiguration()
+ .getTimeInMillis(Property.MANAGER_TABLET_GROUP_WATCHER_INTERVAL);
+
int totalUnloaded = 0;
int unloaded = 0;
- ClosableIterator<TabletLocationState> iter = null;
+ ClosableIterator<TabletManagement> iter = null;
try {
Map<TableId,MergeStats> mergeStatsCache = new HashMap<>();
Map<TableId,MergeStats> currentMerges = new HashMap<>();
@@@ -276,11 -232,12 +279,11 @@@
flushChanges(tLists, wals);
tLists.reset();
unloaded = 0;
- eventListener.waitForEvents(manager.getWaitTimeBetweenScans());
+ eventListener.waitForEvents(waitTimeBetweenScans);
}
- TableId tableId = tls.extent.tableId();
- TableConfiguration tableConf = manager.getContext().getTableConfiguration(tableId);
+ final TableConfiguration tableConf = manager.getContext().getTableConfiguration(tableId);
- MergeStats mergeStats = mergeStatsCache.computeIfAbsent(tableId, k -> {
+ final MergeStats mergeStats = mergeStatsCache.computeIfAbsent(tableId, k -> {
var mStats = currentMerges.get(k);
return mStats != null ? mStats : new MergeStats(new MergeInfo());
});
diff --cc test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java
index c9992b3ccd,18528c1391..6119b1407b
--- a/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java
@@@ -19,8 -19,7 +19,9 @@@
package org.apache.accumulo.test.functional;
import static java.util.concurrent.TimeUnit.SECONDS;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toSet;
+ import static org.apache.accumulo.core.util.LazySingletons.RANDOM;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
diff --cc test/src/main/java/org/apache/accumulo/test/functional/SplitIT.java
index b60c5e153c,6ca58a4fe1..1530c9ad34
--- a/test/src/main/java/org/apache/accumulo/test/functional/SplitIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/SplitIT.java
@@@ -20,8 -20,7 +20,9 @@@ package org.apache.accumulo.test.functi
import static java.util.Collections.singletonMap;
import static java.util.concurrent.TimeUnit.SECONDS;
++import static org.apache.accumulo.core.util.LazySingletons.RANDOM;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assumptions.assumeTrue;
@@@ -232,123 -213,6 +233,123 @@@ public class SplitIT extends AccumuloCl
}
}
+ @Test
+ public void testLargeSplit() throws Exception {
+ try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) {
+ String tableName = getUniqueNames(1)[0];
+ c.tableOperations().create(tableName, new NewTableConfiguration()
+ .setProperties(Map.of(Property.TABLE_MAX_END_ROW_SIZE.getKey(), "10K")));
+
+ byte[] okSplit = new byte[4096];
+ for (int i = 0; i < okSplit.length; i++) {
+ okSplit[i] = (byte) (i % 256);
+ }
+
+ var splits1 = new TreeSet<Text>(List.of(new Text(okSplit)));
+
+ c.tableOperations().addSplits(tableName, splits1);
+
+ assertEquals(splits1, new TreeSet<>(c.tableOperations().listSplits(tableName)));
+
+ byte[] bigSplit = new byte[4096 * 4];
+ for (int i = 0; i < bigSplit.length; i++) {
+ bigSplit[i] = (byte) (i % 256);
+ }
+
+ var splits2 = new TreeSet<Text>(List.of(new Text(bigSplit)));
+ // split should fail because it exceeds the configured max split size
+ assertThrows(AccumuloException.class,
+ () -> c.tableOperations().addSplits(tableName, splits2));
+
+ // ensure the large split is not there
+ assertEquals(splits1, new TreeSet<>(c.tableOperations().listSplits(tableName)));
+ }
+ }
+
+ @Test
+ public void concurrentSplit() throws Exception {
+ try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) {
+
+ final String tableName = getUniqueNames(1)[0];
+
+ log.debug("Creating table {}", tableName);
+ c.tableOperations().create(tableName);
+
+ final int numRows = 100_000;
+ log.debug("Ingesting {} rows into {}", numRows, tableName);
+ VerifyParams params = new VerifyParams(getClientProps(), tableName, numRows);
+ TestIngest.ingest(c, params);
+
+ log.debug("Verifying {} rows ingested into {}", numRows, tableName);
+ VerifyIngest.verifyIngest(c, params);
+
+ log.debug("Creating futures that add random splits to the table");
+ ExecutorService es = Executors.newFixedThreadPool(10);
+ final int totalFutures = 100;
+ final int splitsPerFuture = 4;
+ final Set<Text> totalSplits = new HashSet<>();
+ List<Callable<Void>> tasks = new ArrayList<>(totalFutures);
+ for (int i = 0; i < totalFutures; i++) {
+ final Pair<Integer,Integer> splitBounds = getRandomSplitBounds(numRows);
+ final TreeSet<Text> splits = TestIngest.getSplitPoints(splitBounds.getFirst().longValue(),
+ splitBounds.getSecond().longValue(), splitsPerFuture);
+ totalSplits.addAll(splits);
+ tasks.add(() -> {
+ c.tableOperations().addSplits(tableName, splits);
+ return null;
+ });
+ }
+
+ log.debug("Submitting futures");
+ List<Future<Void>> futures =
+ tasks.parallelStream().map(es::submit).collect(Collectors.toList());
+
+ log.debug("Waiting for futures to complete");
+ for (Future<?> f : futures) {
+ f.get();
+ }
+ es.shutdown();
+
+ log.debug("Checking that {} splits were created ", totalSplits.size());
+
+ assertEquals(totalSplits, new HashSet<>(c.tableOperations().listSplits(tableName)),
+ "Did not see expected splits");
+
+ // ELASTICITY_TODO the following could be removed after #3309. Currently scanning an ondemand
+ // table with lots of tablets will cause the test to timeout.
+ c.tableOperations().setTabletHostingGoal(tableName, new Range(), TabletHostingGoal.ALWAYS);
+
+ log.debug("Verifying {} rows ingested into {}", numRows, tableName);
+ VerifyIngest.verifyIngest(c, params);
+ }
+ }
+
+ /**
+ * Generates a pair of integers that represent the start and end of a range of splits. The start
+ * and end are randomly generated between 0 and upperBound. The start is guaranteed to be less
+ * than the end and the two bounds are guaranteed to be different values.
+ *
+ * @param upperBound the upper bound of the range of splits
+ * @return a pair of integers that represent the start and end of a range of splits
+ */
+ private Pair<Integer,Integer> getRandomSplitBounds(int upperBound) {
+ Preconditions.checkArgument(upperBound > 1, "upperBound must be greater than 1");
+
- int start = random.nextInt(upperBound);
- int end = random.nextInt(upperBound - 1);
++ int start = RANDOM.get().nextInt(upperBound);
++ int end = RANDOM.get().nextInt(upperBound - 1);
+
+ // ensure start is less than end and that end is not equal to start
+ if (end >= start) {
+ end += 1;
+ } else {
+ int tmp = start;
+ start = end;
+ end = tmp;
+ }
+
+ return new Pair<>(start, end);
+ }
+
private String getDir() throws Exception {
var rootPath = getCluster().getTemporaryPath().toString();
String dir = rootPath + "/" + getUniqueNames(1)[0];