You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2023/05/23 22:05:43 UTC
[accumulo] branch elasticity updated: Improves handling of conditional writer edge cases in Ample (#3416)
This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push:
new e0612abb7b Improves handling of conditional writer edge cases in Ample (#3416)
e0612abb7b is described below
commit e0612abb7b8ad835350d01d754872d2a736828a5
Author: Keith Turner <kt...@apache.org>
AuthorDate: Tue May 23 18:05:36 2023 -0400
Improves handling of conditional writer edge cases in Ample (#3416)
---
.../accumulo/core/metadata/schema/Ample.java | 131 ++++++++++++++++----
.../manager/state/AbstractTabletStateStore.java | 26 ++--
.../server/manager/state/MetaDataStateStore.java | 6 +-
.../metadata/ConditionalTabletMutatorImpl.java | 18 +--
.../metadata/ConditionalTabletsMutatorImpl.java | 135 ++++++++++++++++-----
.../ConditionalTabletsMutatorImplTest.java | 64 ++++++----
.../tableOps/bulkVer2/CleanUpBulkImport.java | 4 +-
.../manager/tableOps/bulkVer2/LoadFiles.java | 10 +-
.../manager/tableOps/split/DeleteOperationIds.java | 17 ++-
.../accumulo/manager/tableOps/split/PreSplit.java | 9 +-
.../manager/tableOps/split/UpdateTablets.java | 36 ++----
.../test/functional/AmpleConditionalWriterIT.java | 76 ++++++------
12 files changed, 336 insertions(+), 196 deletions(-)
diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java
index 491dad5754..c0ee195825 100644
--- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java
+++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java
@@ -186,6 +186,11 @@ public interface Ample {
throw new UnsupportedOperationException();
}
+ /**
+ * An entry point for updating tablets metadata using a conditional writer.
+ *
+ * @see ConditionalTabletMutator#submit(RejectionHandler)
+ */
default ConditionalTabletsMutator conditionallyMutateTablets() {
throw new UnsupportedOperationException();
}
@@ -250,23 +255,27 @@ public interface Ample {
public interface ConditionalResult {
/**
- * Returns the status of the conditional mutation. If the status was
- * {@link org.apache.accumulo.core.client.ConditionalWriter.Status#UNKNOWN} and
- * Ample#UknownValidator indicates things are ok then this will return
- * {@link org.apache.accumulo.core.client.ConditionalWriter.Status#ACCEPTED}
+ * This enum was created instead of using {@link ConditionalWriter.Status} because Ample has
+ * automated handling for most of the statuses of the conditional writer and therefore only a
+ * subset are expected to be passed out of Ample. This enum represents the subset that Ample
+ * will actually return.
*/
- ConditionalWriter.Status getStatus();
+ enum Status {
+ ACCEPTED, REJECTED
+ }
+
+ /**
+ * Returns the status of the conditional mutation or may return a computed status of ACCEPTED in
+ * some cases, see {@link ConditionalTabletMutator#submit(RejectionHandler)} for details.
+ */
+ Status getStatus();
KeyExtent getExtent();
/**
* This can only be called when {@link #getStatus()} returns something other than
- * {@link org.apache.accumulo.core.client.ConditionalWriter.Status#ACCEPTED}. It reads that
- * tablets metadata for a failed conditional mutation. This can used used to see why it failed.
- * In the case where {@link #getStatus()} returns
- * {@link org.apache.accumulo.core.client.ConditionalWriter.Status#UNKNOWN} it can be used to
- * see if the mutation actually succeeded or not.
- *
+ * {@link Status#ACCEPTED}. It reads that tablet's metadata for a failed conditional mutation.
+ * This can be used to see why it was rejected.
*/
TabletMetadata readMetadata();
}
@@ -275,7 +284,7 @@ public interface Ample {
/**
* @return A fluent interface to conditional mutating a tablet. Ensure you call
- * {@link ConditionalTabletMutator#submit()} when finished.
+ * {@link ConditionalTabletMutator#submit(RejectionHandler)} when finished.
*/
OperationRequirements mutateTablet(KeyExtent extent);
@@ -396,9 +405,9 @@ public interface Ample {
}
/**
- * Convenience interface for handling conditional mutations with a status of UNKNOWN.
+ * Convenience interface for handling conditional mutations with a status of REJECTED.
*/
- interface UnknownValidator extends Predicate<TabletMetadata> {}
+ interface RejectionHandler extends Predicate<TabletMetadata> {}
interface ConditionalTabletMutator extends TabletUpdates<ConditionalTabletMutator> {
@@ -428,18 +437,92 @@ public interface Ample {
ConditionalTabletMutator requirePrevEndRow(Text per);
/**
- * Submits or queues a conditional mutation for processing.
- */
- void submit();
-
- /**
- * @param unknownCheck if the conditional mutation comes back with a status of
- * {@link org.apache.accumulo.core.client.ConditionalWriter.Status#UNKNOWN} then read the
- * tablets metadata and apply this check to see if it should be considered as
+ * <p>
+ * Ample provides the following features on top of the conditional writer to help automate
+ * handling of edge cases that arise when using the conditional writer.
+ * <ul>
+ * <li>Automatically resubmit conditional mutations with a status of
+ * {@link org.apache.accumulo.core.client.ConditionalWriter.Status#UNKNOWN}.</li>
+ * <li>When a mutation is rejected (status of
+ * {@link org.apache.accumulo.core.client.ConditionalWriter.Status#REJECTED}) it will read the
+ * tablets metadata and call the passed rejectionHandler to determine if the mutation should be
+ * considered as accepted.</li>
+ * <li>For status of
+ * {@link org.apache.accumulo.core.client.ConditionalWriter.Status#INVISIBLE_VISIBILITY} and
+ * {@link org.apache.accumulo.core.client.ConditionalWriter.Status#VIOLATED} ample will throw an
+ * exception. This is done so that all code does not have to deal with these unexpected
+ * statuses.</li>
+ * </ul>
+ *
+ * <p>
+ * The motivation behind the rejectionHandler is to help sort things out when conditional
+ * mutations are submitted twice and the subsequent submission is rejected even though the first
+ * submission was accepted. There are two causes for this. First, when a thread is running in
+ * something like FATE it may submit a mutation and the thread dies before it sees the response.
+ * Later FATE will run the code again for a second time, submitting a second mutation. The
+ * second cause is ample resubmitting on unknown as mentioned above. Below are a few examples
+ * that go over how Ample will handle these different situations.
+ *
+ * <h3>Example 1</h3>
+ *
+ * <ul>
+ * <li>Conditional mutation CM1 with a condition requiring an absent location that sets a future
+ * location is submitted. When it's submitted to ample a rejectionHandler is set that checks the
+ * future location.</li>
+ * <li>Inside Ample CM1 is submitted to a conditional writer and returns a status of UNKNOWN,
+ * but it actually succeeded. This could be caused by the mutation succeeding and the tablet
+ * server dying just before it reports back.</li>
+ * <li>Ample sees the UNKNOWN status and resubmits CM1 for a second time. Because the future
+ * location was set, the mutation is returned to ample with a status of rejected by the
+ * conditional writer.</li>
+ * <li>Because the mutation was rejected, ample reads the tablet metadata and calls the
+ * rejectionHandler. The rejectionHandler sees the future location was set and reports that
+ * everything is ok, therefore ample reports the status as ACCEPTED.</li>
+ * </ul>
+ *
+ * <h3>Example 2</h3>
+ *
+ * <ul>
+ * <li>Conditional mutation CM2 with a condition requiring an absent location that sets a future
+ * location is submitted. When it's submitted to ample a rejectionHandler is set that checks the
+ * future location.</li>
+ * <li>Inside Ample CM2 is submitted to a conditional writer and returns a status of UNKNOWN,
+ * but it actually never made it to the tserver. This could be caused by the tablet server dying
+ * just after a network connection was established to send the mutation.</li>
+ * <li>Ample sees the UNKNOWN status and resubmits CM2 for a second time. There is no future
+ * location set so the mutation is returned to ample with a status of accepted by the
+ * conditional writer.</li>
+ * <li>Because the mutation was accepted, ample never calls the rejectionHandler and returns it
+ * as accepted.</li>
+ * </ul>
+ *
+ * <h3>Example 3</h3>
+ *
+ * <ul>
+ * <li>Conditional mutation CM3 with a condition requiring an absent operation that sets the
+ * operation id to a fate transaction id is submitted. When it's submitted to ample a
+ * rejectionHandler is set that checks if the operation id equals the fate transaction id.</li>
+ * <li>The thread running the fate operation dies after submitting the mutation but before
+ * seeing it was actually accepted.</li>
+ * <li>Later fate creates an identical mutation to CM3, let's call it CM3.2, and resubmits it
+ * with the same rejection handler.</li>
+ * <li>CM3.2 is rejected because the operation id is not absent.</li>
+ * <li>Because the mutation was rejected, ample calls the rejectionHandler. The rejectionHandler
+ * sees in the tablet metadata that the operation id is its fate transaction id and reports back
+ * true.</li>
+ * <li>When rejectionHandler reports true, ample reports the mutation as accepted.</li>
+ * </ul>
+ *
+ * @param rejectionHandler if the conditional mutation comes back with a status of
+ * {@link org.apache.accumulo.core.client.ConditionalWriter.Status#REJECTED} then read
+ * the tablet's metadata and apply this check to see if it should be considered as
* {@link org.apache.accumulo.core.client.ConditionalWriter.Status#ACCEPTED} in the
- * return of {@link ConditionalTabletsMutator#process()}
+ * return of {@link ConditionalTabletsMutator#process()}. The rejection handler is only
+ * called when a tablet's metadata exists. If ample reads a tablet's metadata and the
+ * tablet no longer exists, then ample will not call the rejectionHandler with null. It
+ * will let the rejected status carry forward in this case.
*/
- void submit(UnknownValidator unknownCheck);
+ void submit(RejectionHandler rejectionHandler);
}
/**
diff --git a/server/base/src/main/java/org/apache/accumulo/server/manager/state/AbstractTabletStateStore.java b/server/base/src/main/java/org/apache/accumulo/server/manager/state/AbstractTabletStateStore.java
index cbd8384923..41516db6a5 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/manager/state/AbstractTabletStateStore.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/manager/state/AbstractTabletStateStore.java
@@ -21,14 +21,13 @@ package org.apache.accumulo.server.manager.state;
import java.util.Collection;
import java.util.List;
import java.util.Map;
-import java.util.stream.Collectors;
-import org.apache.accumulo.core.client.ConditionalWriter;
import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.metadata.TServerInstance;
import org.apache.accumulo.core.metadata.TabletLocationState;
import org.apache.accumulo.core.metadata.schema.Ample;
+import org.apache.accumulo.core.metadata.schema.Ample.ConditionalResult.Status;
import org.apache.accumulo.core.metadata.schema.TabletMetadata;
import org.apache.accumulo.core.tabletserver.log.LogEntry;
import org.apache.accumulo.server.util.ManagerMetadataUtil;
@@ -61,16 +60,13 @@ public abstract class AbstractTabletStateStore implements TabletStateStore {
conditionalMutator.submit(tabletMetadata -> {
Preconditions.checkArgument(tabletMetadata.getExtent().equals(assignment.tablet));
- // see if we are the current location, if so then the unknown mutation actually
- // succeeded
return tabletMetadata.getLocation() != null && tabletMetadata.getLocation()
.equals(TabletMetadata.Location.current(assignment.server));
});
}
if (tabletsMutator.process().values().stream()
- .anyMatch(result -> result.getStatus() != ConditionalWriter.Status.ACCEPTED)) {
- // TODO should this look at why?
+ .anyMatch(result -> result.getStatus() != Status.ACCEPTED)) {
throw new DistributedStoreException(
"failed to set tablet location, conditional mutation failed");
}
@@ -89,8 +85,6 @@ public abstract class AbstractTabletStateStore implements TabletStateStore {
.putLocation(TabletMetadata.Location.future(assignment.server))
.submit(tabletMetadata -> {
Preconditions.checkArgument(tabletMetadata.getExtent().equals(assignment.tablet));
- // see if we are the future location, if so then the unknown mutation actually
- // succeeded
return tabletMetadata.getLocation() != null && tabletMetadata.getLocation()
.equals(TabletMetadata.Location.future(assignment.server));
});
@@ -98,12 +92,9 @@ public abstract class AbstractTabletStateStore implements TabletStateStore {
var results = tabletsMutator.process();
- if (results.values().stream()
- .anyMatch(result -> result.getStatus() != ConditionalWriter.Status.ACCEPTED)) {
- var statuses = results.values().stream().map(Ample.ConditionalResult::getStatus)
- .collect(Collectors.toSet());
+ if (results.values().stream().anyMatch(result -> result.getStatus() != Status.ACCEPTED)) {
throw new DistributedStoreException(
- "failed to set tablet location, conditional mutation failed. " + statuses);
+ "failed to set tablet location, conditional mutation failed. ");
}
} catch (RuntimeException ex) {
@@ -158,16 +149,13 @@ public abstract class AbstractTabletStateStore implements TabletStateStore {
processSuspension(tabletMutator, tls, suspensionTimestamp);
- tabletMutator.submit(tabletMetadata -> {
- // The status of the conditional update is unknown, so check and see if things are ok
- return tabletMetadata.getLocation() == null;
- });
+ tabletMutator.submit(tabletMetadata -> tabletMetadata.getLocation() == null);
}
Map<KeyExtent,Ample.ConditionalResult> results = tabletsMutator.process();
- if (results.values().stream().anyMatch(conditionalResult -> conditionalResult.getStatus()
- != ConditionalWriter.Status.ACCEPTED)) {
+ if (results.values().stream()
+ .anyMatch(conditionalResult -> conditionalResult.getStatus() != Status.ACCEPTED)) {
throw new DistributedStoreException("Some unassignments did not satisfy conditions.");
}
diff --git a/server/base/src/main/java/org/apache/accumulo/server/manager/state/MetaDataStateStore.java b/server/base/src/main/java/org/apache/accumulo/server/manager/state/MetaDataStateStore.java
index 86cd47552e..a5b898cb9e 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/manager/state/MetaDataStateStore.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/manager/state/MetaDataStateStore.java
@@ -20,11 +20,11 @@ package org.apache.accumulo.server.manager.state;
import java.util.Collection;
-import org.apache.accumulo.core.client.ConditionalWriter;
import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.TabletLocationState;
import org.apache.accumulo.core.metadata.schema.Ample;
+import org.apache.accumulo.core.metadata.schema.Ample.ConditionalResult.Status;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
class MetaDataStateStore extends AbstractTabletStateStore implements TabletStateStore {
@@ -64,8 +64,8 @@ class MetaDataStateStore extends AbstractTabletStateStore implements TabletState
.submit(tabletMetadata -> tabletMetadata.getSuspend() == null);
}
- boolean unacceptedConditions = tabletsMutator.process().values().stream().anyMatch(
- conditionalResult -> conditionalResult.getStatus() != ConditionalWriter.Status.ACCEPTED);
+ boolean unacceptedConditions = tabletsMutator.process().values().stream()
+ .anyMatch(conditionalResult -> conditionalResult.getStatus() != Status.ACCEPTED);
if (unacceptedConditions) {
throw new DistributedStoreException("Some mutations failed to satisfy conditions");
}
diff --git a/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java b/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java
index 6cfd3add13..cef634a8c7 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java
@@ -55,7 +55,7 @@ public class ConditionalTabletMutatorImpl extends TabletMutatorBase<Ample.Condit
private final Consumer<ConditionalMutation> mutationConsumer;
private final Ample.ConditionalTabletsMutator parent;
- private final BiConsumer<KeyExtent,Ample.UnknownValidator> unknownValidators;
+ private final BiConsumer<KeyExtent,Ample.RejectionHandler> rejectionHandlerConsumer;
private final KeyExtent extent;
@@ -63,12 +63,12 @@ public class ConditionalTabletMutatorImpl extends TabletMutatorBase<Ample.Condit
protected ConditionalTabletMutatorImpl(Ample.ConditionalTabletsMutator parent,
ServerContext context, KeyExtent extent, Consumer<ConditionalMutation> mutationConsumer,
- BiConsumer<KeyExtent,Ample.UnknownValidator> unknownValidators) {
+ BiConsumer<KeyExtent,Ample.RejectionHandler> rejectionHandlerConsumer) {
super(context, new ConditionalMutation(extent.toMetaRow()));
this.mutation = (ConditionalMutation) super.mutation;
this.mutationConsumer = mutationConsumer;
this.parent = parent;
- this.unknownValidators = unknownValidators;
+ this.rejectionHandlerConsumer = rejectionHandlerConsumer;
this.extent = extent;
}
@@ -150,19 +150,11 @@ public class ConditionalTabletMutatorImpl extends TabletMutatorBase<Ample.Condit
}
@Override
- public void submit() {
+ public void submit(Ample.RejectionHandler rejectionCheck) {
Preconditions.checkState(updatesEnabled, "Cannot make updates after calling mutate.");
Preconditions.checkState(sawOperationRequirement, "No operation requirements were seen");
getMutation();
mutationConsumer.accept(mutation);
- }
-
- @Override
- public void submit(Ample.UnknownValidator unknownCheck) {
- Preconditions.checkState(updatesEnabled, "Cannot make updates after calling mutate.");
- Preconditions.checkState(sawOperationRequirement, "No operation requirements were seen");
- getMutation();
- mutationConsumer.accept(mutation);
- unknownValidators.accept(extent, unknownCheck);
+ rejectionHandlerConsumer.accept(extent, rejectionCheck);
}
}
diff --git a/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletsMutatorImpl.java b/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletsMutatorImpl.java
index 940555276f..cfcc1fec49 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletsMutatorImpl.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletsMutatorImpl.java
@@ -18,8 +18,13 @@
*/
package org.apache.accumulo.server.metadata;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -29,12 +34,12 @@ import java.util.stream.Collectors;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.ConditionalWriter;
-import org.apache.accumulo.core.client.ConditionalWriter.Status;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.data.ConditionalMutation;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.metadata.schema.Ample;
import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.accumulo.core.util.Retry;
import org.apache.accumulo.server.ServerContext;
import org.apache.hadoop.io.Text;
import org.slf4j.Logger;
@@ -58,7 +63,7 @@ public class ConditionalTabletsMutatorImpl implements Ample.ConditionalTabletsMu
private boolean active = true;
- Map<KeyExtent,Ample.UnknownValidator> unknownValidators = new HashMap<>();
+ Map<KeyExtent,Ample.RejectionHandler> rejectedHandlers = new HashMap<>();
public ConditionalTabletsMutatorImpl(ServerContext context) {
this.context = context;
@@ -80,7 +85,7 @@ public class ConditionalTabletsMutatorImpl implements Ample.ConditionalTabletsMu
Preconditions.checkState(extents.putIfAbsent(extent.toMetaRow(), extent) == null,
"Duplicate extents not handled");
return new ConditionalTabletMutatorImpl(this, context, extent, mutations::add,
- unknownValidators::put);
+ rejectedHandlers::put);
}
protected ConditionalWriter createConditionalWriter(Ample.DataLevel dataLevel)
@@ -109,7 +114,7 @@ public class ConditionalTabletsMutatorImpl implements Ample.ConditionalTabletsMu
var extents = results.entrySet().stream().filter(e -> {
try {
- return e.getValue().getStatus() != Status.ACCEPTED;
+ return e.getValue().getStatus() != ConditionalWriter.Status.ACCEPTED;
} catch (AccumuloException | AccumuloSecurityException ex) {
throw new RuntimeException(ex);
}
@@ -122,12 +127,83 @@ public class ConditionalTabletsMutatorImpl implements Ample.ConditionalTabletsMu
return readTablets(extents);
}
+ private void partitionResults(Iterator<ConditionalWriter.Result> results,
+ List<ConditionalWriter.Result> resultsList, List<ConditionalWriter.Result> unknownResults) {
+ while (results.hasNext()) {
+ var result = results.next();
+
+ try {
+ if (result.getStatus() == ConditionalWriter.Status.UNKNOWN) {
+ unknownResults.add(result);
+ } else {
+ resultsList.add(result);
+ }
+ } catch (AccumuloException | AccumuloSecurityException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ private Iterator<ConditionalWriter.Result> writeMutations(ConditionalWriter conditionalWriter) {
+ var results = conditionalWriter.write(mutations.iterator());
+
+ List<ConditionalWriter.Result> resultsList = new ArrayList<>();
+ List<ConditionalWriter.Result> unknownResults = new ArrayList<>();
+ partitionResults(results, resultsList, unknownResults);
+
+ Retry retry = null;
+
+ while (!unknownResults.isEmpty()) {
+ try {
+ if (retry == null) {
+ retry = Retry.builder().infiniteRetries().retryAfter(100, MILLISECONDS)
+ .incrementBy(100, MILLISECONDS).maxWait(2, SECONDS).backOffFactor(1.5)
+ .logInterval(3, MINUTES).createRetry();
+ }
+ retry.waitForNextAttempt(log, "handle conditional mutations with unknown status");
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+
+ results = conditionalWriter
+ .write(unknownResults.stream().map(ConditionalWriter.Result::getMutation).iterator());
+
+ // create a new array instead of clearing in case the above has not consumed everything
+ unknownResults = new ArrayList<>();
+
+ partitionResults(results, resultsList, unknownResults);
+ }
+
+ return resultsList.iterator();
+ }
+
+ private Ample.ConditionalResult.Status mapStatus(KeyExtent extent,
+ ConditionalWriter.Result result) {
+
+ ConditionalWriter.Status status = null;
+ try {
+ status = result.getStatus();
+ } catch (AccumuloException | AccumuloSecurityException e) {
+ throw new IllegalStateException(e);
+ }
+
+ switch (status) {
+ case REJECTED:
+ return Ample.ConditionalResult.Status.REJECTED;
+ case ACCEPTED:
+ return Ample.ConditionalResult.Status.ACCEPTED;
+ default:
+ throw new IllegalStateException(
+ "Unexpected conditional mutation status : " + extent + " " + status);
+ }
+ }
+
@Override
public Map<KeyExtent,Ample.ConditionalResult> process() {
Preconditions.checkState(active);
if (dataLevel != null) {
try (ConditionalWriter conditionalWriter = createConditionalWriter(dataLevel)) {
- var results = conditionalWriter.write(mutations.iterator());
+ var results = writeMutations(conditionalWriter);
var resultsMap = new HashMap<KeyExtent,ConditionalWriter.Result>();
@@ -138,22 +214,7 @@ public class ConditionalTabletsMutatorImpl implements Ample.ConditionalTabletsMu
}
var extentsSet = Set.copyOf(extents.values());
- if (!resultsMap.keySet().equals(Set.copyOf(extents.values()))) {
- // ELASTICITY_TODO this check can trigger if someone forgets to submit, could check for
- // that
-
- Sets.difference(resultsMap.keySet(), extentsSet)
- .forEach(extent -> log.error("Unexpected extent seen in in result {}", extent));
-
- Sets.difference(extentsSet, resultsMap.keySet())
- .forEach(extent -> log.error("Expected extent not seen in result {}", extent));
-
- resultsMap.forEach((keyExtent, result) -> {
- log.error("result seen {} {}", keyExtent, new Text(result.getMutation().getRow()));
- });
-
- throw new AssertionError("Not all extents were seen, this is unexpected");
- }
+ ensureAllExtentsSeen(resultsMap, extentsSet);
// only fetch the metadata for failures when requested and when it is requested fetch all
// of the failed extents at once to avoid fetching them one by one.
@@ -161,20 +222,16 @@ public class ConditionalTabletsMutatorImpl implements Ample.ConditionalTabletsMu
return Maps.transformEntries(resultsMap, (extent, result) -> new Ample.ConditionalResult() {
- private Status _getStatus() {
- try {
- return result.getStatus();
- } catch (AccumuloException | AccumuloSecurityException e) {
- throw new RuntimeException(e);
- }
+ private Ample.ConditionalResult.Status _getStatus() {
+ return mapStatus(extent, result);
}
@Override
public Status getStatus() {
var status = _getStatus();
- if (status == Status.UNKNOWN && unknownValidators.containsKey(extent)) {
+ if (status == Status.REJECTED && rejectedHandlers.containsKey(extent)) {
var tabletMetadata = readMetadata();
- if (tabletMetadata != null && unknownValidators.get(extent).test(tabletMetadata)) {
+ if (tabletMetadata != null && rejectedHandlers.get(extent).test(tabletMetadata)) {
return Status.ACCEPTED;
}
}
@@ -211,6 +268,26 @@ public class ConditionalTabletsMutatorImpl implements Ample.ConditionalTabletsMu
}
}
+ private void ensureAllExtentsSeen(HashMap<KeyExtent,ConditionalWriter.Result> resultsMap,
+ Set<KeyExtent> extentsSet) {
+ if (!resultsMap.keySet().equals(Set.copyOf(extents.values()))) {
+ // ELASTICITY_TODO this check can trigger if someone forgets to submit, could check for
+ // that
+
+ Sets.difference(resultsMap.keySet(), extentsSet)
+ .forEach(extent -> log.error("Unexpected extent seen in in result {}", extent));
+
+ Sets.difference(extentsSet, resultsMap.keySet())
+ .forEach(extent -> log.error("Expected extent not seen in result {}", extent));
+
+ resultsMap.forEach((keyExtent, result) -> {
+ log.error("result seen {} {}", keyExtent, new Text(result.getMutation().getRow()));
+ });
+
+ throw new AssertionError("Not all extents were seen, this is unexpected");
+ }
+ }
+
@Override
public void close() {}
}
diff --git a/server/base/src/test/java/org/apache/accumulo/server/metadata/ConditionalTabletsMutatorImplTest.java b/server/base/src/test/java/org/apache/accumulo/server/metadata/ConditionalTabletsMutatorImplTest.java
index 569b6190a4..e4d6bd51ff 100644
--- a/server/base/src/test/java/org/apache/accumulo/server/metadata/ConditionalTabletsMutatorImplTest.java
+++ b/server/base/src/test/java/org/apache/accumulo/server/metadata/ConditionalTabletsMutatorImplTest.java
@@ -28,7 +28,6 @@ import java.util.function.Function;
import java.util.stream.StreamSupport;
import org.apache.accumulo.core.client.ConditionalWriter;
-import org.apache.accumulo.core.client.ConditionalWriter.Status;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.data.ConditionalMutation;
import org.apache.accumulo.core.data.TableId;
@@ -45,9 +44,11 @@ public class ConditionalTabletsMutatorImplTest {
static class TestConditionalTabletsMutator extends ConditionalTabletsMutatorImpl {
private final Map<KeyExtent,TabletMetadata> failedExtents;
- private final Function<Text,Status> statuses;
+ private final List<Function<Text,ConditionalWriter.Status>> statuses;
- public TestConditionalTabletsMutator(Function<Text,Status> statuses,
+ private int attempt = 0;
+
+ public TestConditionalTabletsMutator(List<Function<Text,ConditionalWriter.Status>> statuses,
Map<KeyExtent,TabletMetadata> failedExtents) {
super(null);
this.statuses = statuses;
@@ -64,8 +65,10 @@ public class ConditionalTabletsMutatorImplTest {
@Override
public Iterator<Result> write(Iterator<ConditionalMutation> mutations) {
Iterable<ConditionalMutation> iterable = () -> mutations;
+ var localAttempt = attempt++;
return StreamSupport.stream(iterable.spliterator(), false)
- .map(cm -> new Result(statuses.apply(new Text(cm.getRow())), cm, "server"))
+ .map(cm -> new Result(statuses.get(localAttempt).apply(new Text(cm.getRow())), cm,
+ "server"))
.iterator();
}
@@ -83,9 +86,10 @@ public class ConditionalTabletsMutatorImplTest {
}
@Test
- public void testUnknownValidation() {
+ public void testRejectionHandler() {
- // this test checks the handling of conditional mutations that return a status of unknown
+ // this test checks the handling of conditional mutations that return a status of unknown and
+ // rejected
var ke1 = new KeyExtent(TableId.of("1"), null, null);
@@ -100,41 +104,51 @@ public class ConditionalTabletsMutatorImplTest {
EasyMock.replay(tm2);
var ke3 = new KeyExtent(TableId.of("b"), null, null);
+
+ TabletMetadata tm3 = EasyMock.createMock(TabletMetadata.class);
+ EasyMock.expect(tm3.getDirName()).andReturn("dir4").anyTimes();
+ EasyMock.replay(tm3);
+
var ke4 = new KeyExtent(TableId.of("c"), null, null);
- var failedExtents = Map.of(ke1, tm1, ke2, tm2);
- var statuses = Map.of(ke1.toMetaRow(), Status.UNKNOWN, ke2.toMetaRow(), Status.UNKNOWN,
- ke3.toMetaRow(), Status.REJECTED, ke4.toMetaRow(), Status.ACCEPTED);
+ TabletMetadata tm4 = EasyMock.createMock(TabletMetadata.class);
+ EasyMock.expect(tm4.getDirName()).andReturn("dir5").anyTimes();
+ EasyMock.replay(tm4);
+
+ var failedExtents = Map.of(ke1, tm1, ke2, tm2, ke3, tm3, ke4, tm4);
+
+ // expect retry on unknown
+ var statuses1 = Map.of(ke1.toMetaRow(), ConditionalWriter.Status.UNKNOWN, ke2.toMetaRow(),
+ ConditionalWriter.Status.UNKNOWN, ke3.toMetaRow(), ConditionalWriter.Status.REJECTED,
+ ke4.toMetaRow(), ConditionalWriter.Status.ACCEPTED);
+
+ // on the 2nd retry, return rejected
+ var statuses2 = Map.of(ke1.toMetaRow(), ConditionalWriter.Status.REJECTED, ke2.toMetaRow(),
+ ConditionalWriter.Status.REJECTED);
+
+ try (var mutator =
+ new TestConditionalTabletsMutator(List.of(statuses1::get, statuses2::get), failedExtents)) {
- try (var mutator = new TestConditionalTabletsMutator(statuses::get, failedExtents)) {
- // passed in unknown handler should determine the mutations status should be accepted
mutator.mutateTablet(ke1).requireAbsentOperation().putDirName("dir1")
.submit(tmeta -> tmeta.getDirName().equals("dir1"));
- // passed in unknown handler should determine the mutations status should continue to be
- // UNKNOWN
mutator.mutateTablet(ke2).requireAbsentOperation().putDirName("dir3")
.submit(tmeta -> tmeta.getDirName().equals("dir3"));
- // ensure the unknown handler is only called when the status is unknown, this mutation will
- // have a status of REJECTED
- mutator.mutateTablet(ke3).requireAbsentOperation().putDirName("dir3").submit(tmeta -> {
- throw new IllegalStateException();
- });
+ mutator.mutateTablet(ke3).requireAbsentOperation().putDirName("dir4")
+ .submit(tmeta -> tmeta.getDirName().equals("dir4"));
- // ensure the unknown handler is only called when the status is unknown, this mutations will
- // have a status of ACCEPTED
- mutator.mutateTablet(ke4).requireAbsentOperation().putDirName("dir3").submit(tmeta -> {
+ mutator.mutateTablet(ke4).requireAbsentOperation().putDirName("dir5").submit(tmeta -> {
throw new IllegalStateException();
});
Map<KeyExtent,Ample.ConditionalResult> results = mutator.process();
assertEquals(Set.of(ke1, ke2, ke3, ke4), results.keySet());
- assertEquals(Status.ACCEPTED, results.get(ke1).getStatus());
- assertEquals(Status.UNKNOWN, results.get(ke2).getStatus());
- assertEquals(Status.REJECTED, results.get(ke3).getStatus());
- assertEquals(Status.ACCEPTED, results.get(ke4).getStatus());
+ assertEquals(Ample.ConditionalResult.Status.ACCEPTED, results.get(ke1).getStatus());
+ assertEquals(Ample.ConditionalResult.Status.REJECTED, results.get(ke2).getStatus());
+ assertEquals(Ample.ConditionalResult.Status.ACCEPTED, results.get(ke3).getStatus());
+ assertEquals(Ample.ConditionalResult.Status.ACCEPTED, results.get(ke4).getStatus());
}
}
}
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/CleanUpBulkImport.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/CleanUpBulkImport.java
index 2760abd8bd..139762d557 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/CleanUpBulkImport.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/CleanUpBulkImport.java
@@ -27,13 +27,13 @@ import java.util.Collections;
import java.util.Map;
import org.apache.accumulo.core.Constants;
-import org.apache.accumulo.core.client.ConditionalWriter.Status;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.fate.FateTxId;
import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.core.gc.ReferenceFile;
import org.apache.accumulo.core.manager.thrift.BulkImportState;
import org.apache.accumulo.core.metadata.schema.Ample;
+import org.apache.accumulo.core.metadata.schema.Ample.ConditionalResult.Status;
import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
import org.apache.accumulo.core.util.Retry;
import org.apache.accumulo.manager.Manager;
@@ -111,7 +111,7 @@ public class CleanUpBulkImport extends ManagerRepo {
tabletsMutator.mutateTablet(tablet.getExtent()).requireAbsentOperation();
tablet.getLoaded().entrySet().stream().filter(entry -> entry.getValue() == tid)
.map(Map.Entry::getKey).forEach(tabletMutator::deleteBulkFile);
- tabletMutator.submit();
+ tabletMutator.submit(tm -> false);
}
}
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java
index f739fa1b03..1efd621a0e 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java
@@ -30,7 +30,6 @@ import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
-import org.apache.accumulo.core.client.ConditionalWriter;
import org.apache.accumulo.core.clientImpl.bulk.Bulk;
import org.apache.accumulo.core.clientImpl.bulk.Bulk.Files;
import org.apache.accumulo.core.clientImpl.bulk.BulkSerialize;
@@ -42,6 +41,7 @@ import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.core.manager.thrift.BulkImportState;
import org.apache.accumulo.core.metadata.TabletFile;
import org.apache.accumulo.core.metadata.schema.Ample;
+import org.apache.accumulo.core.metadata.schema.Ample.ConditionalResult.Status;
import org.apache.accumulo.core.metadata.schema.DataFileValue;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
import org.apache.accumulo.core.metadata.schema.TabletMetadata;
@@ -139,7 +139,7 @@ class LoadFiles extends ManagerRepo {
tabletMutator.putFile(f, v);
});
- tabletMutator.submit();
+ tabletMutator.submit(tm -> false);
}
}
}
@@ -147,15 +147,15 @@ class LoadFiles extends ManagerRepo {
long finish() {
var results = conditionalMutator.process();
- boolean allDone = results.values().stream()
- .allMatch(result -> result.getStatus() == ConditionalWriter.Status.ACCEPTED);
+ boolean allDone =
+ results.values().stream().allMatch(result -> result.getStatus() == Status.ACCEPTED);
long sleepTime = 0;
if (!allDone) {
sleepTime = 1000;
results.forEach((extent, condResult) -> {
- if (condResult.getStatus() != ConditionalWriter.Status.ACCEPTED) {
+ if (condResult.getStatus() != Status.ACCEPTED) {
var metadata = condResult.readMetadata();
log.debug("Tablet update failed {} {} {} {} {} {}", FateTxId.formatTid(tid), extent,
condResult.getStatus(), metadata.getOperationId(), metadata.getLocation(),
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/DeleteOperationIds.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/DeleteOperationIds.java
index 181bc71e62..6528b7e21c 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/DeleteOperationIds.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/DeleteOperationIds.java
@@ -20,9 +20,9 @@ package org.apache.accumulo.manager.tableOps.split;
import java.util.stream.Collectors;
-import org.apache.accumulo.core.client.ConditionalWriter;
import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.core.metadata.schema.Ample;
+import org.apache.accumulo.core.metadata.schema.Ample.ConditionalResult.Status;
import org.apache.accumulo.core.metadata.schema.TabletOperationId;
import org.apache.accumulo.core.metadata.schema.TabletOperationType;
import org.apache.accumulo.manager.Manager;
@@ -44,25 +44,22 @@ public class DeleteOperationIds extends ManagerRepo {
try (var tabletsMutator = manager.getContext().getAmple().conditionallyMutateTablets()) {
// As long as the operation is not our operation id, then this step can be considered
- // successful in the case of unknown. If this repo is running for a second time and has
+ // successful in the case of rejection. If this repo is running for a second time and has
// already deleted the operation id, then it could be absent or set by another fate operation.
- Ample.UnknownValidator unknownValidator =
- tabletMetadata -> tabletMetadata.getOperationId() == null
- || !tabletMetadata.getOperationId().equals(opid);
+ Ample.RejectionHandler rejectionHandler =
+ tabletMetadata -> !opid.equals(tabletMetadata.getOperationId());
splitInfo.getTablets().forEach(extent -> {
tabletsMutator.mutateTablet(extent).requireOperation(opid).deleteOperation()
- .submit(unknownValidator);
+ .submit(rejectionHandler);
});
var results = tabletsMutator.process();
- boolean allAccepted = results.values().stream()
- .allMatch(result -> result.getStatus() == ConditionalWriter.Status.ACCEPTED);
+ boolean allAccepted =
+ results.values().stream().allMatch(result -> result.getStatus() == Status.ACCEPTED);
if (!allAccepted) {
- // ELASTICITY_TODO not handling the case where running a 2nd time in the case of failures.
- // Fix this when improving unknown handling.
throw new IllegalStateException(
"Failed to delete operation ids " + splitInfo.getOriginal() + " " + results.values()
.stream().map(Ample.ConditionalResult::getStatus).collect(Collectors.toSet()));
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/PreSplit.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/PreSplit.java
index 9d8b853fba..ae0bfae78a 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/PreSplit.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/PreSplit.java
@@ -25,11 +25,11 @@ import java.util.Objects;
import java.util.Optional;
import java.util.SortedSet;
-import org.apache.accumulo.core.client.ConditionalWriter;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.fate.FateTxId;
import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.core.metadata.schema.Ample;
+import org.apache.accumulo.core.metadata.schema.Ample.ConditionalResult.Status;
import org.apache.accumulo.core.metadata.schema.TabletMetadata;
import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
import org.apache.accumulo.core.metadata.schema.TabletOperationId;
@@ -77,12 +77,11 @@ public class PreSplit extends ManagerRepo {
tabletsMutator.mutateTablet(splitInfo.getOriginal()).requireAbsentOperation()
.requireAbsentLocation().requirePrevEndRow(splitInfo.getOriginal().prevEndRow())
- .putOperation(opid)
- .submit(tmeta -> tmeta.getOperationId() != null && tmeta.getOperationId().equals(opid));
+ .putOperation(opid).submit(tmeta -> opid.equals(tmeta.getOperationId()));
Map<KeyExtent,Ample.ConditionalResult> results = tabletsMutator.process();
- if (results.get(splitInfo.getOriginal()).getStatus() == ConditionalWriter.Status.ACCEPTED) {
+ if (results.get(splitInfo.getOriginal()).getStatus() == Status.ACCEPTED) {
log.trace("{} reserved {} for split", FateTxId.formatTid(tid), splitInfo.getOriginal());
return 0;
} else {
@@ -91,7 +90,7 @@ public class PreSplit extends ManagerRepo {
// its possible the tablet no longer exists
var optMeta = Optional.ofNullable(tabletMetadata);
- log.trace("{} Failed to set operation id. extent:{} location:{} opid:{}",
+ log.debug("{} Failed to set operation id. extent:{} location:{} opid:{}",
FateTxId.formatTid(tid), splitInfo.getOriginal(),
optMeta.map(TabletMetadata::getLocation).orElse(null),
optMeta.map(TabletMetadata::getOperationId).orElse(null));
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/UpdateTablets.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/UpdateTablets.java
index 09db1e11f8..1ed5de53b9 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/UpdateTablets.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/UpdateTablets.java
@@ -26,12 +26,12 @@ import java.util.Set;
import java.util.SortedSet;
import java.util.TreeMap;
-import org.apache.accumulo.core.client.ConditionalWriter;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.fate.FateTxId;
import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.core.metadata.StoredTabletFile;
+import org.apache.accumulo.core.metadata.schema.Ample.ConditionalResult.Status;
import org.apache.accumulo.core.metadata.schema.DataFileValue;
import org.apache.accumulo.core.metadata.schema.TabletMetadata;
import org.apache.accumulo.core.metadata.schema.TabletOperationId;
@@ -169,7 +169,7 @@ public class UpdateTablets extends ManagerRepo {
try (var tabletsMutator = manager.getContext().getAmple().conditionallyMutateTablets()) {
for (var newExtent : newTablets) {
if (newExtent.equals(newTablets.last())) {
- // Skip the last tablet, its done in the next fate step.
+ // Skip the last tablet, it is done after successfully adding all new tablets
continue;
}
@@ -188,28 +188,15 @@ public class UpdateTablets extends ManagerRepo {
newTabletsFiles.get(newExtent).forEach(mutator::putFile);
- mutator.submit(afterMeta -> afterMeta.getOperationId() != null
- && afterMeta.getOperationId().equals(opid));
+ mutator.submit(afterMeta -> opid.equals(afterMeta.getOperationId()));
}
var results = tabletsMutator.process();
results.values().forEach(result -> {
var status = result.getStatus();
- if (status == ConditionalWriter.Status.REJECTED) {
- // lets see if this was rejected because this operation is running again in the case of
- // failure
- var newTabletMetadata = result.readMetadata();
- if (newTabletMetadata != null && opid.equals(newTabletMetadata.getOperationId())) {
- log.trace(
- "{} {} creating new tablet was rejected because it existed, operation probably failed before.",
- FateTxId.formatTid(tid), result.getExtent());
- return;
- }
- }
- Preconditions.checkState(status == ConditionalWriter.Status.ACCEPTED,
- "Failed to add new tablet %s %s %s", status, splitInfo.getOriginal(),
- result.getExtent());
+ Preconditions.checkState(status == Status.ACCEPTED, "Failed to add new tablet %s %s %s",
+ status, splitInfo.getOriginal(), result.getExtent());
});
}
}
@@ -234,24 +221,21 @@ public class UpdateTablets extends ManagerRepo {
}
});
- mutator.submit();
+ mutator.submit(tm -> false);
var result = tabletsMutator.process().get(splitInfo.getOriginal());
- if (result.getStatus() == ConditionalWriter.Status.UNKNOWN) {
- // Can not use Ample's built in code for checking unknown because we are changing the prev
- // end row, so much check it manually
+ if (result.getStatus() == Status.REJECTED) {
+ // Cannot use Ample's built-in code for checking rejections because we are changing the prev
+ // end row and Ample would try to read the old tablet, so it must be checked manually.
var tabletMeta = manager.getContext().getAmple().readTablet(newExtent);
if (tabletMeta == null || !tabletMeta.getOperationId().equals(opid)) {
- // ELASTICITY_TODO need to retry when an UNKNOWN condition is seen, its possible the
- // mutation never made it to the tserver. May want ample to always retry on unknown and
- // change the unknown handler to a rejected handler.
throw new IllegalStateException("Failed to update existing tablet in split "
+ splitInfo.getOriginal() + " " + result.getStatus() + " " + result.getExtent());
}
- } else if (result.getStatus() != ConditionalWriter.Status.ACCEPTED) {
+ } else if (result.getStatus() != Status.ACCEPTED) {
// maybe this step is being run again and the update was already made
throw new IllegalStateException("Failed to update existing tablet in split "
+ splitInfo.getOriginal() + " " + result.getStatus() + " " + result.getExtent());
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java b/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java
index ed99f59686..ee18a6133a 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java
@@ -30,7 +30,6 @@ import java.util.TreeSet;
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
-import org.apache.accumulo.core.client.ConditionalWriter.Status;
import org.apache.accumulo.core.client.admin.NewTableConfiguration;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.dataImpl.KeyExtent;
@@ -38,6 +37,7 @@ import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.RootTable;
import org.apache.accumulo.core.metadata.StoredTabletFile;
import org.apache.accumulo.core.metadata.TServerInstance;
+import org.apache.accumulo.core.metadata.schema.Ample.ConditionalResult.Status;
import org.apache.accumulo.core.metadata.schema.DataFileValue;
import org.apache.accumulo.core.metadata.schema.TabletMetadata.Location;
import org.apache.accumulo.core.metadata.schema.TabletMetadata.LocationType;
@@ -78,7 +78,7 @@ public class AmpleConditionalWriterIT extends AccumuloClusterHarness {
var ctmi = new ConditionalTabletsMutatorImpl(context);
ctmi.mutateTablet(e1).requireAbsentOperation().requireAbsentLocation()
- .putLocation(Location.future(ts1)).submit();
+ .putLocation(Location.future(ts1)).submit(tm -> false);
var results = ctmi.process();
assertEquals(Status.ACCEPTED, results.get(e1).getStatus());
@@ -86,7 +86,7 @@ public class AmpleConditionalWriterIT extends AccumuloClusterHarness {
ctmi = new ConditionalTabletsMutatorImpl(context);
ctmi.mutateTablet(e1).requireAbsentOperation().requireAbsentLocation()
- .putLocation(Location.future(ts2)).submit();
+ .putLocation(Location.future(ts2)).submit(tm -> false);
results = ctmi.process();
assertEquals(Status.REJECTED, results.get(e1).getStatus());
@@ -94,7 +94,8 @@ public class AmpleConditionalWriterIT extends AccumuloClusterHarness {
ctmi = new ConditionalTabletsMutatorImpl(context);
ctmi.mutateTablet(e1).requireAbsentOperation().requireLocation(Location.future(ts1))
- .putLocation(Location.current(ts1)).deleteLocation(Location.future(ts1)).submit();
+ .putLocation(Location.current(ts1)).deleteLocation(Location.future(ts1))
+ .submit(tm -> false);
results = ctmi.process();
assertEquals(Status.ACCEPTED, results.get(e1).getStatus());
@@ -102,7 +103,8 @@ public class AmpleConditionalWriterIT extends AccumuloClusterHarness {
ctmi = new ConditionalTabletsMutatorImpl(context);
ctmi.mutateTablet(e1).requireAbsentOperation().requireLocation(Location.future(ts1))
- .putLocation(Location.current(ts1)).deleteLocation(Location.future(ts1)).submit();
+ .putLocation(Location.current(ts1)).deleteLocation(Location.future(ts1))
+ .submit(tm -> false);
results = ctmi.process();
assertEquals(Status.REJECTED, results.get(e1).getStatus());
@@ -110,7 +112,8 @@ public class AmpleConditionalWriterIT extends AccumuloClusterHarness {
ctmi = new ConditionalTabletsMutatorImpl(context);
ctmi.mutateTablet(e1).requireAbsentOperation().requireLocation(Location.future(ts2))
- .putLocation(Location.current(ts2)).deleteLocation(Location.future(ts2)).submit();
+ .putLocation(Location.current(ts2)).deleteLocation(Location.future(ts2))
+ .submit(tm -> false);
results = ctmi.process();
assertEquals(Status.REJECTED, results.get(e1).getStatus());
@@ -118,7 +121,7 @@ public class AmpleConditionalWriterIT extends AccumuloClusterHarness {
ctmi = new ConditionalTabletsMutatorImpl(context);
ctmi.mutateTablet(e1).requireAbsentOperation().requireLocation(Location.current(ts1))
- .deleteLocation(Location.current(ts1)).submit();
+ .deleteLocation(Location.current(ts1)).submit(tm -> false);
results = ctmi.process();
assertEquals(Status.ACCEPTED, results.get(e1).getStatus());
@@ -162,7 +165,7 @@ public class AmpleConditionalWriterIT extends AccumuloClusterHarness {
// simulate a compaction where the tablet location is not set
var ctmi = new ConditionalTabletsMutatorImpl(context);
ctmi.mutateTablet(e1).requireAbsentOperation().requireFile(stf1).requireFile(stf2)
- .requireFile(stf3).putFile(stf4, new DataFileValue(0, 0)).submit();
+ .requireFile(stf3).putFile(stf4, new DataFileValue(0, 0)).submit(tm -> false);
var results = ctmi.process();
assertEquals(Status.REJECTED, results.get(e1).getStatus());
@@ -172,7 +175,7 @@ public class AmpleConditionalWriterIT extends AccumuloClusterHarness {
for (StoredTabletFile file : List.of(stf1, stf2, stf3)) {
ctmi = new ConditionalTabletsMutatorImpl(context);
ctmi.mutateTablet(e1).requireAbsentOperation().requireLocation(Location.current(ts1))
- .putFile(file, new DataFileValue(0, 0)).submit();
+ .putFile(file, new DataFileValue(0, 0)).submit(tm -> false);
results = ctmi.process();
assertEquals(Status.REJECTED, results.get(e1).getStatus());
}
@@ -182,7 +185,7 @@ public class AmpleConditionalWriterIT extends AccumuloClusterHarness {
// set the location
ctmi = new ConditionalTabletsMutatorImpl(context);
ctmi.mutateTablet(e1).requireAbsentOperation().requireAbsentLocation()
- .putLocation(Location.current(ts1)).submit();
+ .putLocation(Location.current(ts1)).submit(tm -> false);
results = ctmi.process();
assertEquals(Status.ACCEPTED, results.get(e1).getStatus());
@@ -190,7 +193,7 @@ public class AmpleConditionalWriterIT extends AccumuloClusterHarness {
for (StoredTabletFile file : List.of(stf1, stf2, stf3)) {
ctmi = new ConditionalTabletsMutatorImpl(context);
ctmi.mutateTablet(e1).requireAbsentOperation().requireLocation(Location.current(ts2))
- .putFile(file, new DataFileValue(0, 0)).submit();
+ .putFile(file, new DataFileValue(0, 0)).submit(tm -> false);
results = ctmi.process();
assertEquals(Status.REJECTED, results.get(e1).getStatus());
}
@@ -201,7 +204,7 @@ public class AmpleConditionalWriterIT extends AccumuloClusterHarness {
for (StoredTabletFile file : List.of(stf1, stf2, stf3)) {
ctmi = new ConditionalTabletsMutatorImpl(context);
ctmi.mutateTablet(e1).requireAbsentOperation().requireLocation(Location.current(ts1))
- .putFile(file, new DataFileValue(0, 0)).submit();
+ .putFile(file, new DataFileValue(0, 0)).submit(tm -> false);
results = ctmi.process();
assertEquals(Status.ACCEPTED, results.get(e1).getStatus());
}
@@ -212,7 +215,7 @@ public class AmpleConditionalWriterIT extends AccumuloClusterHarness {
ctmi = new ConditionalTabletsMutatorImpl(context);
ctmi.mutateTablet(e1).requireAbsentOperation().requireFile(stf1).requireFile(stf2)
.requireFile(stf3).putFile(stf4, new DataFileValue(0, 0)).deleteFile(stf1)
- .deleteFile(stf2).deleteFile(stf3).submit();
+ .deleteFile(stf2).deleteFile(stf3).submit(tm -> false);
results = ctmi.process();
assertEquals(Status.ACCEPTED, results.get(e1).getStatus());
@@ -227,7 +230,7 @@ public class AmpleConditionalWriterIT extends AccumuloClusterHarness {
ctmi = new ConditionalTabletsMutatorImpl(context);
ctmi.mutateTablet(e1).requireAbsentOperation().requireAbsentBulkFile(stf5)
.putFile(stf5, new DataFileValue(0, 0)).putBulkFile(stf5, 9L)
- .putFile(stf5, new DataFileValue(0, 0)).submit();
+ .putFile(stf5, new DataFileValue(0, 0)).submit(tm -> false);
results = ctmi.process();
assertEquals(Status.ACCEPTED, results.get(e1).getStatus());
@@ -238,7 +241,8 @@ public class AmpleConditionalWriterIT extends AccumuloClusterHarness {
"hdfs://localhost:8020/accumulo/tables/2a/default_tablet/A0000075.rf");
ctmi = new ConditionalTabletsMutatorImpl(context);
ctmi.mutateTablet(e1).requireAbsentOperation().requireFile(stf4).requireFile(stf5)
- .putFile(stf6, new DataFileValue(0, 0)).deleteFile(stf4).deleteFile(stf5).submit();
+ .putFile(stf6, new DataFileValue(0, 0)).deleteFile(stf4).deleteFile(stf5)
+ .submit(tm -> false);
results = ctmi.process();
assertEquals(Status.ACCEPTED, results.get(e1).getStatus());
@@ -248,7 +252,7 @@ public class AmpleConditionalWriterIT extends AccumuloClusterHarness {
ctmi = new ConditionalTabletsMutatorImpl(context);
ctmi.mutateTablet(e1).requireAbsentOperation().requireAbsentBulkFile(stf5)
.putFile(stf5, new DataFileValue(0, 0)).putBulkFile(stf5, 9L)
- .putFile(stf5, new DataFileValue(0, 0)).submit();
+ .putFile(stf5, new DataFileValue(0, 0)).submit(tm -> false);
results = ctmi.process();
assertEquals(Status.REJECTED, results.get(e1).getStatus());
@@ -282,9 +286,9 @@ public class AmpleConditionalWriterIT extends AccumuloClusterHarness {
var ctmi = new ConditionalTabletsMutatorImpl(context);
ctmi.mutateTablet(e1).requireAbsentOperation().requireAbsentLocation()
- .putLocation(Location.future(ts1)).submit();
+ .putLocation(Location.future(ts1)).submit(tm -> false);
ctmi.mutateTablet(e2).requireAbsentOperation().requireAbsentLocation()
- .putLocation(Location.future(ts2)).submit();
+ .putLocation(Location.future(ts2)).submit(tm -> false);
var results = ctmi.process();
assertEquals(Status.ACCEPTED, results.get(e1).getStatus());
@@ -299,13 +303,13 @@ public class AmpleConditionalWriterIT extends AccumuloClusterHarness {
ctmi = new ConditionalTabletsMutatorImpl(context);
ctmi.mutateTablet(e1).requireAbsentOperation().requireAbsentLocation()
- .putLocation(Location.future(ts2)).submit();
+ .putLocation(Location.future(ts2)).submit(tm -> false);
ctmi.mutateTablet(e2).requireAbsentOperation().requireAbsentLocation()
- .putLocation(Location.future(ts1)).submit();
+ .putLocation(Location.future(ts1)).submit(tm -> false);
ctmi.mutateTablet(e3).requireAbsentOperation().requireAbsentLocation()
- .putLocation(Location.future(ts1)).submit();
+ .putLocation(Location.future(ts1)).submit(tm -> false);
ctmi.mutateTablet(e4).requireAbsentOperation().requireAbsentLocation()
- .putLocation(Location.future(ts2)).submit();
+ .putLocation(Location.future(ts2)).submit(tm -> false);
results = ctmi.process();
assertEquals(Status.REJECTED, results.get(e1).getStatus());
@@ -347,9 +351,9 @@ public class AmpleConditionalWriterIT extends AccumuloClusterHarness {
var opid2 = TabletOperationId.from("MERGING:FATE[5678]");
var ctmi = new ConditionalTabletsMutatorImpl(context);
- ctmi.mutateTablet(e1).requireAbsentOperation().putOperation(opid1).submit();
- ctmi.mutateTablet(e2).requireAbsentOperation().putOperation(opid2).submit();
- ctmi.mutateTablet(e3).requireOperation(opid1).deleteOperation().submit();
+ ctmi.mutateTablet(e1).requireAbsentOperation().putOperation(opid1).submit(tm -> false);
+ ctmi.mutateTablet(e2).requireAbsentOperation().putOperation(opid2).submit(tm -> false);
+ ctmi.mutateTablet(e3).requireOperation(opid1).deleteOperation().submit(tm -> false);
var results = ctmi.process();
assertEquals(Status.ACCEPTED, results.get(e1).getStatus());
@@ -361,11 +365,11 @@ public class AmpleConditionalWriterIT extends AccumuloClusterHarness {
assertEquals(TabletOperationType.MERGING,
context.getAmple().readTablet(e2).getOperationId().getType());
assertEquals(opid2, context.getAmple().readTablet(e2).getOperationId());
- assertEquals(null, context.getAmple().readTablet(e3).getOperationId());
+ assertNull(context.getAmple().readTablet(e3).getOperationId());
ctmi = new ConditionalTabletsMutatorImpl(context);
- ctmi.mutateTablet(e1).requireOperation(opid2).deleteOperation().submit();
- ctmi.mutateTablet(e2).requireOperation(opid1).deleteOperation().submit();
+ ctmi.mutateTablet(e1).requireOperation(opid2).deleteOperation().submit(tm -> false);
+ ctmi.mutateTablet(e2).requireOperation(opid1).deleteOperation().submit(tm -> false);
results = ctmi.process();
assertEquals(Status.REJECTED, results.get(e1).getStatus());
@@ -376,14 +380,14 @@ public class AmpleConditionalWriterIT extends AccumuloClusterHarness {
context.getAmple().readTablet(e2).getOperationId().getType());
ctmi = new ConditionalTabletsMutatorImpl(context);
- ctmi.mutateTablet(e1).requireOperation(opid1).deleteOperation().submit();
- ctmi.mutateTablet(e2).requireOperation(opid2).deleteOperation().submit();
+ ctmi.mutateTablet(e1).requireOperation(opid1).deleteOperation().submit(tm -> false);
+ ctmi.mutateTablet(e2).requireOperation(opid2).deleteOperation().submit(tm -> false);
results = ctmi.process();
assertEquals(Status.ACCEPTED, results.get(e1).getStatus());
assertEquals(Status.ACCEPTED, results.get(e2).getStatus());
- assertEquals(null, context.getAmple().readTablet(e1).getOperationId());
- assertEquals(null, context.getAmple().readTablet(e2).getOperationId());
+ assertNull(context.getAmple().readTablet(e1).getOperationId());
+ assertNull(context.getAmple().readTablet(e2).getOperationId());
}
}
@@ -399,21 +403,23 @@ public class AmpleConditionalWriterIT extends AccumuloClusterHarness {
var ctmi = new ConditionalTabletsMutatorImpl(context);
ctmi.mutateTablet(RootTable.EXTENT).requireAbsentOperation().requireAbsentLocation()
- .putCompactionId(7).submit();
+ .putCompactionId(7).submit(tm -> false);
var results = ctmi.process();
assertEquals(Status.REJECTED, results.get(RootTable.EXTENT).getStatus());
assertFalse(context.getAmple().readTablet(RootTable.EXTENT).getCompactId().isPresent());
ctmi = new ConditionalTabletsMutatorImpl(context);
ctmi.mutateTablet(RootTable.EXTENT).requireAbsentOperation()
- .requireLocation(Location.future(loc.getServerInstance())).putCompactionId(7).submit();
+ .requireLocation(Location.future(loc.getServerInstance())).putCompactionId(7)
+ .submit(tm -> false);
results = ctmi.process();
assertEquals(Status.REJECTED, results.get(RootTable.EXTENT).getStatus());
assertFalse(context.getAmple().readTablet(RootTable.EXTENT).getCompactId().isPresent());
ctmi = new ConditionalTabletsMutatorImpl(context);
ctmi.mutateTablet(RootTable.EXTENT).requireAbsentOperation()
- .requireLocation(Location.current(loc.getServerInstance())).putCompactionId(7).submit();
+ .requireLocation(Location.current(loc.getServerInstance())).putCompactionId(7)
+ .submit(tm -> false);
results = ctmi.process();
assertEquals(Status.ACCEPTED, results.get(RootTable.EXTENT).getStatus());
assertEquals(7L, context.getAmple().readTablet(RootTable.EXTENT).getCompactId().getAsLong());