You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by do...@apache.org on 2024/01/30 15:03:10 UTC
(accumulo) branch elasticity updated: Use conditional mutations for AccumuloStore. Add checks for status and putRepo (#4160)
This is an automated email from the ASF dual-hosted git repository.
domgarguilo pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push:
new 11949de209 Use conditional mutations for AccumuloStore. Add checks for status and putRepo (#4160)
11949de209 is described below
commit 11949de209a1eba129a488970c946b9dc7ef1e7d
Author: Dom G <do...@apache.org>
AuthorDate: Tue Jan 30 10:03:05 2024 -0500
Use conditional mutations for AccumuloStore. Add checks for status and putRepo (#4160)
* Use conditional mutations for AccumuloStore
* Add requireStatus method
* Create new Condition instead of re-use
* Add retry to create method and add test
---------
Co-authored-by: Keith Turner <kt...@apache.org>
---
.../accumulo/core/fate/accumulo/AccumuloStore.java | 39 ++++-
.../accumulo/core/fate/accumulo/FateMutator.java | 8 +
.../core/fate/accumulo/FateMutatorImpl.java | 75 +++++++--
.../core/fate/accumulo/StatusMappingIterator.java | 151 ++++++++++++++++++
.../test/fate/accumulo/AccumuloStoreIT.java | 94 +++++++++++
.../test/fate/accumulo/FateMutatorImplIT.java | 171 +++++++++++++++++++++
6 files changed, 523 insertions(+), 15 deletions(-)
diff --git a/core/src/main/java/org/apache/accumulo/core/fate/accumulo/AccumuloStore.java b/core/src/main/java/org/apache/accumulo/core/fate/accumulo/AccumuloStore.java
index b5dc999d42..e4c36fd63a 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/accumulo/AccumuloStore.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/accumulo/AccumuloStore.java
@@ -44,12 +44,17 @@ import org.apache.accumulo.core.metadata.AccumuloTable;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.util.ColumnFQ;
import org.apache.accumulo.core.util.FastFormat;
+import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.hadoop.io.Text;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
public class AccumuloStore<T> extends AbstractFateStore<T> {
+ private static final Logger log = LoggerFactory.getLogger(AccumuloStore.class);
+
private final ClientContext context;
private final String tableName;
@@ -73,11 +78,35 @@ public class AccumuloStore<T> extends AbstractFateStore<T> {
@Override
public long create() {
- long tid = RANDOM.get().nextLong() & 0x7fffffffffffffffL;
+ final int maxAttempts = 5;
+ long tid = 0L;
- newMutator(tid).putStatus(TStatus.NEW).putCreateTime(System.currentTimeMillis()).mutate();
+ for (int attempt = 0; attempt < maxAttempts; attempt++) {
+ if (attempt >= 1) {
+ log.debug("Failed to create new id: {}, trying again", tid);
+ UtilWaitThread.sleep(100);
+ }
+ tid = getTid();
+
+ var status = newMutator(tid).requireStatus().putStatus(TStatus.NEW)
+ .putCreateTime(System.currentTimeMillis()).tryMutate();
+
+ switch (status) {
+ case ACCEPTED:
+ return tid;
+ case UNKNOWN:
+ case REJECTED:
+ continue;
+ default:
+ throw new IllegalStateException("Unknown status " + status);
+ }
+ }
- return tid;
+ throw new IllegalStateException("Failed to create new id after " + maxAttempts + " attempts");
+ }
+
+  /**
+   * Produces a candidate transaction id by taking the next long from {@code RANDOM} and clearing
+   * its sign bit, so the returned value is never negative.
+   *
+   * @return a non-negative candidate transaction id
+   */
+  public long getTid() {
+    final long randomBits = RANDOM.get().nextLong();
+    return randomBits & Long.MAX_VALUE;
   }
@Override
@@ -249,11 +278,9 @@ public class AccumuloStore<T> extends AbstractFateStore<T> {
public void setTransactionInfo(TxInfo txInfo, Serializable so) {
verifyReserved(true);
- FateMutator<T> fateMutator = newMutator(tid);
final byte[] serialized = serializeTxInfo(so);
- fateMutator.putTxInfo(txInfo, serialized);
- fateMutator.mutate();
+ newMutator(tid).putTxInfo(txInfo, serialized).mutate();
}
@Override
diff --git a/core/src/main/java/org/apache/accumulo/core/fate/accumulo/FateMutator.java b/core/src/main/java/org/apache/accumulo/core/fate/accumulo/FateMutator.java
index 4caf5985bd..22497006db 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/accumulo/FateMutator.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/accumulo/FateMutator.java
@@ -46,4 +46,12 @@ public interface FateMutator<T> {
void mutate();
+ // The outcomes that a caller of tryMutate() has to handle. This is the subset of statuses from
+ // ConditionalWriter.Status that are expected to occur and therefore need to be handled.
+ enum Status {
+ ACCEPTED, REJECTED, UNKNOWN
+ }
+
+ Status tryMutate();
+
}
diff --git a/core/src/main/java/org/apache/accumulo/core/fate/accumulo/FateMutatorImpl.java b/core/src/main/java/org/apache/accumulo/core/fate/accumulo/FateMutatorImpl.java
index 90d22008d5..7056438d21 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/accumulo/FateMutatorImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/accumulo/FateMutatorImpl.java
@@ -24,11 +24,16 @@ import static org.apache.accumulo.core.fate.accumulo.AccumuloStore.invertRepo;
import java.util.Objects;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.ConditionalWriter;
+import org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.clientImpl.ClientContext;
-import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Condition;
+import org.apache.accumulo.core.data.ConditionalMutation;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.fate.Fate.TxInfo;
import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus;
@@ -45,13 +50,13 @@ public class FateMutatorImpl<T> implements FateMutator<T> {
private final ClientContext context;
private final String tableName;
private final long tid;
- private final Mutation mutation;
+ private final ConditionalMutation mutation;
- FateMutatorImpl(ClientContext context, String tableName, long tid) {
+ public FateMutatorImpl(ClientContext context, String tableName, long tid) {
this.context = Objects.requireNonNull(context);
this.tableName = Objects.requireNonNull(tableName);
this.tid = tid;
- this.mutation = new Mutation(new Text("tx_" + FastFormat.toHexString(tid)));
+ this.mutation = new ConditionalMutation(new Text("tx_" + FastFormat.toHexString(tid)));
}
@Override
@@ -122,7 +127,10 @@ public class FateMutatorImpl<T> implements FateMutator<T> {
@Override
public FateMutator<T> putRepo(int position, Repo<T> repo) {
- mutation.put(RepoColumnFamily.NAME, invertRepo(position), new Value(serialize(repo)));
+ final Text cq = invertRepo(position);
+ // ensure this repo is not already set
+ mutation.addCondition(new Condition(RepoColumnFamily.NAME, cq));
+ mutation.put(RepoColumnFamily.NAME, cq, new Value(serialize(repo)));
return this;
}
@@ -143,12 +151,61 @@ public class FateMutatorImpl<T> implements FateMutator<T> {
return this;
}
+  /**
+   * Adds a condition on the transaction status column to this mutation. When one or more statuses
+   * are given, the current status must be one of them. When none are given, the status column
+   * must be absent.
+   *
+   * @param statuses The statuses to check against; may be empty.
+   * @return this mutator, so that calls can be chained
+   */
+  public FateMutator<T> requireStatus(TStatus... statuses) {
+    mutation.addCondition(StatusMappingIterator.createCondition(statuses));
+    return this;
+  }
+ }
+
@Override
public void mutate() {
- try (BatchWriter writer = context.createBatchWriter(tableName)) {
- writer.addMutation(mutation);
- } catch (Exception e) {
- throw new IllegalStateException(e);
+ var status = tryMutate();
+ if (status != Status.ACCEPTED) {
+ throw new IllegalStateException("Failed to write mutation " + status + " " + mutation);
+ }
+ }
+
+  /**
+   * Writes the accumulated mutation and reports the outcome. A mutation without conditions is
+   * written with a batch writer and reported as {@code ACCEPTED}. A mutation with conditions is
+   * written with a conditional writer and its result status is translated to a {@link Status}.
+   *
+   * @return the outcome of the write
+   * @throws IllegalStateException if the conditional writer reports a status other than
+   *         ACCEPTED, REJECTED or UNKNOWN
+   * @throws RuntimeException wrapping any checked Accumulo exception raised while writing
+   */
+  @Override
+  public Status tryMutate() {
+    final boolean unconditional = mutation.getConditions().isEmpty();
+    try {
+      if (unconditional) {
+        // if there are no conditions attached, then we can use a batch writer
+        try (BatchWriter batchWriter = context.createBatchWriter(tableName)) {
+          batchWriter.addMutation(mutation);
+        } catch (MutationsRejectedException rejected) {
+          throw new RuntimeException(rejected);
+        }
+        return Status.ACCEPTED;
+      }
+
+      try (ConditionalWriter conditionalWriter = context.createConditionalWriter(tableName)) {
+        final ConditionalWriter.Status outcome = conditionalWriter.write(mutation).getStatus();
+        switch (outcome) {
+          case ACCEPTED:
+            return Status.ACCEPTED;
+          case REJECTED:
+            return Status.REJECTED;
+          case UNKNOWN:
+            return Status.UNKNOWN;
+          default:
+            // do not expect other statuses
+            throw new IllegalStateException("Unhandled status for mutation " + outcome);
+        }
+      } catch (AccumuloException | AccumuloSecurityException failure) {
+        throw new RuntimeException(failure);
+      }
+    } catch (TableNotFoundException missingTable) {
+      throw new RuntimeException(missingTable);
     }
   }
}
diff --git a/core/src/main/java/org/apache/accumulo/core/fate/accumulo/StatusMappingIterator.java b/core/src/main/java/org/apache/accumulo/core/fate/accumulo/StatusMappingIterator.java
new file mode 100644
index 0000000000..d7dc4fa22c
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/fate/accumulo/StatusMappingIterator.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.fate.accumulo;
+
+import static org.apache.accumulo.core.fate.accumulo.schema.FateSchema.TxColumnFamily.STATUS_COLUMN;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Condition;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.fate.ReadOnlyFateStore;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+
+/**
+ * A specialized iterator that maps the value of the status column to "present" or "absent". This
+ * iterator allows for checking of the status column's value against a set of acceptable statuses
+ * within a conditional mutation.
+ */
+public class StatusMappingIterator implements SortedKeyValueIterator<Key,Value> {
+
+ private static final String PRESENT = "present";
+ private static final String ABSENT = "absent";
+ private static final String STATUS_SET_KEY = "statusSet";
+
+ private SortedKeyValueIterator<Key,Value> source;
+ private final Set<String> acceptableStatuses = new HashSet<>();
+ private Value mappedValue;
+
+ /**
+ * The set of acceptable must be provided as an option to the iterator using the
+ * {@link #STATUS_SET_KEY} key.
+ */
+ @Override
+ public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options,
+ IteratorEnvironment env) throws IOException {
+ this.source = source;
+ if (options.containsKey(STATUS_SET_KEY)) {
+ String[] statuses = decodeStatuses(options.get(STATUS_SET_KEY));
+ acceptableStatuses.addAll(Arrays.asList(statuses));
+ }
+ }
+
+ @Override
+ public boolean hasTop() {
+ return source.hasTop();
+ }
+
+ @Override
+ public void next() throws IOException {
+ source.next();
+ mapValue();
+ }
+
+ @Override
+ public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive)
+ throws IOException {
+ source.seek(range, columnFamilies, inclusive);
+ mapValue();
+ }
+
+ /**
+ * Maps the value of the status column to "present" or "absent" based on its presence within the
+ * set of statuses.
+ */
+ private void mapValue() {
+ if (source.hasTop()) {
+ String currentValue = source.getTopValue().toString();
+ mappedValue =
+ acceptableStatuses.contains(currentValue) ? new Value(PRESENT) : new Value(ABSENT);
+ } else {
+ mappedValue = null;
+ }
+ }
+
+ @Override
+ public Key getTopKey() {
+ return source.getTopKey();
+ }
+
+ @Override
+ public Value getTopValue() {
+ return Objects.requireNonNull(mappedValue);
+ }
+
+ @Override
+ public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Creates a condition that checks if the status column's value is one of the given acceptable
+ * statuses.
+ *
+ * @param statuses The acceptable statuses.
+ * @return A condition configured to use this iterator.
+ */
+ public static Condition createCondition(ReadOnlyFateStore.TStatus... statuses) {
+ Condition condition =
+ new Condition(STATUS_COLUMN.getColumnFamily(), STATUS_COLUMN.getColumnQualifier());
+
+ if (statuses.length == 0) {
+ // If no statuses are provided, require the status column to be absent. Return the condition
+ // with no value set so that the mutation will be rejected if the status column is present.
+ return condition;
+ } else {
+ IteratorSetting is = new IteratorSetting(100, StatusMappingIterator.class);
+ is.addOption(STATUS_SET_KEY, encodeStatuses(statuses));
+
+ // If the value of the status column is in the set, it will be mapped to "present", so set the
+ // value of the condition to "present".
+ return condition.setValue(PRESENT).setIterators(is);
+ }
+ }
+
+ private static String encodeStatuses(ReadOnlyFateStore.TStatus[] statuses) {
+ return Arrays.stream(statuses).map(Enum::name).collect(Collectors.joining(","));
+ }
+
+ private static String[] decodeStatuses(String statuses) {
+ return statuses.split(",");
+ }
+
+}
diff --git a/test/src/main/java/org/apache/accumulo/test/fate/accumulo/AccumuloStoreIT.java b/test/src/main/java/org/apache/accumulo/test/fate/accumulo/AccumuloStoreIT.java
new file mode 100644
index 0000000000..88c2ac4884
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/fate/accumulo/AccumuloStoreIT.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.fate.accumulo;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.clientImpl.ClientContext;
+import org.apache.accumulo.core.fate.accumulo.AccumuloStore;
+import org.apache.accumulo.harness.SharedMiniClusterBase;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Integration test for the retry behavior of {@link AccumuloStore#create()} when the generated
+ * transaction id collides with one that already exists in the table.
+ */
+public class AccumuloStoreIT extends SharedMiniClusterBase {
+
+  // The logger is bound to this test class so its output is attributed to the test, not the store.
+  private static final Logger log = LoggerFactory.getLogger(AccumuloStoreIT.class);
+
+  @BeforeAll
+  public static void setup() throws Exception {
+    SharedMiniClusterBase.startMiniCluster();
+  }
+
+  @AfterAll
+  public static void teardown() {
+    SharedMiniClusterBase.stopMiniCluster();
+  }
+
+  /**
+   * A store whose transaction ids come from a fixed list instead of the random generator, which
+   * makes id collisions reproducible.
+   */
+  private static class TestAccumuloStore extends AccumuloStore<String> {
+    private final Iterator<Long> tidIterator;
+
+    // use the list of txids to simulate collisions on txids
+    public TestAccumuloStore(ClientContext context, String tableName, List<Long> txids) {
+      super(context, tableName);
+      this.tidIterator = txids.iterator();
+    }
+
+    /**
+     * Returns the next id from the supplied list, or -1 once the list is exhausted.
+     */
+    @Override
+    public long getTid() {
+      if (tidIterator.hasNext()) {
+        return tidIterator.next();
+      } else {
+        return -1L;
+      }
+    }
+  }
+
+  /**
+   * Feeds the store a list of ids that contains duplicates. Each call to create() is expected to
+   * skip the ids that are already taken and return the next unused one. The final call only has
+   * duplicates of an existing id left, so it is expected to give up and throw.
+   */
+  @Test
+  public void testCreateWithCollisionAndExceedRetryLimit() throws Exception {
+    String table = getUniqueNames(1)[0];
+    try (ClientContext client =
+        (ClientContext) Accumulo.newClient().from(getClientProps()).build()) {
+      client.tableOperations().create(table);
+
+      List<Long> txids = List.of(1L, 1L, 1L, 2L, 3L, 3L, 3L, 3L, 4L, 4L, 5L, 5L, 5L, 5L, 5L, 5L);
+      Set<Long> expectedTids = new TreeSet<>(txids);
+      TestAccumuloStore store = new TestAccumuloStore(client, table, txids);
+
+      // call create and expect we get the unique txids
+      for (Long expectedTid : expectedTids) {
+        long tid = store.create();
+        log.info("Created tid: {}", tid);
+        assertEquals(expectedTid, tid, "Expected " + expectedTid + " but got " + tid);
+      }
+
+      // Calling create again on 5L should throw an exception since we've exceeded the max retries
+      assertThrows(IllegalStateException.class, store::create);
+    }
+  }
+}
diff --git a/test/src/main/java/org/apache/accumulo/test/fate/accumulo/FateMutatorImplIT.java b/test/src/main/java/org/apache/accumulo/test/fate/accumulo/FateMutatorImplIT.java
new file mode 100644
index 0000000000..27e6dd650b
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/fate/accumulo/FateMutatorImplIT.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.fate.accumulo;
+
+import static org.apache.accumulo.core.fate.accumulo.FateMutator.Status.ACCEPTED;
+import static org.apache.accumulo.core.fate.accumulo.FateMutator.Status.REJECTED;
+import static org.apache.accumulo.core.util.LazySingletons.RANDOM;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import java.time.Duration;
+
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.admin.NewTableConfiguration;
+import org.apache.accumulo.core.client.admin.TabletHostingGoal;
+import org.apache.accumulo.core.clientImpl.ClientContext;
+import org.apache.accumulo.core.fate.ReadOnlyFateStore;
+import org.apache.accumulo.core.fate.accumulo.FateMutatorImpl;
+import org.apache.accumulo.harness.SharedMiniClusterBase;
+import org.apache.accumulo.test.fate.FateIT;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Integration test for the conditional behavior of {@link FateMutatorImpl}: putRepo must refuse
+ * to overwrite an existing repo, and requireStatus must only let a mutation through when the
+ * status column matches what was required.
+ */
+public class FateMutatorImplIT extends SharedMiniClusterBase {
+
+  private static final Logger log = LoggerFactory.getLogger(FateMutatorImplIT.class);
+
+  // configuration used for every table created by these tests
+  final NewTableConfiguration ntc =
+      new NewTableConfiguration().withInitialHostingGoal(TabletHostingGoal.ALWAYS);
+
+  @BeforeAll
+  public static void setup() throws Exception {
+    SharedMiniClusterBase.startMiniCluster();
+  }
+
+  @AfterAll
+  public static void tearDown() {
+    SharedMiniClusterBase.stopMiniCluster();
+  }
+
+  @Override
+  protected Duration defaultTimeout() {
+    return Duration.ofMinutes(5);
+  }
+
+  /**
+   * Writes repos at three positions and then verifies that writing to an already used position
+   * fails.
+   */
+  @Test
+  public void putRepo() throws Exception {
+    final String table = getUniqueNames(1)[0];
+    try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
+      client.tableOperations().create(table, ntc);
+
+      ClientContext context = (ClientContext) client;
+
+      final long tid = RANDOM.get().nextLong() & 0x7fffffffffffffffL;
+
+      // add some repos in order
+      FateMutatorImpl<FateIT.TestEnv> fateMutator = new FateMutatorImpl<>(context, table, tid);
+      fateMutator.putRepo(100, new FateIT.TestRepo("test")).mutate();
+      FateMutatorImpl<FateIT.TestEnv> fateMutator1 = new FateMutatorImpl<>(context, table, tid);
+      fateMutator1.putRepo(99, new FateIT.TestRepo("test")).mutate();
+      FateMutatorImpl<FateIT.TestEnv> fateMutator2 = new FateMutatorImpl<>(context, table, tid);
+      fateMutator2.putRepo(98, new FateIT.TestRepo("test")).mutate();
+
+      // make sure we cant add a repo that has already been added
+      FateMutatorImpl<FateIT.TestEnv> fateMutator3 = new FateMutatorImpl<>(context, table, tid);
+      assertThrows(IllegalStateException.class,
+          () -> fateMutator3.putRepo(98, new FateIT.TestRepo("test")).mutate(),
+          "Repo in position 98 already exists. Expected to not be able to add it again.");
+      FateMutatorImpl<FateIT.TestEnv> fateMutator4 = new FateMutatorImpl<>(context, table, tid);
+      assertThrows(IllegalStateException.class,
+          () -> fateMutator4.putRepo(99, new FateIT.TestRepo("test")).mutate(),
+          "Repo in position 99 already exists. Expected to not be able to add it again.");
+    }
+  }
+
+  /**
+   * Walks a transaction through several status changes and checks, at each step, that
+   * requireStatus accepts the mutation only when the current status satisfies the requirement.
+   */
+  @Test
+  public void requireStatus() throws Exception {
+    final String table = getUniqueNames(1)[0];
+    try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
+      client.tableOperations().create(table, ntc);
+
+      ClientContext context = (ClientContext) client;
+
+      final long tid = RANDOM.get().nextLong() & 0x7fffffffffffffffL;
+
+      // use require status passing all statuses. without the status column present this should fail
+      assertThrows(IllegalStateException.class,
+          () -> new FateMutatorImpl<>(context, table, tid)
+              .requireStatus(ReadOnlyFateStore.TStatus.values())
+              .putStatus(ReadOnlyFateStore.TStatus.NEW).mutate());
+      assertEquals(0, countEntries(client, table));
+      var status = new FateMutatorImpl<>(context, table, tid)
+          .requireStatus(ReadOnlyFateStore.TStatus.values())
+          .putStatus(ReadOnlyFateStore.TStatus.NEW).tryMutate();
+      assertEquals(REJECTED, status);
+      assertEquals(0, countEntries(client, table));
+
+      // use require status without passing any statuses to require that the status column is absent
+      status = new FateMutatorImpl<>(context, table, tid).requireStatus()
+          .putStatus(ReadOnlyFateStore.TStatus.NEW).tryMutate();
+      assertEquals(ACCEPTED, status);
+
+      // try again with requiring an absent status column. this time it should fail because we just
+      // put status NEW
+      assertThrows(IllegalStateException.class,
+          () -> new FateMutatorImpl<>(context, table, tid).requireStatus()
+              .putStatus(ReadOnlyFateStore.TStatus.NEW).mutate(),
+          "Expected to not be able to use requireStatus() without passing any statuses");
+      status = new FateMutatorImpl<>(context, table, tid).requireStatus()
+          .putStatus(ReadOnlyFateStore.TStatus.NEW).tryMutate();
+      assertEquals(REJECTED, status,
+          "Expected to not be able to use requireStatus() without passing any statuses");
+
+      // now use require same with the current status, NEW passed in
+      status =
+          new FateMutatorImpl<>(context, table, tid).requireStatus(ReadOnlyFateStore.TStatus.NEW)
+              .putStatus(ReadOnlyFateStore.TStatus.SUBMITTED).tryMutate();
+      assertEquals(ACCEPTED, status);
+
+      // use require same with an array of statuses, none of which are the current status
+      // (SUBMITTED)
+      assertThrows(IllegalStateException.class,
+          () -> new FateMutatorImpl<>(context, table, tid)
+              .requireStatus(ReadOnlyFateStore.TStatus.NEW, ReadOnlyFateStore.TStatus.UNKNOWN)
+              .putStatus(ReadOnlyFateStore.TStatus.SUBMITTED).mutate(),
+          "Expected to not be able to use requireStatus() with statuses that do not match the current status");
+      status = new FateMutatorImpl<>(context, table, tid)
+          .requireStatus(ReadOnlyFateStore.TStatus.NEW, ReadOnlyFateStore.TStatus.UNKNOWN)
+          .putStatus(ReadOnlyFateStore.TStatus.SUBMITTED).tryMutate();
+      assertEquals(REJECTED, status,
+          "Expected to not be able to use requireStatus() with statuses that do not match the current status");
+
+      // use require same with an array of statuses, one of which is the current status (SUBMITTED)
+      status = new FateMutatorImpl<>(context, table, tid)
+          .requireStatus(ReadOnlyFateStore.TStatus.UNKNOWN, ReadOnlyFateStore.TStatus.SUBMITTED)
+          .putStatus(ReadOnlyFateStore.TStatus.IN_PROGRESS).tryMutate();
+      assertEquals(ACCEPTED, status);
+
+      // one more time check that we can use require same with the current status (IN_PROGRESS)
+      status = new FateMutatorImpl<>(context, table, tid)
+          .requireStatus(ReadOnlyFateStore.TStatus.IN_PROGRESS)
+          .putStatus(ReadOnlyFateStore.TStatus.FAILED_IN_PROGRESS).tryMutate();
+      assertEquals(ACCEPTED, status);
+
+    }
+
+  }
+
+  /**
+   * Counts the entries in the given table. The scanner is closed before returning so that the
+   * test does not leak scanner resources.
+   */
+  private static long countEntries(AccumuloClient client, String tableName) throws Exception {
+    try (var scanner = client.createScanner(tableName)) {
+      return scanner.stream().count();
+    }
+  }
+
+  /**
+   * Debugging aid that logs every key and value in the given table. The scanner is closed once
+   * all entries have been logged.
+   */
+  void logAllEntriesInTable(String tableName, AccumuloClient client) throws Exception {
+    try (var scanner = client.createScanner(tableName)) {
+      scanner.forEach(e -> log.info("{} {}", e.getKey(), e.getValue()));
+    }
+  }
+}