You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2023/11/06 18:18:33 UTC
(accumulo) branch elasticity updated: updates delete table to use conditional mutations (#3929)
This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push:
new a131e97052 updates delete table to use conditional mutations (#3929)
a131e97052 is described below
commit a131e9705293ef7166cfce3fbdae9f40201430b8
Author: Keith Turner <kt...@apache.org>
AuthorDate: Mon Nov 6 13:18:27 2023 -0500
updates delete table to use conditional mutations (#3929)
Updates the delete table fate operation to use conditional mutations.
Also introduces a new mechanism in Ample for processing conditional
mutations that does not buffer everything in memory. This avoids
buffering large amounts of tablet metadata in memory when deleting a
large table. The IT that creates 1 million splits in a table and
deletes it passes with these changes.
---
.../accumulo/core/metadata/schema/Ample.java | 43 ++++++--
.../AsyncConditionalTabletsMutatorImpl.java | 98 ++++++++++++++++++
.../accumulo/server/metadata/ServerAmpleImpl.java | 7 ++
.../accumulo/manager/tableOps/delete/CleanUp.java | 31 +-----
.../manager/tableOps/delete/DeleteTable.java | 2 +-
.../manager/tableOps/delete/PreDeleteTable.java | 1 +
.../manager/tableOps/delete/ReserveTablets.java | 111 +++++++++++++++++++++
.../test/functional/AmpleConditionalWriterIT.java | 77 ++++++++++++++
8 files changed, 332 insertions(+), 38 deletions(-)
diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java
index a23e6da08e..44f1715775 100644
--- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java
+++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java
@@ -24,6 +24,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
+import java.util.function.BiConsumer;
import java.util.function.Predicate;
import java.util.stream.Stream;
@@ -209,7 +210,10 @@ public interface Ample {
}
/**
- * An entry point for updating tablets metadata using a conditional writer.
+ * An entry point for updating tablets metadata using a conditional writer. The returned mutator
+ * will buffer everything in memory until {@link ConditionalTabletsMutator#process()} is called.
+ * If buffering everything in memory is undesirable, then consider using
+ * {@link #conditionallyMutateTablets(BiConsumer)}
*
* @see ConditionalTabletMutator#submit(RejectionHandler)
*/
@@ -217,6 +221,26 @@ public interface Ample {
throw new UnsupportedOperationException();
}
+ /**
+ * An entry point for updating tablets metadata using a conditional writer asynchronously. This
+ * will process conditional mutations in the background as they are added. The benefit of this
+ * method over {@link #conditionallyMutateTablets()} is that it can avoid buffering everything in
+ * memory. Using this method may also be faster as it allows tablet metadata scans and conditional
+ * updates of tablets to run concurrently.
+ *
+ * @param resultsConsumer as conditional mutations are processed in the background their result is
+ * passed to this consumer. This consumer should be thread safe as it may be called from a
+ * different thread.
+ * @return A conditional tablet mutator that will asynchronously report results. Closing this
+ * object will force everything to be processed and reported. The returned object is not
+ * thread safe and is only intended to be used by a single thread.
+ * @see ConditionalTabletMutator#submit(RejectionHandler)
+ */
+ default AsyncConditionalTabletsMutator
+ conditionallyMutateTablets(BiConsumer<KeyExtent,ConditionalResult> resultsConsumer) {
+ throw new UnsupportedOperationException();
+ }
+
default void putGcCandidates(TableId tableId, Collection<StoredTabletFile> candidates) {
throw new UnsupportedOperationException();
}
@@ -264,7 +288,7 @@ public interface Ample {
void close();
}
- public interface ConditionalResult {
+ interface ConditionalResult {
/**
* This enum was created instead of using {@link ConditionalWriter.Status} because Ample has
@@ -292,14 +316,22 @@ public interface Ample {
TabletMetadata readMetadata();
}
- public interface ConditionalTabletsMutator extends AutoCloseable {
-
+ interface AsyncConditionalTabletsMutator extends AutoCloseable {
/**
* @return A fluent interface to conditional mutating a tablet. Ensure you call
* {@link ConditionalTabletMutator#submit(RejectionHandler)} when finished.
*/
OperationRequirements mutateTablet(KeyExtent extent);
+ /**
+ * Closing ensures that all mutations are processed and their results are reported.
+ */
+ @Override
+ void close();
+ }
+
+ interface ConditionalTabletsMutator extends AsyncConditionalTabletsMutator {
+
/**
* After creating one or more conditional mutations using {@link #mutateTablet(KeyExtent)}, call
* this method to process them using a {@link ConditionalWriter}
@@ -307,9 +339,6 @@ public interface Ample {
* @return The result from the {@link ConditionalWriter} of processing each tablet.
*/
Map<KeyExtent,ConditionalResult> process();
-
- @Override
- void close();
}
/**
diff --git a/server/base/src/main/java/org/apache/accumulo/server/metadata/AsyncConditionalTabletsMutatorImpl.java b/server/base/src/main/java/org/apache/accumulo/server/metadata/AsyncConditionalTabletsMutatorImpl.java
new file mode 100644
index 0000000000..0d31f71eef
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/metadata/AsyncConditionalTabletsMutatorImpl.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.metadata;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.function.BiConsumer;
+
+import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.metadata.schema.Ample;
+import org.apache.accumulo.core.util.threads.Threads;
+import org.apache.accumulo.server.ServerContext;
+
+public class AsyncConditionalTabletsMutatorImpl implements Ample.AsyncConditionalTabletsMutator {
+ private final BiConsumer<KeyExtent,Ample.ConditionalResult> resultsConsumer;
+ private final ExecutorService executor;
+ private Future<Map<KeyExtent,Ample.ConditionalResult>> backgroundProcessing = null;
+ private ConditionalTabletsMutatorImpl bufferingMutator;
+ private final ServerContext context;
+ private long mutatedTablets = 0;
+ public static final int BATCH_SIZE = 1000;
+
+ AsyncConditionalTabletsMutatorImpl(ServerContext context,
+ BiConsumer<KeyExtent,Ample.ConditionalResult> resultsConsumer) {
+ this.resultsConsumer = Objects.requireNonNull(resultsConsumer);
+ this.bufferingMutator = new ConditionalTabletsMutatorImpl(context);
+ this.context = context;
+ var creatorId = Thread.currentThread().getId();
+ this.executor = Executors.newSingleThreadExecutor(runnable -> Threads.createThread(
+ "Async conditional tablets mutator background thread, created by : #" + creatorId,
+ runnable));
+
+ }
+
+ @Override
+ public Ample.OperationRequirements mutateTablet(KeyExtent extent) {
+ if (mutatedTablets > BATCH_SIZE) {
+ if (backgroundProcessing != null) {
+ // a previous batch of mutations was submitted for processing so wait on it.
+ try {
+ backgroundProcessing.get().forEach(resultsConsumer);
+ } catch (InterruptedException | ExecutionException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ // Spin up processing of the mutations submitted so far in a background thread. Must copy the
+ // reference for the background thread because a new one is about to be created.
+ var bufferingMutatorRef = bufferingMutator;
+ backgroundProcessing = executor.submit(() -> {
+ var result = bufferingMutatorRef.process();
+ bufferingMutatorRef.close();
+ return result;
+ });
+
+ bufferingMutator = new ConditionalTabletsMutatorImpl(context);
+ mutatedTablets = 0;
+ }
+ mutatedTablets++;
+ return bufferingMutator.mutateTablet(extent);
+ }
+
+ @Override
+ public void close() {
+ if (backgroundProcessing != null) {
+ // a previous batch of mutations was submitted for processing so wait on it.
+ try {
+ backgroundProcessing.get().forEach(resultsConsumer);
+ } catch (InterruptedException | ExecutionException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+ // process anything not processed so far
+ bufferingMutator.process().forEach(resultsConsumer);
+ bufferingMutator.close();
+ executor.shutdownNow();
+ }
+}
diff --git a/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java b/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java
index f8d916e159..a42bc5f488 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java
@@ -28,6 +28,7 @@ import java.util.Iterator;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
+import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -93,6 +94,12 @@ public class ServerAmpleImpl extends AmpleImpl implements Ample {
return new ConditionalTabletsMutatorImpl(context);
}
+ @Override
+ public AsyncConditionalTabletsMutator
+ conditionallyMutateTablets(BiConsumer<KeyExtent,ConditionalResult> resultsConsumer) {
+ return new AsyncConditionalTabletsMutatorImpl(context, resultsConsumer);
+ }
+
private void mutateRootGcCandidates(Consumer<RootGcCandidates> mutator) {
String zpath = context.getZooKeeperRoot() + ZROOT_TABLET_GC_CANDIDATES;
try {
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/CleanUp.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/CleanUp.java
index 0b0512b072..4262443dfc 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/CleanUp.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/CleanUp.java
@@ -18,15 +18,10 @@
*/
package org.apache.accumulo.manager.tableOps.delete;
-import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOCATION;
-import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW;
-import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.SUSPEND;
-
import java.io.IOException;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.Map.Entry;
-import java.util.Set;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.BatchScanner;
@@ -40,11 +35,8 @@ import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.core.iterators.user.GrepIterator;
import org.apache.accumulo.core.metadata.MetadataTable;
-import org.apache.accumulo.core.metadata.TServerInstance;
-import org.apache.accumulo.core.metadata.TabletState;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
-import org.apache.accumulo.core.metadata.schema.TabletMetadata;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.manager.Manager;
import org.apache.accumulo.manager.tableOps.ManagerRepo;
@@ -87,32 +79,11 @@ class CleanUp extends ManagerRepo {
@Override
public long isReady(long tid, Manager manager) throws Exception {
+ // ELASTICITY_TODO investigate this, what is it for and is it still needed?
if (!manager.hasCycled(creationTime)) {
return 50;
}
- boolean done = true;
-
- try (var tablets = manager.getContext().getAmple().readTablets().forTable(tableId)
- .fetch(LOCATION, PREV_ROW, SUSPEND).checkConsistency().build()) {
- Set<TServerInstance> liveTServers = manager.onlineTabletServers();
- for (TabletMetadata tm : tablets) {
- TabletState state = TabletState.compute(tm, liveTServers);
- if (!state.equals(TabletState.UNASSIGNED)) {
- // This code will even wait on tablets that are assigned to dead tablets servers. This is
- // intentional because the manager may make metadata writes for these tablets. See #587
- log.debug("Still waiting for table({}) to be deleted; Target tablet state: UNASSIGNED, "
- + "Current tablet state: {}, locationState: {}", tableId, state, tm);
- done = false;
- break;
- }
- }
- }
-
- if (!done) {
- return 50;
- }
-
return 0;
}
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/DeleteTable.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/DeleteTable.java
index e8eb5c8761..b511056e2a 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/DeleteTable.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/DeleteTable.java
@@ -49,7 +49,7 @@ public class DeleteTable extends ManagerRepo {
public Repo<Manager> call(long tid, Manager env) {
env.getTableManager().transitionTableState(tableId, TableState.DELETING);
env.getEventCoordinator().event(tableId, "deleting table %s ", tableId);
- return new CleanUp(tableId, namespaceId);
+ return new ReserveTablets(tableId, namespaceId);
}
@Override
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/PreDeleteTable.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/PreDeleteTable.java
index d9c31c13b8..41ce26dcbb 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/PreDeleteTable.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/PreDeleteTable.java
@@ -57,6 +57,7 @@ public class PreDeleteTable extends ManagerRepo {
private void preventFutureCompactions(Manager environment)
throws KeeperException, InterruptedException {
+ // ELASTICITY_TODO investigate this. Is it still needed? Is it still working as expected?
String deleteMarkerPath = createDeleteMarkerPath(environment.getInstanceID(), tableId);
ZooReaderWriter zoo = environment.getContext().getZooReaderWriter();
zoo.putPersistentData(deleteMarkerPath, new byte[] {}, NodeExistsPolicy.SKIP);
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/ReserveTablets.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/ReserveTablets.java
new file mode 100644
index 0000000000..afdd5fc155
--- /dev/null
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/ReserveTablets.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.manager.tableOps.delete;
+
+import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOCATION;
+import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.OPID;
+import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW;
+
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiConsumer;
+
+import org.apache.accumulo.core.data.NamespaceId;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.fate.Repo;
+import org.apache.accumulo.core.metadata.schema.Ample;
+import org.apache.accumulo.core.metadata.schema.TabletOperationId;
+import org.apache.accumulo.core.metadata.schema.TabletOperationType;
+import org.apache.accumulo.manager.Manager;
+import org.apache.accumulo.manager.tableOps.ManagerRepo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ReserveTablets extends ManagerRepo {
+
+ private static final Logger log = LoggerFactory.getLogger(ReserveTablets.class);
+
+ private static final long serialVersionUID = 1L;
+
+ private final TableId tableId;
+ private final NamespaceId namespaceId;
+
+ public ReserveTablets(TableId tableId, NamespaceId namespaceId) {
+ this.tableId = tableId;
+ this.namespaceId = namespaceId;
+ }
+
+ @Override
+ public long isReady(long tid, Manager manager) throws Exception {
+
+ var opid = TabletOperationId.from(TabletOperationType.DELETING, tid);
+
+ // The consumer may be called in another thread so use an AtomicLong
+ AtomicLong accepted = new AtomicLong(0);
+ BiConsumer<KeyExtent,Ample.ConditionalResult> resultsConsumer = (extent, result) -> {
+ if (result.getStatus() == Ample.ConditionalResult.Status.ACCEPTED) {
+ accepted.incrementAndGet();
+ } else {
+ log.debug("Failed to set operation id {} {}", opid, extent);
+ }
+ };
+
+ long locations = 0;
+ long otherOps = 0;
+ long submitted = 0;
+ long tabletsSeen = 0;
+
+ try (
+ var tablets = manager.getContext().getAmple().readTablets().forTable(tableId)
+ .fetch(OPID, PREV_ROW, LOCATION).checkConsistency().build();
+ var conditionalMutator =
+ manager.getContext().getAmple().conditionallyMutateTablets(resultsConsumer)) {
+
+ for (var tabletMeta : tablets) {
+ tabletsSeen++;
+ if (tabletMeta.getLocation() != null) {
+ locations++;
+ }
+
+ if (tabletMeta.getOperationId() != null) {
+ if (!opid.equals(tabletMeta.getOperationId())) {
+ otherOps++;
+ }
+ } else {
+ conditionalMutator.mutateTablet(tabletMeta.getExtent()).requireAbsentOperation()
+ .putOperation(opid).submit(tm -> opid.equals(tm.getOperationId()));
+ submitted++;
+ }
+ }
+ }
+
+ if (locations > 0 || otherOps > 0 || submitted != accepted.get()) {
+ log.debug("Waiting to delete table locations:{} operations:{} submitted:{} accepted:{}",
+ locations, otherOps, submitted, accepted.get());
+ return Math.min(Math.max(100, tabletsSeen), 30000);
+ }
+
+ return 0;
+ }
+
+ @Override
+ public Repo<Manager> call(long tid, Manager manager) throws Exception {
+ return new CleanUp(tableId, namespaceId);
+ }
+}
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java b/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java
index 2ee6c5d1e7..3a7e2d3c9d 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java
@@ -26,6 +26,8 @@ import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FLUSH_ID;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOADED;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOCATION;
+import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.OPID;
+import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.SELECTED;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.TIME;
import static org.apache.accumulo.core.util.LazySingletons.GSON;
@@ -43,8 +45,11 @@ import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiConsumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
@@ -62,6 +67,7 @@ import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.RootTable;
import org.apache.accumulo.core.metadata.StoredTabletFile;
import org.apache.accumulo.core.metadata.TServerInstance;
+import org.apache.accumulo.core.metadata.schema.Ample;
import org.apache.accumulo.core.metadata.schema.Ample.ConditionalResult.Status;
import org.apache.accumulo.core.metadata.schema.DataFileValue;
import org.apache.accumulo.core.metadata.schema.MetadataTime;
@@ -73,6 +79,7 @@ import org.apache.accumulo.core.metadata.schema.TabletOperationId;
import org.apache.accumulo.core.metadata.schema.TabletOperationType;
import org.apache.accumulo.core.security.TablePermission;
import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.server.metadata.AsyncConditionalTabletsMutatorImpl;
import org.apache.accumulo.server.metadata.ConditionalTabletsMutatorImpl;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
@@ -794,4 +801,74 @@ public class AmpleConditionalWriterIT extends AccumuloClusterHarness {
assertEquals(44L, context.getAmple().readTablet(e1).getFlushId().getAsLong());
}
}
+
+ @Test
+ public void testAsyncMutator() throws Exception {
+ var table = getUniqueNames(2)[1];
+ try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) {
+ // The AsyncConditionalTabletsMutatorImpl processes batches of conditional mutations. Run
+ // tests where more than the batch size is processed and ensure this is handled correctly.
+
+ TreeSet<Text> splits =
+ IntStream.range(1, (int) (AsyncConditionalTabletsMutatorImpl.BATCH_SIZE * 2.5))
+ .mapToObj(i -> new Text(String.format("%06d", i)))
+ .collect(Collectors.toCollection(TreeSet::new));
+
+ assertTrue(splits.size() > AsyncConditionalTabletsMutatorImpl.BATCH_SIZE);
+
+ c.tableOperations().create(table, new NewTableConfiguration().withSplits(splits));
+ var tableId = TableId.of(c.tableOperations().tableIdMap().get(table));
+
+ var ample = cluster.getServerContext().getAmple();
+
+ AtomicLong accepted = new AtomicLong(0);
+ AtomicLong total = new AtomicLong(0);
+ BiConsumer<KeyExtent,Ample.ConditionalResult> resultsConsumer = (extent, result) -> {
+ if (result.getStatus() == Status.ACCEPTED) {
+ accepted.incrementAndGet();
+ }
+ total.incrementAndGet();
+ };
+
+ // run a test where a subset of tablets are modified, all modifications should be accepted
+ var opid1 = TabletOperationId.from(TabletOperationType.MERGING, 50);
+
+ int expected = 0;
+ try (var tablets = ample.readTablets().forTable(tableId).fetch(OPID, PREV_ROW).build();
+ var mutator = ample.conditionallyMutateTablets(resultsConsumer)) {
+ for (var tablet : tablets) {
+ if (tablet.getEndRow() != null
+ && Integer.parseInt(tablet.getEndRow().toString()) % 2 == 0) {
+ mutator.mutateTablet(tablet.getExtent()).requireAbsentOperation().putOperation(opid1)
+ .submit(tm -> opid1.equals(tm.getOperationId()));
+ expected++;
+ }
+ }
+ }
+
+ assertTrue(expected > 0);
+ assertEquals(expected, accepted.get());
+ assertEquals(total.get(), accepted.get());
+
+ // run test where some will be accepted and some will be rejected and ensure the counts come
+ // out as expected.
+ var opid2 = TabletOperationId.from(TabletOperationType.MERGING, 51);
+
+ accepted.set(0);
+ total.set(0);
+
+ try (var tablets = ample.readTablets().forTable(tableId).fetch(OPID, PREV_ROW).build();
+ var mutator = ample.conditionallyMutateTablets(resultsConsumer)) {
+ for (var tablet : tablets) {
+ mutator.mutateTablet(tablet.getExtent()).requireAbsentOperation().putOperation(opid2)
+ .submit(tm -> opid2.equals(tm.getOperationId()));
+ }
+ }
+
+ var numTablets = splits.size() + 1;
+ assertEquals(numTablets - expected, accepted.get());
+ assertEquals(numTablets, total.get());
+ }
+ }
+
}