You are viewing a plain text version of this content. The canonical link for it is here.
Posted to pr@cassandra.apache.org by "belliottsmith (via GitHub)" <gi...@apache.org> on 2023/03/01 16:04:43 UTC

[GitHub] [cassandra-accord] belliottsmith commented on a diff in pull request #33: Immutable state

belliottsmith commented on code in PR #33:
URL: https://github.com/apache/cassandra-accord/pull/33#discussion_r1121703739


##########
accord-core/src/main/java/accord/utils/ThreadPoolScheduler.java:
##########
@@ -96,7 +95,7 @@ public void stop()
         }
         catch (InterruptedException e)
         {
-            throw new UncheckedInterruptedException(e);
+            throw new RuntimeException(e);

Review Comment:
   Still need to figure out a solution to this, as it will interplay with simulation in C*



##########
accord-core/src/main/java/accord/primitives/Txn.java:
##########
@@ -150,7 +149,13 @@ && read().equals(txn.read())
         @Override
         public int hashCode()
         {
-            throw new UnsupportedOperationException();

Review Comment:
   Where do we need this implementation?



##########
accord-core/src/main/java/accord/utils/DeterministicIdentitySet.java:
##########
@@ -117,6 +121,12 @@ public boolean add(T item)
         entry.next.prev = entry;
         return true;
     }
+    // we add to the front, and iterate in reverse order, so that we can add and remove while iterating without modifying the set we iterate over
+    @Override
+    public boolean add(T item)

Review Comment:
   defunct to split these?



##########
accord-core/src/main/java/accord/coordinate/Coordinate.java:
##########
@@ -32,21 +32,21 @@
 import accord.messages.PreAccept;
 import accord.messages.PreAccept.PreAcceptOk;
 import accord.messages.PreAccept.PreAcceptReply;
-import com.google.common.base.Preconditions;
-
-import org.apache.cassandra.utils.concurrent.AsyncFuture;
-import org.apache.cassandra.utils.concurrent.Future;
+import accord.utils.Invariants;
 
 import static accord.coordinate.Propose.Invalidate.proposeInvalidate;
 import static accord.messages.Commit.Invalidate.commitInvalidate;
 
+import accord.utils.async.AsyncResult;
+import accord.utils.async.AsyncResults;
+
 /**
  * Perform initial rounds of PreAccept and Accept until we have reached agreement about when we should execute.
  * If we are preempted by a recovery coordinator, we abort and let them complete (and notify us about the execution result)
  *
  * TODO (desired, testing): dedicated burn test to validate outcomes
  */
-public class Coordinate extends AsyncFuture<Result> implements Callback<PreAcceptReply>, BiConsumer<Result, Throwable>
+public class Coordinate extends AsyncResults.Settable<Result> implements Callback<PreAcceptReply>, BiConsumer<Result, Throwable>

Review Comment:
   nit: Maybe call it a `SettableResult` so we can import the inner class and make the use sites a bit cleaner?



##########
accord-core/src/main/java/accord/messages/Apply.java:
##########
@@ -120,7 +123,7 @@ public Iterable<TxnId> txnIds()
     @Override
     public Seekables<?, ?> keys()
     {
-        return Keys.EMPTY;
+        return keys;

Review Comment:
   Do we need `keys` here? I don't think we need them loaded until we perform the _apply_, and we should have access to the local txn `keys` when scheduling that?



##########
accord-core/src/main/java/accord/local/ContextValue.java:
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.local;
+
+public class ContextValue<Value>

Review Comment:
   delete?



##########
accord-core/src/main/java/accord/messages/ReadData.java:
##########
@@ -130,18 +134,19 @@ public synchronized void onChange(SafeCommandStore safeStore, Command command)
             case ReadyToExecute:
         }
 
-        command.removeListener(this);
+        command = liveCommand.removeListener(this);
+
         if (!isObsolete)
-            read(safeStore, command);
+            read(safeStore, command.asCommitted());
     }
 
     @Override
     public synchronized ReadNack apply(SafeCommandStore safeStore)
     {
-        Command command = safeStore.command(txnId);
-        Status status = command.status();
+        SafeCommand liveCommand = safeStore.command(txnId);

Review Comment:
   liveCommand -> safeCommand?



##########
accord-core/src/main/java/accord/local/CommonAttributes.java:
##########
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.local;
+
+import accord.api.RoutingKey;
+import accord.api.VisibleForImplementation;
+import accord.primitives.PartialDeps;
+import accord.primitives.PartialTxn;
+import accord.primitives.Route;
+import accord.primitives.TxnId;
+import accord.utils.Invariants;
+import com.google.common.collect.ImmutableSet;
+
+import java.util.Set;
+
+import static accord.utils.Utils.ensureImmutable;
+import static accord.utils.Utils.ensureMutable;
+
+// TODO: remove or cleanup

Review Comment:
   perhaps make this a longer term `TODO (desired)` or something? I think it's clean enough for now, only being used sparingly.



##########
accord-core/src/main/java/accord/messages/ReadData.java:
##########
@@ -61,6 +61,7 @@ public static ReadData create(TxnId txnId, Seekables<?, ?> scope, long executeAt
     private transient boolean isObsolete; // TODO (low priority, semantics): respond with the Executed result we have stored?
     private transient BitSet waitingOn;
     private transient int waitingOnCount;
+    private transient BitSet submitted;

Review Comment:
   is this for debug purposes?



##########
accord-core/src/main/java/accord/messages/PreAccept.java:
##########
@@ -100,12 +100,12 @@ public PreAcceptReply apply(SafeCommandStore safeStore)
                     calculatePartialDeps(safeStore, txnId, partialTxn.keys(), txnId, safeStore.ranges().between(minUnsyncedEpoch, txnId.epoch())));
         }
 
-        Command command = safeStore.command(txnId);
-        switch (command.preaccept(safeStore, partialTxn, route != null ? route : scope, progressKey))

Review Comment:
   Not a strongly held suggestion, just wondering: might it be cleaner in some cases to pass the `safeCommand` in here so we have it for use after? Or even perhaps to invoke the preaccept method on `safeCommand` itself?



##########
accord-core/src/main/java/accord/local/Command.java:
##########
@@ -18,1178 +18,1435 @@
 
 package accord.local;
 
-import accord.api.*;
-import accord.local.Status.Durability;
-import accord.local.Status.Known;
+import accord.api.Data;
+import accord.api.Result;
+import accord.api.RoutingKey;
+import accord.impl.CommandsForKey;
+import accord.impl.CommandsForKeys;
 import accord.primitives.*;
-import accord.primitives.Writes;
 import accord.utils.Invariants;
-import org.apache.cassandra.utils.concurrent.Future;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.function.BiConsumer;
-import java.util.function.Consumer;
-import java.util.function.Function;
-
-import static accord.local.Status.*;
-import static accord.local.Status.Known.*;
-import static accord.local.Status.Known.Done;
-import static accord.local.Status.Known.ExecuteAtOnly;
-import static accord.primitives.Route.isFullRoute;
-import static accord.utils.Utils.listOf;
-
-import javax.annotation.Nonnull;
+import accord.utils.Utils;
+import accord.utils.async.AsyncChain;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.ImmutableSortedMap;
+import com.google.common.collect.ImmutableSortedSet;
+
 import javax.annotation.Nullable;
+import java.util.*;
 
-import accord.api.ProgressLog.ProgressShard;
-import accord.primitives.Ranges;
-import accord.primitives.Ballot;
-import accord.primitives.PartialDeps;
-import accord.primitives.PartialTxn;
-import accord.primitives.Route;
-import accord.primitives.Timestamp;
-import accord.primitives.TxnId;
-import accord.api.Result;
-import accord.api.RoutingKey;
+import static accord.local.Status.Durability.Local;
+import static accord.local.Status.Durability.NotDurable;
+import static accord.local.Status.Known.DefinitionOnly;
+import static accord.utils.Utils.*;
+import static java.lang.String.format;
 
-import static accord.api.ProgressLog.ProgressShard.Home;
-import static accord.api.ProgressLog.ProgressShard.Local;
-import static accord.api.ProgressLog.ProgressShard.No;
-import static accord.api.ProgressLog.ProgressShard.Unsure;
-import static accord.local.Command.EnsureAction.Add;
-import static accord.local.Command.EnsureAction.Check;
-import static accord.local.Command.EnsureAction.Ignore;
-import static accord.local.Command.EnsureAction.Set;
-import static accord.local.Command.EnsureAction.TrySet;
-
-public abstract class Command implements CommandListener, BiConsumer<SafeCommandStore, CommandListener>, PreLoadContext
+public abstract class Command extends ImmutableState
 {
-    private static final Logger logger = LoggerFactory.getLogger(Command.class);
+    // sentinel value to indicate a command requested in a preexecute context was not found
+    // should not escape the safe command store
+    public static final Command EMPTY = new Command()
+    {
+        @Override public Route<?> route() { throw new IllegalStateException("Attempting to access EMPTY sentinel values"); }
+        @Override public RoutingKey progressKey() { throw new IllegalStateException("Attempting to access EMPTY sentinel values"); }
+        @Override public RoutingKey homeKey() { throw new IllegalStateException("Attempting to access EMPTY sentinel values"); }
+        @Override public TxnId txnId() { throw new IllegalStateException("Attempting to access EMPTY sentinel values"); }
+        @Override public Ballot promised() { throw new IllegalStateException("Attempting to access EMPTY sentinel values"); }
+        @Override public Status.Durability durability() { throw new IllegalStateException("Attempting to access EMPTY sentinel values"); }
+        @Override public ImmutableSet<CommandListener> listeners() { throw new IllegalStateException("Attempting to access EMPTY sentinel values"); }
+        @Override public SaveStatus saveStatus() { throw new IllegalStateException("Attempting to access EMPTY sentinel values"); }
+        @Override public Timestamp executeAt() { throw new IllegalStateException("Attempting to access EMPTY sentinel values"); }
+        @Override public Ballot accepted() { throw new IllegalStateException("Attempting to access EMPTY sentinel values"); }
+        @Override public PartialTxn partialTxn() { throw new IllegalStateException("Attempting to access EMPTY sentinel values"); }
+        @Nullable
+        @Override public PartialDeps partialDeps() { throw new IllegalStateException("Attempting to access EMPTY sentinel values"); }
 
-    public abstract TxnId txnId();
+        @Override
+        public String toString()
+        {
+            return "Command(EMPTY)";
+        }
+    };
 
-    // TODO (desirable, API consistency): should any of these calls be replaced by corresponding known() registers?
-    public boolean hasBeen(Status status)
+    static
     {
-        return status().hasBeen(status);
+        EMPTY.markInvalidated();
     }
 
-    public boolean has(Known known)
+    static PreLoadContext contextForCommand(Command command)
     {
-        return known.isSatisfiedBy(saveStatus().known);
+        Invariants.checkState(command.hasBeen(Status.PreAccepted) && command.partialTxn() != null);
+        return command instanceof PreLoadContext ? (PreLoadContext) command : PreLoadContext.contextFor(command.txnId(), command.partialTxn().keys());
     }
 
-    public boolean has(Definition definition)
+    private static Status.Durability durability(Status.Durability durability, SaveStatus status)
     {
-        return known().definition.compareTo(definition) >= 0;
+        if (status.compareTo(SaveStatus.PreApplied) >= 0 && durability == NotDurable)
+            return Local; // not necessary anywhere, but helps for logical consistency
+        return durability;
     }
 
-    public boolean has(Outcome outcome)
+    public interface CommonAttributes
     {
-        return known().outcome.compareTo(outcome) >= 0;
+        TxnId txnId();
+        Status.Durability durability();
+        RoutingKey homeKey();
+        RoutingKey progressKey();
+        Route<?> route();
+        PartialTxn partialTxn();
+        PartialDeps partialDeps();
+        ImmutableSet<CommandListener> listeners();
     }
 
-    public boolean is(Status status)
+    public static class SerializerSupport
     {
-        return status() == status;
-    }
-
-    /**
-     * homeKey is a global value that defines the home shard - the one tasked with ensuring the transaction is finished.
-     * progressKey is a local value that defines the local shard responsible for ensuring progress on the transaction.
-     * This will be homeKey if it is owned by the node, and some other key otherwise. If not the home shard, the progress
-     * shard has much weaker responsibilities, only ensuring that the home shard has durably witnessed the txnId.
-     *
-     * TODO (expected, efficiency): we probably do not want to save this on its own, as we probably want to
-     *  minimize IO interactions and discrete registers, so will likely reference commit log entries directly
-     *  At which point we may impose a requirement that only a Route can be saved, not a homeKey on its own.
-     *  Once this restriction is imposed, we no longer need to pass around Routable.Domain with TxnId.
-     */
-    public abstract RoutingKey homeKey();
-    protected abstract void setHomeKey(RoutingKey key);
-
-    public abstract RoutingKey progressKey();
-    protected abstract void setProgressKey(RoutingKey key);
-
-    /**
-     * If this is the home shard, we require that this is a Route for all states &gt; NotWitnessed;
-     * otherwise for the local progress shard this is ordinarily a PartialRoute, and for other shards this is not set,
-     * so that there is only one copy per node that can be consulted to construct the full set of involved keys.
-     *
-     * If hasBeen(Committed) this must contain the keys for both txnId.epoch and executeAt.epoch
-     */
-    public abstract @Nullable Route<?> route();
-    protected abstract void setRoute(Route<?> route);
-
-    public abstract PartialTxn partialTxn();
-    protected abstract void setPartialTxn(PartialTxn txn);
-
-    public abstract Ballot promised();
-    protected abstract void setPromised(Ballot ballot);
-
-    public abstract Ballot accepted();
-    protected abstract void setAccepted(Ballot ballot);
-
-    public abstract Timestamp executeAt();
-    protected abstract void setExecuteAt(Timestamp timestamp);
-
-    /**
-     * While !hasBeen(Committed), used only as a register for Accept state, used by Recovery
-     * If hasBeen(Committed), represents the full deps owned by this range for execution at both txnId.epoch
-     * AND executeAt.epoch so that it may be used for Recovery (which contacts only txnId.epoch topology),
-     * but also for execution.
-     */
-    public abstract PartialDeps partialDeps();
-    protected abstract void setPartialDeps(PartialDeps deps);
-
-    public abstract Writes writes();
-    protected abstract void setWrites(Writes writes);
-
-    public abstract Result result();
-    protected abstract void setResult(Result result);
-
-    public abstract SaveStatus saveStatus();
-    protected abstract void setSaveStatus(SaveStatus status);
-
-    public Status status() { return saveStatus().status; }
-    protected void setStatus(Status status) { setSaveStatus(SaveStatus.get(status, known())); }
-
-    public Known known() { return saveStatus().known; }
-
-    public abstract Durability durability();
-    public abstract void setDurability(Durability v);
+        public static Command.Listener listener(TxnId txnId)
+        {
+            return new Command.Listener(txnId);
+        }
 
-    public abstract Command addListener(CommandListener listener);
-    public abstract void removeListener(CommandListener listener);
-    protected abstract void notifyListeners(SafeCommandStore safeStore);
+        public static NotWitnessed notWitnessed(CommonAttributes attributes, Ballot promised)
+        {
+            return NotWitnessed.Factory.create(attributes, promised);
+        }
 
-    protected abstract void addWaitingOnCommit(TxnId txnId);
-    protected abstract void removeWaitingOnCommit(TxnId txnId);
-    protected abstract TxnId firstWaitingOnCommit();
+        public static Preaccepted preaccepted(CommonAttributes common, Timestamp executeAt, Ballot promised)
+        {
+            return Preaccepted.Factory.create(common, executeAt, promised);
+        }
 
-    protected abstract void addWaitingOnApplyIfAbsent(TxnId txnId, Timestamp executeAt);
-    protected abstract TxnId firstWaitingOnApply(@Nullable TxnId ifExecutesBefore);
+        public static Accepted accepted(CommonAttributes common, SaveStatus status, Timestamp executeAt, Ballot promised, Ballot accepted)
+        {
+            return Accepted.Factory.create(common, status, executeAt, promised, accepted);
+        }
 
-    protected abstract void removeWaitingOn(TxnId txnId, Timestamp executeAt);
-    protected abstract boolean isWaitingOnDependency();
+        public static Committed committed(CommonAttributes common, SaveStatus status, Timestamp executeAt, Ballot promised, Ballot accepted, ImmutableSortedSet<TxnId> waitingOnCommit, ImmutableSortedMap<Timestamp, TxnId> waitingOnApply)
+        {
+            return Committed.Factory.create(common, status, executeAt, promised, accepted, waitingOnCommit, waitingOnApply);
+        }
 
-    public boolean hasBeenWitnessed()
-    {
-        return partialTxn() != null;
+        public static Executed executed(CommonAttributes common, SaveStatus status, Timestamp executeAt, Ballot promised, Ballot accepted, ImmutableSortedSet<TxnId> waitingOnCommit, ImmutableSortedMap<Timestamp, TxnId> waitingOnApply, Writes writes, Result result)
+        {
+            return Executed.Factory.create(common, status, executeAt, promised, accepted, waitingOnCommit, waitingOnApply, writes, result);
+        }
     }
 
-    @Override
-    public Iterable<TxnId> txnIds()
+    private static SaveStatus validateCommandClass(SaveStatus status, Class<?> expected, Class<?> actual)
     {
-        return Collections.singleton(txnId());
+        if (actual != expected)
+        {
+            throw new IllegalStateException(format("Cannot instantiate %s for status %s. %s expected",
+                                                   actual.getSimpleName(), status, expected.getSimpleName()));
+        }
+        return status;
     }
 
-    @Override
-    public Seekables<?, ?> keys()
+    private static SaveStatus validateCommandClass(SaveStatus status, Class<?> klass)
     {
-        // TODO (expected, consider): when do we need this, and will it always be sufficient?
-        return partialTxn().keys();
+        switch (status)
+        {
+            case NotWitnessed:
+                return validateCommandClass(status, NotWitnessed.class, klass);
+            case PreAccepted:
+                return validateCommandClass(status, Preaccepted.class, klass);
+            case AcceptedInvalidate:
+            case AcceptedInvalidateWithDefinition:
+            case Accepted:
+            case AcceptedWithDefinition:
+                return validateCommandClass(status, Accepted.class, klass);
+            case Committed:
+            case ReadyToExecute:
+                return validateCommandClass(status, Committed.class, klass);
+            case PreApplied:
+            case Applied:
+            case Invalidated:
+                return validateCommandClass(status, Executed.class, klass);
+            default:
+                throw new IllegalStateException("Unhandled status " + status);
+        }
     }
 
-    public void setDurability(SafeCommandStore safeStore, Durability durability, RoutingKey homeKey, @Nullable Timestamp executeAt)
+    public static Command addListener(SafeCommandStore safeStore, Command command, CommandListener listener)
     {
-        updateHomeKey(safeStore, homeKey);
-        if (executeAt != null && hasBeen(PreCommitted) && !this.executeAt().equals(executeAt))
-            safeStore.agent().onInconsistentTimestamp(this, this.executeAt(), executeAt);
-        setDurability(durability);
+        return safeStore.beginUpdate(command).addListener(listener).updateAttributes();
     }
 
-    public enum AcceptOutcome
+    public static Command removeListener(SafeCommandStore safeStore, Command command, CommandListener listener)
     {
-        Success, Redundant, RejectedBallot
+        return safeStore.beginUpdate(command).removeListener(listener).updateAttributes();
     }
 
-    public AcceptOutcome preaccept(SafeCommandStore safeStore, PartialTxn partialTxn, Route<?> route, @Nullable RoutingKey progressKey)
+    public static Committed updateWaitingOn(SafeCommandStore safeStore, Committed command, WaitingOn.Update waitingOn)
     {
-        return preacceptOrRecover(safeStore, partialTxn, route, progressKey, Ballot.ZERO);
+        if (!waitingOn.hasChanges())
+            return command;
+
+        Update update = safeStore.beginUpdate(command);
+        Committed updated =  command instanceof Executed ?
+                Executed.Factory.update(command.asExecuted(), update, waitingOn.build()) :
+                Committed.Factory.update(command, update, waitingOn.build());
+        return update.complete(updated);
     }
 
-    public AcceptOutcome recover(SafeCommandStore safeStore, PartialTxn partialTxn, Route<?> route, @Nullable RoutingKey progressKey, Ballot ballot)
+    public static class Listener implements CommandListener
     {
-        return preacceptOrRecover(safeStore, partialTxn, route, progressKey, ballot);
-    }
+        protected final TxnId listenerId;
 
-    private AcceptOutcome preacceptOrRecover(SafeCommandStore safeStore, PartialTxn partialTxn, Route<?> route, @Nullable RoutingKey progressKey, Ballot ballot)
-    {
-        int compareBallots = promised().compareTo(ballot);
-        if (compareBallots > 0)
+        private Listener(TxnId listenerId)
         {
-            logger.trace("{}: skipping preaccept - higher ballot witnessed ({})", txnId(), promised());
-            return AcceptOutcome.RejectedBallot;
+            this.listenerId = listenerId;
         }
-        else if (compareBallots < 0)
+
+        @Override
+        public boolean equals(Object o)
         {
-            // save the new ballot as a promise
-            setPromised(ballot);
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            Listener that = (Listener) o;
+            return listenerId.equals(that.listenerId);
         }
 
-        if (known().definition.isKnown())
+        @Override
+        public int hashCode()
         {
-            Invariants.checkState(status() == Invalidated || executeAt() != null);
-            logger.trace("{}: skipping preaccept - already known ({})", txnId(), status());
-            // in case of Ballot.ZERO, we must either have a competing recovery coordinator or have late delivery of the
-            // preaccept; in the former case we should abandon coordination, and in the latter we have already completed
-            return ballot.equals(Ballot.ZERO) ? AcceptOutcome.Redundant : AcceptOutcome.Success;
+            return Objects.hash(listenerId);
         }
 
-        Ranges coordinateRanges = coordinateRanges(safeStore);
-        Invariants.checkState(!coordinateRanges.isEmpty());
-        ProgressShard shard = progressShard(safeStore, route, progressKey, coordinateRanges);
-        if (!validate(Ranges.EMPTY, coordinateRanges, shard, route, Set, partialTxn, Set, null, Ignore))
-            throw new IllegalStateException();
-
-        if (executeAt() == null)
+        @Override
+        public String toString()
         {
-            TxnId txnId = txnId();
-            // unlike in the Accord paper, we partition shards within a node, so that to ensure a total order we must either:
-            //  - use a global logical clock to issue new timestamps; or
-            //  - assign each shard _and_ process a unique id, and use both as components of the timestamp
-            // if we are performing recovery (i.e. non-zero ballot), do not permit a fast path decision as we want to
-            // invalidate any transactions that were not completed by their initial coordinator
-            if (ballot.equals(Ballot.ZERO)) setExecuteAt(safeStore.preaccept(txnId, partialTxn.keys()));
-            else setExecuteAt(safeStore.time().uniqueNow(txnId));
+            return "ListenerProxy{" + listenerId + '}';
+        }
 
-            if (status() == NotWitnessed)
-                setStatus(PreAccepted);
-            safeStore.progressLog().preaccepted(this, shard);
+        public TxnId txnId()
+        {
+            return listenerId;
         }
-        else
+
+        @Override
+        public void onChange(SafeCommandStore safeStore, TxnId txnId)
         {
-            // TODO (expected, ?): in the case that we are pre-committed but had not been preaccepted/accepted, should we inform progressLog?
-            setSaveStatus(SaveStatus.enrich(saveStatus(), DefinitionOnly));
+            Commands.listenerUpdate(safeStore, safeStore.command(listenerId), safeStore.command(txnId));
         }
-        set(safeStore, Ranges.EMPTY, coordinateRanges, shard, route, partialTxn, Set, null, Ignore);
 
-        notifyListeners(safeStore);
-        return AcceptOutcome.Success;
+        @Override
+        public PreLoadContext listenerPreLoadContext(TxnId caller)
+        {
+            return PreLoadContext.contextFor(Utils.listOf(listenerId, caller), Keys.EMPTY);
+        }
     }
 
-    public boolean preacceptInvalidate(Ballot ballot)
+    public static CommandListener listener(TxnId txnId)
     {
-        if (promised().compareTo(ballot) > 0)
-        {
-            logger.trace("{}: skipping preacceptInvalidate - witnessed higher ballot ({})", txnId(), promised());
-            return false;
-        }
-        setPromised(ballot);
-        return true;
+        return new Listener(txnId);
     }
 
-    public AcceptOutcome accept(SafeCommandStore safeStore, Ballot ballot, PartialRoute<?> route, Seekables<?, ?> keys, @Nullable RoutingKey progressKey, Timestamp executeAt, PartialDeps partialDeps)
+    private abstract static class AbstractCommand extends Command
     {
-        if (this.promised().compareTo(ballot) > 0)
+        private final TxnId txnId;
+        private final SaveStatus status;
+        private final Status.Durability durability;
+        private final RoutingKey homeKey;

Review Comment:
   Thoughts? I think this is still valid, and I think it might be preferable to split NotWitnessed from AbstractCommand



##########
accord-core/src/main/java/accord/local/Command.java:
##########
@@ -109,1087 +313,758 @@ public boolean is(Status status)
      *  Once this restriction is imposed, we no longer need to pass around Routable.Domain with TxnId.
      */
     public abstract RoutingKey homeKey();
-    protected abstract void setHomeKey(RoutingKey key);
+    public abstract TxnId txnId();
+    public abstract Ballot promised();
+    public abstract Status.Durability durability();
+    public abstract Listeners.Immutable listeners();
+    public abstract SaveStatus saveStatus();
 
-    public abstract RoutingKey progressKey();
-    protected abstract void setProgressKey(RoutingKey key);
+    static boolean isSameClass(Command command, Class<? extends Command> klass)
+    {
+        return command.getClass() == klass;
+    }
 
-    /**
-     * If this is the home shard, we require that this is a Route for all states &gt; NotWitnessed;
-     * otherwise for the local progress shard this is ordinarily a PartialRoute, and for other shards this is not set,
-     * so that there is only one copy per node that can be consulted to construct the full set of involved keys.
-     *
-     * If hasBeen(Committed) this must contain the keys for both txnId.epoch and executeAt.epoch
-     */
-    public abstract @Nullable Route<?> route();
-    protected abstract void setRoute(Route<?> route);
+    private static void checkNewBallot(Ballot current, Ballot next, String name)
+    {
+        if (next.compareTo(current) < 0)
+            throw new IllegalArgumentException(String.format("Cannot update %s ballot from %s to %s. New ballot is less than current", name, current, next));
+    }
 
-    public abstract PartialTxn partialTxn();
-    protected abstract void setPartialTxn(PartialTxn txn);
+    private static void checkPromised(Command command, Ballot ballot)
+    {
+        checkNewBallot(command.promised(), ballot, "promised");
+    }
 
-    public abstract Ballot promised();
-    protected abstract void setPromised(Ballot ballot);
+    private static void checkAccepted(Command command, Ballot ballot)
+    {
+        checkNewBallot(command.accepted(), ballot, "accepted");
+    }
 
-    public abstract Ballot accepted();
-    protected abstract void setAccepted(Ballot ballot);
+    private static void checkSameClass(Command command, Class<? extends Command> klass, String errorMsg)
+    {
+        if (!isSameClass(command, klass))
+            throw new IllegalArgumentException(errorMsg + format(" expected %s got %s", klass.getSimpleName(), command.getClass().getSimpleName()));
+    }
 
-    public abstract Timestamp executeAt();
-    protected abstract void setExecuteAt(Timestamp timestamp);
+    // TODO (low priority, progress): callers should try to consult the local progress shard (if any) to obtain the full set of keys owned locally
+    public final Route<?> someRoute()
+    {
+        if (route() != null)
+            return route();
 
-    /**
-     * While !hasBeen(Committed), used only as a register for Accept state, used by Recovery
-     * If hasBeen(Committed), represents the full deps owned by this range for execution at both txnId.epoch
-     * AND executeAt.epoch so that it may be used for Recovery (which contacts only txnId.epoch topology),
-     * but also for execution.
-     */
-    public abstract PartialDeps partialDeps();
-    protected abstract void setPartialDeps(PartialDeps deps);
+        if (homeKey() != null)
+            return PartialRoute.empty(txnId().domain(), homeKey());
 
-    public abstract Writes writes();
-    protected abstract void setWrites(Writes writes);
+        return null;
+    }
 
-    public abstract Result result();
-    protected abstract void setResult(Result result);
+    public Unseekables<?, ?> maxUnseekables()
+    {
+        Route<?> route = someRoute();
+        if (route == null)
+            return null;
 
-    public abstract SaveStatus saveStatus();
-    protected abstract void setSaveStatus(SaveStatus status);
+        return route.toMaximalUnseekables();
+    }
+
+    public PreLoadContext contextForSelf()
+    {
+        return contextForCommand(this);
+    }
 
-    public Status status() { return saveStatus().status; }
-    protected void setStatus(Status status) { setSaveStatus(SaveStatus.get(status, known())); }
+    public abstract Timestamp executeAt();
+    public abstract Ballot accepted();
+    public abstract PartialTxn partialTxn();
+    public abstract @Nullable PartialDeps partialDeps();
 
-    public Known known() { return saveStatus().known; }
+    public final Status status()
+    {
+        return saveStatus().status;
+    }
 
-    public abstract Durability durability();
-    public abstract void setDurability(Durability v);
+    public final Status.Known known()
+    {
+        return saveStatus().known;
+    }
 
-    public abstract Command addListener(CommandListener listener);
-    public abstract void removeListener(CommandListener listener);
-    protected abstract void notifyListeners(SafeCommandStore safeStore);
+    public boolean hasBeenWitnessed()
+    {
+        return partialTxn() != null;
+    }
 
-    protected abstract void addWaitingOnCommit(TxnId txnId);
-    protected abstract void removeWaitingOnCommit(TxnId txnId);
-    protected abstract TxnId firstWaitingOnCommit();
+    public final boolean hasBeen(Status status)
+    {
+        return status().compareTo(status) >= 0;
+    }
 
-    protected abstract void addWaitingOnApplyIfAbsent(TxnId txnId, Timestamp executeAt);
-    protected abstract TxnId firstWaitingOnApply(@Nullable TxnId ifExecutesBefore);
+    public boolean has(Status.Known known)
+    {
+        return known.isSatisfiedBy(saveStatus().known);
+    }
 
-    protected abstract void removeWaitingOn(TxnId txnId, Timestamp executeAt);
-    protected abstract boolean isWaitingOnDependency();
+    public boolean has(Status.Definition definition)
+    {
+        return known().definition.compareTo(definition) >= 0;
+    }
 
-    public boolean hasBeenWitnessed()
+    public boolean has(Status.Outcome outcome)
     {
-        return partialTxn() != null;
+        return known().outcome.compareTo(outcome) >= 0;
     }
 
-    @Override
-    public Iterable<TxnId> txnIds()
+    public boolean is(Status status)
     {
-        return Collections.singleton(txnId());
+        return status() == status;
     }
 
-    @Override
-    public Seekables<?, ?> keys()
+    public final CommandListener asListener()
     {
-        // TODO (expected, consider): when do we need this, and will it always be sufficient?
-        return partialTxn().keys();
+        return listener(txnId());
     }
 
-    public void setDurability(SafeCommandStore safeStore, Durability durability, RoutingKey homeKey, @Nullable Timestamp executeAt)
+    public final boolean isWitnessed()
     {
-        updateHomeKey(safeStore, homeKey);
-        if (executeAt != null && hasBeen(PreCommitted) && !this.executeAt().equals(executeAt))
-            safeStore.agent().onInconsistentTimestamp(this, this.executeAt(), executeAt);
-        setDurability(durability);
+        boolean result = status().hasBeen(Status.PreAccepted);
+        Invariants.checkState(result == (this instanceof PreAccepted));
+        return result;
     }
 
-    public enum AcceptOutcome
+    public final PreAccepted asWitnessed()
     {
-        Success, Redundant, RejectedBallot
+        return (PreAccepted) this;
     }
 
-    public AcceptOutcome preaccept(SafeCommandStore safeStore, PartialTxn partialTxn, Route<?> route, @Nullable RoutingKey progressKey)
+    public final boolean isAccepted()
     {
-        return preacceptOrRecover(safeStore, partialTxn, route, progressKey, Ballot.ZERO);
+        boolean result = status().hasBeen(Status.AcceptedInvalidate);
+        Invariants.checkState(result == (this instanceof Accepted));
+        return result;
     }
 
-    public AcceptOutcome recover(SafeCommandStore safeStore, PartialTxn partialTxn, Route<?> route, @Nullable RoutingKey progressKey, Ballot ballot)
+    public final Accepted asAccepted()
     {
-        return preacceptOrRecover(safeStore, partialTxn, route, progressKey, ballot);
+        return (Accepted) this;
     }
 
-    private AcceptOutcome preacceptOrRecover(SafeCommandStore safeStore, PartialTxn partialTxn, Route<?> route, @Nullable RoutingKey progressKey, Ballot ballot)
+    public final boolean isCommitted()
     {
-        int compareBallots = promised().compareTo(ballot);
-        if (compareBallots > 0)
-        {
-            logger.trace("{}: skipping preaccept - higher ballot witnessed ({})", txnId(), promised());
-            return AcceptOutcome.RejectedBallot;
-        }
-        else if (compareBallots < 0)
-        {
-            // save the new ballot as a promise
-            setPromised(ballot);
-        }
+        boolean result = status().hasBeen(Status.Committed);
+        Invariants.checkState(result == (this instanceof Committed));
+        return result;
+    }
 
-        if (known().definition.isKnown())
-        {
-            Invariants.checkState(status() == Invalidated || executeAt() != null);
-            logger.trace("{}: skipping preaccept - already known ({})", txnId(), status());
-            // in case of Ballot.ZERO, we must either have a competing recovery coordinator or have late delivery of the
-            // preaccept; in the former case we should abandon coordination, and in the latter we have already completed
-            return ballot.equals(Ballot.ZERO) ? AcceptOutcome.Redundant : AcceptOutcome.Success;
-        }
+    public final Committed asCommitted()
+    {
+        return (Committed) this;
+    }
 
-        Ranges coordinateRanges = coordinateRanges(safeStore);
-        Invariants.checkState(!coordinateRanges.isEmpty());
-        ProgressShard shard = progressShard(safeStore, route, progressKey, coordinateRanges);
-        if (!validate(Ranges.EMPTY, coordinateRanges, shard, route, Set, partialTxn, Set, null, Ignore))
-            throw new IllegalStateException();
+    public final boolean isExecuted()
+    {
+        boolean result = status().hasBeen(Status.PreApplied);
+        Invariants.checkState(result == (this instanceof Executed));
+        return result;
+    }
 
-        if (executeAt() == null)
-        {
-            TxnId txnId = txnId();
-            // unlike in the Accord paper, we partition shards within a node, so that to ensure a total order we must either:
-            //  - use a global logical clock to issue new timestamps; or
-            //  - assign each shard _and_ process a unique id, and use both as components of the timestamp
-            // if we are performing recovery (i.e. non-zero ballot), do not permit a fast path decision as we want to
-            // invalidate any transactions that were not completed by their initial coordinator
-            if (ballot.equals(Ballot.ZERO)) setExecuteAt(safeStore.preaccept(txnId, partialTxn.keys()));
-            else setExecuteAt(safeStore.time().uniqueNow(txnId));
+    public final Executed asExecuted()
+    {
+        return (Executed) this;
+    }
 
-            if (status() == NotWitnessed)
-                setStatus(PreAccepted);
-            safeStore.progressLog().preaccepted(this, shard);
-        }
-        else
-        {
-            // TODO (expected, ?): in the case that we are pre-committed but had not been preaccepted/accepted, should we inform progressLog?
-            setSaveStatus(SaveStatus.enrich(saveStatus(), DefinitionOnly));
-        }
-        set(safeStore, Ranges.EMPTY, coordinateRanges, shard, route, partialTxn, Set, null, Ignore);
+    public abstract Command updateAttributes(CommonAttributes attrs, Ballot promised);
 
-        notifyListeners(safeStore);
-        return AcceptOutcome.Success;
+    public final Command updateAttributes(CommonAttributes attrs)
+    {
+        return updateAttributes(attrs, promised());
     }
 
-    public boolean preacceptInvalidate(Ballot ballot)
+    public final Command updatePromised(Ballot promised)
     {
-        if (promised().compareTo(ballot) > 0)
-        {
-            logger.trace("{}: skipping preacceptInvalidate - witnessed higher ballot ({})", txnId(), promised());
-            return false;
-        }
-        setPromised(ballot);
-        return true;
+        return updateAttributes(this, promised);
     }
 
-    public AcceptOutcome accept(SafeCommandStore safeStore, Ballot ballot, PartialRoute<?> route, Seekables<?, ?> keys, @Nullable RoutingKey progressKey, Timestamp executeAt, PartialDeps partialDeps)
+    public static final class NotWitnessed extends AbstractCommand
     {
-        if (this.promised().compareTo(ballot) > 0)
+        NotWitnessed(TxnId txnId, SaveStatus status, Status.Durability durability, RoutingKey homeKey, RoutingKey progressKey, Route<?> route, Ballot promised, Listeners.Immutable listeners)
         {
-            logger.trace("{}: skipping accept - witnessed higher ballot ({} > {})", txnId(), promised(), ballot);
-            return AcceptOutcome.RejectedBallot;
+            super(txnId, status, durability, homeKey, progressKey, route, promised, listeners);
         }
 
-        if (hasBeen(PreCommitted))
+        NotWitnessed(CommonAttributes common, SaveStatus status, Ballot promised)
         {
-            logger.trace("{}: skipping accept - already committed ({})", txnId(), status());
-            return AcceptOutcome.Redundant;
+            super(common, status, promised);
         }
 
-        TxnId txnId = txnId();
-        Ranges coordinateRanges = coordinateRanges(safeStore);
-        Ranges acceptRanges = txnId.epoch() == executeAt.epoch() ? coordinateRanges : safeStore.ranges().between(txnId.epoch(), executeAt.epoch());
-        Invariants.checkState(!acceptRanges.isEmpty());
-        ProgressShard shard = progressShard(safeStore, route, progressKey, coordinateRanges);
-
-        if (!validate(coordinateRanges, Ranges.EMPTY, shard, route, Ignore, null, Ignore, partialDeps, Set))
-            throw new AssertionError("Invalid response from validate function");
-
-        setExecuteAt(executeAt);
-        setPromised(ballot);
-        setAccepted(ballot);
+        @Override
+        public Command updateAttributes(CommonAttributes attrs, Ballot promised)
+        {
+            return new NotWitnessed(attrs, saveStatus(), promised);
+        }
 
-        // TODO (desired, clarity/efficiency): we don't need to set the route here, and perhaps we don't even need to
-        //  distributed partialDeps at all, since all we gain is not waiting for these transactions to commit during
-        //  recovery. We probably don't want to directly persist a Route in any other circumstances, either, to ease persistence.
-        set(safeStore, coordinateRanges, acceptRanges, shard, route, null, Ignore, partialDeps, Set);
+        public static NotWitnessed notWitnessed(CommonAttributes common, Ballot promised)
+        {
+            return new NotWitnessed(common, SaveStatus.NotWitnessed, promised);
+        }
 
-        // set only registers by transaction keys, which we mightn't already have received
-        if (!known().isDefinitionKnown())
-            safeStore.register(keys, acceptRanges, this);
+        public static NotWitnessed notWitnessed(TxnId txnId)
+        {
+            return new NotWitnessed(txnId, SaveStatus.NotWitnessed, NotDurable, null, null, null, Ballot.ZERO, null);
+        }
 
-        setStatus(Accepted);
-        safeStore.progressLog().accepted(this, shard);
-        notifyListeners(safeStore);
+        public static NotWitnessed notWitnessed(NotWitnessed command, CommonAttributes common, Ballot promised)
+        {
+            checkSameClass(command, NotWitnessed.class, "Cannot update");
+            Invariants.checkArgument(command.txnId().equals(common.txnId()));
+            return new NotWitnessed(common, command.saveStatus(), promised);
+        }
 
-        return AcceptOutcome.Success;
-    }
+        @Override
+        public Timestamp executeAt()
+        {
+            return null;
+        }
 
-    public AcceptOutcome acceptInvalidate(SafeCommandStore safeStore, Ballot ballot)
-    {
-        if (this.promised().compareTo(ballot) > 0)
+        @Override
+        public Ballot promised()
         {
-            logger.trace("{}: skipping accept invalidated - witnessed higher ballot ({} > {})", txnId(), promised(), ballot);
-            return AcceptOutcome.RejectedBallot;
+            return Ballot.ZERO;
         }
 
-        if (hasBeen(PreCommitted))
+        @Override
+        public Ballot accepted()
         {
-            logger.trace("{}: skipping accept invalidated - already committed ({})", txnId(), status());
-            return AcceptOutcome.Redundant;
+            return Ballot.ZERO;
         }
 
-        setPromised(ballot);
-        setAccepted(ballot);
-        setStatus(AcceptedInvalidate);
-        setPartialDeps(null);
-        logger.trace("{}: accepted invalidated", txnId());
+        @Override
+        public PartialTxn partialTxn()
+        {
+            return null;
+        }
 
-        notifyListeners(safeStore);
-        return AcceptOutcome.Success;
+        @Override
+        public @Nullable PartialDeps partialDeps()
+        {
+            return null;
+        }
     }
 
-    public enum CommitOutcome { Success, Redundant, Insufficient }
-
-    // relies on mutual exclusion for each key
-    public CommitOutcome commit(SafeCommandStore safeStore, Route<?> route, @Nullable RoutingKey progressKey, @Nullable PartialTxn partialTxn, Timestamp executeAt, PartialDeps partialDeps)
+    public static class PreAccepted extends AbstractCommand
     {
-        if (hasBeen(PreCommitted))
-        {
-            logger.trace("{}: skipping commit - already committed ({})", txnId(), status());
-            if (!executeAt.equals(executeAt()) || status() == Invalidated)
-                safeStore.agent().onInconsistentTimestamp(this, (status() == Invalidated ? Timestamp.NONE : this.executeAt()), executeAt);
+        private final Timestamp executeAt;
+        private final PartialTxn partialTxn;
+        private final @Nullable PartialDeps partialDeps;
 
-            if (hasBeen(Committed))
-                return CommitOutcome.Redundant;
+        private PreAccepted(CommonAttributes common, SaveStatus status, Timestamp executeAt, Ballot promised)
+        {
+            super(common, status, promised);
+            this.executeAt = executeAt;
+            this.partialTxn = common.partialTxn();
+            this.partialDeps = common.partialDeps();
         }
 
-        Ranges coordinateRanges = coordinateRanges(safeStore);
-        // TODO (expected, consider): consider ranges between coordinateRanges and executeRanges? Perhaps don't need them
-        Ranges executeRanges = executeRanges(safeStore, executeAt);
-        ProgressShard shard = progressShard(safeStore, route, progressKey, coordinateRanges);
-
-        if (!validate(coordinateRanges, executeRanges, shard, route, Check, partialTxn, Add, partialDeps, Set))
-            return CommitOutcome.Insufficient;
-
-        setExecuteAt(executeAt);
-        set(safeStore, coordinateRanges, executeRanges, shard, route, partialTxn, Add, partialDeps, Set);
+        @Override
+        public Command updateAttributes(CommonAttributes attrs, Ballot promised)
+        {
+            return new PreAccepted(attrs, saveStatus(), executeAt(), promised);
+        }
 
-        setStatus(Committed);
-        logger.trace("{}: committed with executeAt: {}, deps: {}", txnId(), executeAt, partialDeps);
-        populateWaitingOn(safeStore);
+        @Override
+        public boolean equals(Object o)
+        {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            if (!super.equals(o)) return false;
+            PreAccepted that = (PreAccepted) o;
+            return executeAt.equals(that.executeAt)
+                    && Objects.equals(partialTxn, that.partialTxn)
+                    && Objects.equals(partialDeps, that.partialDeps);
+        }
 
-        safeStore.progressLog().committed(this, shard);
+        @Override
+        public int hashCode()
+        {
+            int hash = super.hashCode();

Review Comment:
   Do we use this impl, or should we inherit the UOE?



##########
accord-core/src/main/java/accord/local/Commands.java:
##########
@@ -0,0 +1,1083 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.local;
+
+import accord.api.ProgressLog.ProgressShard;
+import accord.api.Result;
+import accord.api.RoutingKey;
+import accord.local.Command.WaitingOn;
+import accord.primitives.*;
+import accord.utils.Invariants;
+import accord.utils.async.AsyncChain;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+import static accord.api.ProgressLog.ProgressShard.*;
+import static accord.api.ProgressLog.ProgressShard.Local;
+import static accord.local.Commands.EnsureAction.*;
+import static accord.local.Status.*;
+import static accord.local.Status.Known.ExecuteAtOnly;
+import static accord.primitives.Route.isFullRoute;
+
+public class Commands
+{
+    private static final Logger logger = LoggerFactory.getLogger(Commands.class);
+
+    private Commands()
+    {
+    }
+
+    private static Ranges covers(@Nullable PartialTxn txn)
+    {
+        return txn == null ? null : txn.covering();
+    }
+
+    private static Ranges covers(@Nullable PartialDeps deps)
+    {
+        return deps == null ? null : deps.covering;
+    }
+
+    private static boolean hasQuery(PartialTxn txn)
+    {
+        return txn != null && txn.query() != null;
+    }
+
+    /**
+     * true iff this commandStore owns the given key on the given epoch
+     */
+    public static boolean owns(SafeCommandStore safeStore, long epoch, RoutingKey someKey)
+    {
+        return safeStore.ranges().at(epoch).contains(someKey);
+    }
+
+    public static RoutingKey noProgressKey()
+    {
+        return NO_PROGRESS_KEY;
+    }
+
+    public enum AcceptOutcome {Success, Redundant, RejectedBallot}
+
+    public static AcceptOutcome preaccept(SafeCommandStore safeStore, TxnId txnId, PartialTxn partialTxn, Route<?> route, @Nullable RoutingKey progressKey)
+    {
+        return preacceptOrRecover(safeStore, txnId, partialTxn, route, progressKey, Ballot.ZERO);
+    }
+
+    public static AcceptOutcome recover(SafeCommandStore safeStore, TxnId txnId, PartialTxn partialTxn, Route<?> route, @Nullable RoutingKey progressKey, Ballot ballot)
+    {
+        return preacceptOrRecover(safeStore, txnId, partialTxn, route, progressKey, ballot);
+    }
+
+    private static AcceptOutcome preacceptOrRecover(SafeCommandStore safeStore, TxnId txnId, PartialTxn partialTxn, Route<?> route, @Nullable RoutingKey progressKey, Ballot ballot)
+    {
+        SafeCommand liveCommand = safeStore.command(txnId);
+        Command command = liveCommand.current();
+
+        int compareBallots = command.promised().compareTo(ballot);
+        if (compareBallots > 0)
+        {
+            logger.trace("{}: skipping preaccept - higher ballot witnessed ({})", txnId, command.promised());
+            return AcceptOutcome.RejectedBallot;
+        }
+
+        if (command.known().definition.isKnown())
+        {
+            Invariants.checkState(command.status() == Invalidated || command.executeAt() != null);
+            logger.trace("{}: skipping preaccept - already known ({})", txnId, command.status());
+            // in case of Ballot.ZERO, we must either have a competing recovery coordinator or have late delivery of the
+            // preaccept; in the former case we should abandon coordination, and in the latter we have already completed
+            liveCommand.updatePromised(ballot);
+            return ballot.equals(Ballot.ZERO) ? AcceptOutcome.Redundant : AcceptOutcome.Success;
+        }
+
+        Ranges coordinateRanges = coordinateRanges(safeStore, command);
+        Invariants.checkState(!coordinateRanges.isEmpty());
+        CommonAttributes attrs = updateHomeAndProgressKeys(safeStore, command.txnId(), command, route, progressKey, coordinateRanges);
+        ProgressShard shard = progressShard(attrs, progressKey, coordinateRanges);
+        if (!validate(command.status(), attrs, Ranges.EMPTY, coordinateRanges, shard, route, Set, partialTxn, Set, null, Ignore))
+            throw new IllegalStateException();
+
+        // FIXME: this should go into a consumer method
+        attrs = set(safeStore, command, attrs, Ranges.EMPTY, coordinateRanges, shard, route, partialTxn, Set, null, Ignore);
+        if (command.executeAt() == null)
+        {
+            // unlike in the Accord paper, we partition shards within a node, so that to ensure a total order we must either:
+            //  - use a global logical clock to issue new timestamps; or
+            //  - assign each shard _and_ process a unique id, and use both as components of the timestamp
+            // if we are performing recovery (i.e. non-zero ballot), do not permit a fast path decision as we want to
+            // invalidate any transactions that were not completed by their initial coordinator
+            Timestamp executeAt = ballot.equals(Ballot.ZERO)
+                    ? safeStore.preaccept(txnId, partialTxn.keys())
+                    : safeStore.time().uniqueNow(txnId);
+            command = liveCommand.preaccept(attrs, executeAt, ballot);
+            safeStore.progressLog().preaccepted(command, shard);
+        }
+        else
+        {
+            // TODO (expected, ?): in the case that we are pre-committed but had not been preaccepted/accepted, should we inform progressLog?
+            command = liveCommand.markDefined(attrs, ballot);
+        }
+
+        safeStore.notifyListeners(command);
+        return AcceptOutcome.Success;
+    }
+
+    public static boolean preacceptInvalidate(SafeCommandStore safeStore, TxnId txnId, Ballot ballot)
+    {
+        SafeCommand liveCommand = safeStore.command(txnId);
+        Command command = liveCommand.current();
+        if (command.promised().compareTo(ballot) > 0)
+        {
+            logger.trace("{}: skipping preacceptInvalidate - witnessed higher ballot ({})", command.txnId(), command.promised());
+            return false;
+        }
+        liveCommand.updatePromised(ballot);
+        return true;
+    }
+
+    public static AcceptOutcome accept(SafeCommandStore safeStore, TxnId txnId, Ballot ballot, PartialRoute<?> route, Seekables<?, ?> keys, @Nullable RoutingKey progressKey, Timestamp executeAt, PartialDeps partialDeps)
+    {
+        SafeCommand liveCommand = safeStore.command(txnId);
+        Command command = liveCommand.current();
+        if (command.promised().compareTo(ballot) > 0)
+        {
+            logger.trace("{}: skipping accept - witnessed higher ballot ({} > {})", txnId, command.promised(), ballot);
+            return AcceptOutcome.RejectedBallot;
+        }
+
+        if (command.hasBeen(PreCommitted))
+        {
+            logger.trace("{}: skipping accept - already committed ({})", txnId, command.status());
+            return AcceptOutcome.Redundant;
+        }
+
+        Ranges coordinateRanges = coordinateRanges(safeStore, command);
+        Ranges acceptRanges = txnId.epoch() == executeAt.epoch() ? coordinateRanges : safeStore.ranges().between(txnId.epoch(), executeAt.epoch());
+        Invariants.checkState(!acceptRanges.isEmpty());
+
+        CommonAttributes attrs = updateHomeAndProgressKeys(safeStore, command.txnId(), command, route, progressKey, coordinateRanges);
+        ProgressShard shard = progressShard(attrs, progressKey, coordinateRanges);
+        if (!validate(command.status(), attrs, coordinateRanges, Ranges.EMPTY, shard, route, Ignore, null, Ignore, partialDeps, Set))
+        {
+            throw new AssertionError("Invalid response from validate function");
+        }
+
+        // TODO (desired, clarity/efficiency): we don't need to set the route here, and perhaps we don't even need to
+        //  distributed partialDeps at all, since all we gain is not waiting for these transactions to commit during
+        //  recovery. We probably don't want to directly persist a Route in any other circumstances, either, to ease persistence.
+        attrs = set(safeStore, command, attrs, coordinateRanges, acceptRanges, shard, route, null, Ignore, partialDeps, Set);
+
+        // set only registers by transaction keys, which we mightn't already have received
+        if (!command.known().isDefinitionKnown())
+            safeStore.register(keys, acceptRanges, command);
+
+        command = liveCommand.accept(attrs, executeAt, ballot);
+        safeStore.progressLog().accepted(command, shard);
+        safeStore.notifyListeners(command);
+
+        return AcceptOutcome.Success;
+    }
+
+    public static AcceptOutcome acceptInvalidate(SafeCommandStore safeStore, SafeCommand liveCommand, Ballot ballot)
+    {
+        Command command = liveCommand.current();
+        if (command.promised().compareTo(ballot) > 0)
+        {
+            logger.trace("{}: skipping accept invalidated - witnessed higher ballot ({} > {})", command.txnId(), command.promised(), ballot);
+            return AcceptOutcome.RejectedBallot;
+        }
+
+        if (command.hasBeen(PreCommitted))
+        {
+            logger.trace("{}: skipping accept invalidated - already committed ({})", command.txnId(), command.status());
+            return AcceptOutcome.Redundant;
+        }
+
+        logger.trace("{}: accepted invalidated", command.txnId());
+
+        command = liveCommand.acceptInvalidated(ballot);
+        safeStore.notifyListeners(command);
+        return AcceptOutcome.Success;
+    }
+
+    public enum CommitOutcome {Success, Redundant, Insufficient;}
+
+
+    // relies on mutual exclusion for each key
+    public static CommitOutcome commit(SafeCommandStore safeStore, TxnId txnId, Route<?> route, @Nullable RoutingKey progressKey, @Nullable PartialTxn partialTxn, Timestamp executeAt, PartialDeps partialDeps)
+    {
+        SafeCommand liveCommand = safeStore.command(txnId);
+        Command command = liveCommand.current();
+
+        if (command.hasBeen(PreCommitted))
+        {
+            logger.trace("{}: skipping commit - already committed ({})", txnId, command.status());
+            if (!executeAt.equals(command.executeAt()) || command.status() == Invalidated)
+                safeStore.agent().onInconsistentTimestamp(command, (command.status() == Invalidated ? Timestamp.NONE : command.executeAt()), executeAt);
+
+            if (command.hasBeen(Committed))
+                return CommitOutcome.Redundant;
+        }
+
+        Ranges coordinateRanges = coordinateRanges(safeStore, command);
+        // TODO (expected, consider): consider ranges between coordinateRanges and executeRanges? Perhaps don't need them
+        Ranges executeRanges = executeRanges(safeStore, executeAt);
+
+        CommonAttributes attrs = updateHomeAndProgressKeys(safeStore, command.txnId(), command, route, progressKey, coordinateRanges);
+        ProgressShard shard = progressShard(attrs, progressKey, coordinateRanges);
+
+        if (!validate(command.status(), attrs, coordinateRanges, executeRanges, shard, route, Check, partialTxn, Add, partialDeps, Set))
+        {
+            liveCommand.updateAttributes(attrs);
+            return CommitOutcome.Insufficient;
+        }
+
+        // FIXME: split up set
+        attrs = set(safeStore, command, attrs, coordinateRanges, executeRanges, shard, route, partialTxn, Add, partialDeps, Set);
+
+        logger.trace("{}: committed with executeAt: {}, deps: {}", txnId, executeAt, partialDeps);
+        WaitingOn waitingOn = populateWaitingOn(safeStore, txnId, executeAt, partialDeps);
+        command = liveCommand.commit(attrs, executeAt, waitingOn);
+
+        safeStore.progressLog().committed(command, shard);
+
+        // TODO (expected, safety): introduce intermediate status to avoid reentry when notifying listeners (which might notify us)
+        maybeExecute(safeStore, liveCommand, shard, true, true);
+        return CommitOutcome.Success;
+    }
+
+    // relies on mutual exclusion for each key
+    public static void precommit(SafeCommandStore safeStore, TxnId txnId, Timestamp executeAt)
+    {
+        SafeCommand liveCommand = safeStore.command(txnId);
+        Command command = liveCommand.current();
+        if (command.hasBeen(PreCommitted))
+        {
+            logger.trace("{}: skipping precommit - already committed ({})", txnId, command.status());
+            if (executeAt.equals(command.executeAt()) && command.status() != Invalidated)
+                return;
+
+            safeStore.agent().onInconsistentTimestamp(command, (command.status() == Invalidated ? Timestamp.NONE : command.executeAt()), executeAt);
+        }
+
+        command = liveCommand.precommit(executeAt);
+        safeStore.notifyListeners(command);
+        logger.trace("{}: precommitted with executeAt: {}", txnId, executeAt);
+    }
+
+    protected static WaitingOn populateWaitingOn(SafeCommandStore safeStore, TxnId txnId, Timestamp executeAt, PartialDeps partialDeps)
+    {
+        Ranges ranges = safeStore.ranges().since(executeAt.epoch());
+        if (ranges == null)
+            return WaitingOn.EMPTY;
+
+        WaitingOn.Update update = new WaitingOn.Update();
+        partialDeps.forEach(ranges, depId -> {
+            SafeCommand liveCommand = safeStore.ifLoaded(depId);
+            if (liveCommand == null)
+            {
+                update.addWaitingOnCommit(depId);
+                safeStore.addAndInvokeListener(depId, txnId);
+            }
+            else
+            {
+                Command command = liveCommand.current();
+                switch (command.status())
+                {
+                    default:
+                        throw new IllegalStateException();
+                    case NotWitnessed:
+                    case PreAccepted:
+                    case Accepted:
+                    case AcceptedInvalidate:
+                    case PreCommitted:
+                        // we don't know when these dependencies will execute, and cannot execute until we do
+
+                        command = liveCommand.addListener(Command.listener(txnId));
+                        update.addWaitingOnCommit(command.txnId());
+                        break;
+                    case Committed:
+                        // TODO (desired, efficiency): split into ReadyToRead and ReadyToWrite;
+                        //                             the distributed read can be performed as soon as those keys are ready,
+                        //                             and in parallel with any other reads. the client can even ACK immediately after;
+                        //                             only the write needs to be postponed until other in-progress reads complete
+                    case ReadyToExecute:
+                    case PreApplied:
+                    case Applied:
+                        command = liveCommand.addListener(Command.listener(txnId));
+                        insertPredecessor(txnId, executeAt, update, command);
+                    case Invalidated:
+                        break;
+                }
+            }
+        });
+        return update.build();
+    }
+
+    // TODO (expected, ?): commitInvalidate may need to update cfks _if_ possible
+    public static void commitInvalidate(SafeCommandStore safeStore, TxnId txnId)
+    {
+        SafeCommand liveCommand = safeStore.command(txnId);
+        Command command = liveCommand.current();
+        if (command.hasBeen(PreCommitted))
+        {
+            logger.trace("{}: skipping commit invalidated - already committed ({})", txnId, command.status());
+            if (!command.hasBeen(Invalidated))
+                safeStore.agent().onInconsistentTimestamp(command, Timestamp.NONE, command.executeAt());
+
+            return;
+        }
+
+        ProgressShard shard = progressShard(safeStore, command);
+        safeStore.progressLog().invalidated(command, shard);
+
+        CommonAttributes attrs = command;
+        if (command.partialDeps() == null)
+            attrs = attrs.mutableAttrs().partialDeps(PartialDeps.NONE);
+        command = liveCommand.commitInvalidated(attrs, txnId);
+        logger.trace("{}: committed invalidated", txnId);
+
+        safeStore.notifyListeners(command);
+    }
+
+    public enum ApplyOutcome {Success, Redundant, Insufficient}
+
+
+    public static ApplyOutcome apply(SafeCommandStore safeStore, TxnId txnId, long untilEpoch, Route<?> route, Timestamp executeAt, @Nullable PartialDeps partialDeps, Writes writes, Result result)
+    {
+        SafeCommand liveCommand = safeStore.command(txnId);
+        Command command = liveCommand.current();
+        if (command.hasBeen(PreApplied) && executeAt.equals(command.executeAt()))
+        {
+            logger.trace("{}: skipping apply - already executed ({})", txnId, command.status());
+            return ApplyOutcome.Redundant;
+        }
+        else if (command.hasBeen(PreCommitted) && !executeAt.equals(command.executeAt()))
+        {
+            safeStore.agent().onInconsistentTimestamp(command, command.executeAt(), executeAt);
+        }
+
+        Ranges coordinateRanges = coordinateRanges(safeStore, command);
+        Ranges executeRanges = executeRanges(safeStore, executeAt);
+        if (untilEpoch < safeStore.latestEpoch())
+        {
+            Ranges expectedRanges = safeStore.ranges().between(executeAt.epoch(), untilEpoch);
+            Invariants.checkState(expectedRanges.containsAll(executeRanges));
+        }
+
+        CommonAttributes attrs = updateHomeAndProgressKeys(safeStore, command.txnId(), command, route, coordinateRanges);
+        ProgressShard shard = progressShard(attrs, coordinateRanges);
+
+        if (!validate(command.status(), attrs, coordinateRanges, executeRanges, shard, route, Check, null, Check, partialDeps, command.hasBeen(Committed) ? Add : TrySet))
+        {
+            liveCommand.updateAttributes(attrs);
+            return ApplyOutcome.Insufficient; // TODO (expected, consider): this should probably be an assertion failure if !TrySet
+        }
+
+        WaitingOn waitingOn = !command.hasBeen(Committed) ? populateWaitingOn(safeStore, txnId, executeAt, partialDeps) : command.asCommitted().waitingOn();
+        attrs = set(safeStore, command, attrs, coordinateRanges, executeRanges, shard, route, null, Check, partialDeps, command.hasBeen(Committed) ? Add : TrySet);
+
+        liveCommand.preapplied(attrs, executeAt, waitingOn, writes, result);
+        logger.trace("{}: apply, status set to Executed with executeAt: {}, deps: {}", txnId, executeAt, partialDeps);
+
+        maybeExecute(safeStore, liveCommand, shard, true, true);
+        safeStore.progressLog().executed(liveCommand.current(), shard);
+
+        return ApplyOutcome.Success;
+    }
+
+    public static void listenerUpdate(SafeCommandStore safeStore, SafeCommand liveListener, SafeCommand liveUpdated)
+    {
+        Command listener = liveListener.current();
+        Command updated = liveUpdated.current();
+        logger.trace("{}: updating as listener in response to change on {} with status {} ({})",
+                     listener.txnId(), updated.txnId(), updated.status(), updated);
+        switch (updated.status())
+        {
+            default:
+                throw new IllegalStateException("Unexpected status: " + updated.status());
+            case NotWitnessed:
+            case PreAccepted:
+            case Accepted:
+            case AcceptedInvalidate:
+                break;
+
+            case PreCommitted:
+            case Committed:
+            case ReadyToExecute:
+            case PreApplied:
+            case Applied:
+            case Invalidated:
+                updatePredecessorAndMaybeExecute(safeStore, liveListener, liveUpdated, false);
+                break;
+        }
+    }
+
+    protected static void postApply(SafeCommandStore safeStore, TxnId txnId)
+    {
+        logger.trace("{} applied, setting status to Applied and notifying listeners", txnId);
+        Command command = safeStore.command(txnId).applied();
+        safeStore.notifyListeners(command);
+    }
+
+    private static Function<SafeCommandStore, Void> callPostApply(TxnId txnId)
+    {
+        return safeStore -> {
+            postApply(safeStore, txnId);
+            return null;
+        };
+    }
+
+    protected static AsyncChain<Void> applyChain(SafeCommandStore safeStore, Command.Executed command)
+    {
+        // important: we can't include a reference to *this* in the lambda, since the C* implementation may evict
+        // the command instance from memory between now and the write completing (and post apply being called)
+        CommandStore unsafeStore = safeStore.commandStore();
+        TxnId txnId = command.txnId();
+        PreLoadContext context = command.contextForSelf();
+        return command.writes().apply(safeStore).flatMap(unused -> unsafeStore.submit(context, callPostApply(txnId)));
+    }
+
+    private static void apply(SafeCommandStore safeStore, Command.Executed command)
+    {
+        applyChain(safeStore, command).begin(safeStore.agent());

Review Comment:
   Similarly, maybe merge with `applyChain`?



##########
accord-core/src/main/java/accord/local/CommandStores.java:
##########
@@ -359,29 +353,23 @@ public <O> void mapReduceConsume(PreLoadContext context, RoutingKey key, long mi
      * Note that {@code reduce} and {@code accept} are invoked by only one thread, and never concurrently with {@code apply},
      * so they do not require mutual exclusion.
      *
-     * Implementations are expected to invoke {@link #mapReduceConsume(PreLoadContext, Routables, long, long, MapReduceConsume, MapReduceAdapter)}
+     * Implementations are expected to invoke {@link #mapReduceConsume(PreLoadContext, Routables, long, long, MapReduceConsume)}
      */
-    public abstract <O> void mapReduceConsume(PreLoadContext context, Routables<?, ?> keys, long minEpoch, long maxEpoch, MapReduceConsume<? super SafeCommandStore, O> mapReduceConsume);
-    public abstract <O> void mapReduceConsume(PreLoadContext context, IntStream commandStoreIds, MapReduceConsume<? super SafeCommandStore, O> mapReduceConsume);
-
-    protected <T1, T2, O> void mapReduceConsume(PreLoadContext context, Routables<?, ?> keys, long minEpoch, long maxEpoch, MapReduceConsume<? super SafeCommandStore, O> mapReduceConsume,
-                                                MapReduceAdapter<? super S, T1, T2, O> adapter)
+    protected <O> void mapReduceConsume(PreLoadContext context, Routables<?, ?> keys, long minEpoch, long maxEpoch, MapReduceConsume<? super SafeCommandStore, O> mapReduceConsume)
     {
-        T1 reduced = mapReduce(context, keys, minEpoch, maxEpoch, mapReduceConsume, adapter);
-        adapter.consume(mapReduceConsume, reduced);
+        AsyncChain<O> reduced = mapReduce(context, keys, minEpoch, maxEpoch, mapReduceConsume);
+        reduced.begin(mapReduceConsume);
     }
 
-    protected <T1, T2, O> void mapReduceConsume(PreLoadContext context, IntStream commandStoreIds, MapReduceConsume<? super SafeCommandStore, O> mapReduceConsume,
-                                                   MapReduceAdapter<? super S, T1, T2, O> adapter)
+    public  <O> void mapReduceConsume(PreLoadContext context, IntStream commandStoreIds, MapReduceConsume<? super SafeCommandStore, O> mapReduceConsume)
     {
-        T1 reduced = mapReduce(context, commandStoreIds, mapReduceConsume, adapter);
-        adapter.consume(mapReduceConsume, reduced);
+        AsyncChain<O> reduced = mapReduce(context, commandStoreIds, mapReduceConsume);
+        reduced.begin(mapReduceConsume);
     }
 
-    protected <T1, T2, O> T1 mapReduce(PreLoadContext context, Routables<?, ?> keys, long minEpoch, long maxEpoch, MapReduce<? super SafeCommandStore, O> mapReduce,
-                                       MapReduceAdapter<? super S, T1, T2, O> adapter)
+    public <O> AsyncChain<O> mapReduce(PreLoadContext context, Routables<?, ?> keys, long minEpoch, long maxEpoch, MapReduce<? super SafeCommandStore, O> mapReduce)
     {
-        T2 accumulator = adapter.allocate();
+        List<AsyncChain<O>> chains = new ArrayList<>();

Review Comment:
   Perhaps this could be a simple `AsyncChain<O>` that is either null, a simple `AsyncChain` (the result of one submit), or a reducer that has already been constructed from the existing one and the new one - it looks like `AsyncChains.reduce` already adds to the existing reducer if it already exists.



##########
accord-core/src/main/java/accord/local/Command.java:
##########
@@ -109,1087 +313,758 @@ public boolean is(Status status)
      *  Once this restriction is imposed, we no longer need to pass around Routable.Domain with TxnId.
      */
     public abstract RoutingKey homeKey();
-    protected abstract void setHomeKey(RoutingKey key);
+    public abstract TxnId txnId();
+    public abstract Ballot promised();
+    public abstract Status.Durability durability();
+    public abstract Listeners.Immutable listeners();
+    public abstract SaveStatus saveStatus();
 
-    public abstract RoutingKey progressKey();
-    protected abstract void setProgressKey(RoutingKey key);
+    static boolean isSameClass(Command command, Class<? extends Command> klass)
+    {
+        return command.getClass() == klass;
+    }
 
-    /**
-     * If this is the home shard, we require that this is a Route for all states &gt; NotWitnessed;
-     * otherwise for the local progress shard this is ordinarily a PartialRoute, and for other shards this is not set,
-     * so that there is only one copy per node that can be consulted to construct the full set of involved keys.
-     *
-     * If hasBeen(Committed) this must contain the keys for both txnId.epoch and executeAt.epoch
-     */
-    public abstract @Nullable Route<?> route();
-    protected abstract void setRoute(Route<?> route);
+    private static void checkNewBallot(Ballot current, Ballot next, String name)
+    {
+        if (next.compareTo(current) < 0)
+            throw new IllegalArgumentException(String.format("Cannot update %s ballot from %s to %s. New ballot is less than current", name, current, next));
+    }
 
-    public abstract PartialTxn partialTxn();
-    protected abstract void setPartialTxn(PartialTxn txn);
+    private static void checkPromised(Command command, Ballot ballot)
+    {
+        checkNewBallot(command.promised(), ballot, "promised");
+    }
 
-    public abstract Ballot promised();
-    protected abstract void setPromised(Ballot ballot);
+    private static void checkAccepted(Command command, Ballot ballot)
+    {
+        checkNewBallot(command.accepted(), ballot, "accepted");
+    }
 
-    public abstract Ballot accepted();
-    protected abstract void setAccepted(Ballot ballot);
+    private static void checkSameClass(Command command, Class<? extends Command> klass, String errorMsg)
+    {
+        if (!isSameClass(command, klass))
+            throw new IllegalArgumentException(errorMsg + format(" expected %s got %s", klass.getSimpleName(), command.getClass().getSimpleName()));
+    }
 
-    public abstract Timestamp executeAt();
-    protected abstract void setExecuteAt(Timestamp timestamp);
+    // TODO (low priority, progress): callers should try to consult the local progress shard (if any) to obtain the full set of keys owned locally
+    public final Route<?> someRoute()
+    {
+        if (route() != null)
+            return route();
 
-    /**
-     * While !hasBeen(Committed), used only as a register for Accept state, used by Recovery
-     * If hasBeen(Committed), represents the full deps owned by this range for execution at both txnId.epoch
-     * AND executeAt.epoch so that it may be used for Recovery (which contacts only txnId.epoch topology),
-     * but also for execution.
-     */
-    public abstract PartialDeps partialDeps();
-    protected abstract void setPartialDeps(PartialDeps deps);
+        if (homeKey() != null)
+            return PartialRoute.empty(txnId().domain(), homeKey());
 
-    public abstract Writes writes();
-    protected abstract void setWrites(Writes writes);
+        return null;
+    }
 
-    public abstract Result result();
-    protected abstract void setResult(Result result);
+    public Unseekables<?, ?> maxUnseekables()
+    {
+        Route<?> route = someRoute();
+        if (route == null)
+            return null;
 
-    public abstract SaveStatus saveStatus();
-    protected abstract void setSaveStatus(SaveStatus status);
+        return route.toMaximalUnseekables();
+    }
+
+    public PreLoadContext contextForSelf()
+    {
+        return contextForCommand(this);
+    }
 
-    public Status status() { return saveStatus().status; }
-    protected void setStatus(Status status) { setSaveStatus(SaveStatus.get(status, known())); }
+    public abstract Timestamp executeAt();
+    public abstract Ballot accepted();
+    public abstract PartialTxn partialTxn();
+    public abstract @Nullable PartialDeps partialDeps();
 
-    public Known known() { return saveStatus().known; }
+    public final Status status()
+    {
+        return saveStatus().status;
+    }
 
-    public abstract Durability durability();
-    public abstract void setDurability(Durability v);
+    public final Status.Known known()
+    {
+        return saveStatus().known;
+    }
 
-    public abstract Command addListener(CommandListener listener);
-    public abstract void removeListener(CommandListener listener);
-    protected abstract void notifyListeners(SafeCommandStore safeStore);
+    public boolean hasBeenWitnessed()
+    {
+        return partialTxn() != null;

Review Comment:
   Do we need to override here?



##########
accord-core/src/main/java/accord/local/Command.java:
##########
@@ -109,1087 +313,758 @@ public boolean is(Status status)
      *  Once this restriction is imposed, we no longer need to pass around Routable.Domain with TxnId.
      */
     public abstract RoutingKey homeKey();
-    protected abstract void setHomeKey(RoutingKey key);
+    public abstract TxnId txnId();
+    public abstract Ballot promised();
+    public abstract Status.Durability durability();
+    public abstract Listeners.Immutable listeners();
+    public abstract SaveStatus saveStatus();
 
-    public abstract RoutingKey progressKey();
-    protected abstract void setProgressKey(RoutingKey key);
+    static boolean isSameClass(Command command, Class<? extends Command> klass)
+    {
+        return command.getClass() == klass;
+    }
 
-    /**
-     * If this is the home shard, we require that this is a Route for all states &gt; NotWitnessed;
-     * otherwise for the local progress shard this is ordinarily a PartialRoute, and for other shards this is not set,
-     * so that there is only one copy per node that can be consulted to construct the full set of involved keys.
-     *
-     * If hasBeen(Committed) this must contain the keys for both txnId.epoch and executeAt.epoch
-     */
-    public abstract @Nullable Route<?> route();
-    protected abstract void setRoute(Route<?> route);
+    private static void checkNewBallot(Ballot current, Ballot next, String name)
+    {
+        if (next.compareTo(current) < 0)
+            throw new IllegalArgumentException(String.format("Cannot update %s ballot from %s to %s. New ballot is less than current", name, current, next));
+    }
 
-    public abstract PartialTxn partialTxn();
-    protected abstract void setPartialTxn(PartialTxn txn);
+    private static void checkPromised(Command command, Ballot ballot)
+    {
+        checkNewBallot(command.promised(), ballot, "promised");
+    }
 
-    public abstract Ballot promised();
-    protected abstract void setPromised(Ballot ballot);
+    private static void checkAccepted(Command command, Ballot ballot)
+    {
+        checkNewBallot(command.accepted(), ballot, "accepted");
+    }
 
-    public abstract Ballot accepted();
-    protected abstract void setAccepted(Ballot ballot);
+    private static void checkSameClass(Command command, Class<? extends Command> klass, String errorMsg)
+    {
+        if (!isSameClass(command, klass))
+            throw new IllegalArgumentException(errorMsg + format(" expected %s got %s", klass.getSimpleName(), command.getClass().getSimpleName()));
+    }
 
-    public abstract Timestamp executeAt();
-    protected abstract void setExecuteAt(Timestamp timestamp);
+    // TODO (low priority, progress): callers should try to consult the local progress shard (if any) to obtain the full set of keys owned locally
+    public final Route<?> someRoute()
+    {
+        if (route() != null)
+            return route();
 
-    /**
-     * While !hasBeen(Committed), used only as a register for Accept state, used by Recovery
-     * If hasBeen(Committed), represents the full deps owned by this range for execution at both txnId.epoch
-     * AND executeAt.epoch so that it may be used for Recovery (which contacts only txnId.epoch topology),
-     * but also for execution.
-     */
-    public abstract PartialDeps partialDeps();
-    protected abstract void setPartialDeps(PartialDeps deps);
+        if (homeKey() != null)
+            return PartialRoute.empty(txnId().domain(), homeKey());
 
-    public abstract Writes writes();
-    protected abstract void setWrites(Writes writes);
+        return null;
+    }
 
-    public abstract Result result();
-    protected abstract void setResult(Result result);
+    public Unseekables<?, ?> maxUnseekables()
+    {
+        Route<?> route = someRoute();
+        if (route == null)
+            return null;
 
-    public abstract SaveStatus saveStatus();
-    protected abstract void setSaveStatus(SaveStatus status);
+        return route.toMaximalUnseekables();
+    }
+
+    public PreLoadContext contextForSelf()
+    {
+        return contextForCommand(this);
+    }
 
-    public Status status() { return saveStatus().status; }
-    protected void setStatus(Status status) { setSaveStatus(SaveStatus.get(status, known())); }
+    public abstract Timestamp executeAt();
+    public abstract Ballot accepted();
+    public abstract PartialTxn partialTxn();
+    public abstract @Nullable PartialDeps partialDeps();
 
-    public Known known() { return saveStatus().known; }
+    public final Status status()
+    {
+        return saveStatus().status;
+    }
 
-    public abstract Durability durability();
-    public abstract void setDurability(Durability v);
+    public final Status.Known known()
+    {
+        return saveStatus().known;
+    }
 
-    public abstract Command addListener(CommandListener listener);
-    public abstract void removeListener(CommandListener listener);
-    protected abstract void notifyListeners(SafeCommandStore safeStore);
+    public boolean hasBeenWitnessed()
+    {
+        return partialTxn() != null;
+    }
 
-    protected abstract void addWaitingOnCommit(TxnId txnId);
-    protected abstract void removeWaitingOnCommit(TxnId txnId);
-    protected abstract TxnId firstWaitingOnCommit();
+    public final boolean hasBeen(Status status)
+    {
+        return status().compareTo(status) >= 0;
+    }
 
-    protected abstract void addWaitingOnApplyIfAbsent(TxnId txnId, Timestamp executeAt);
-    protected abstract TxnId firstWaitingOnApply(@Nullable TxnId ifExecutesBefore);
+    public boolean has(Status.Known known)
+    {
+        return known.isSatisfiedBy(saveStatus().known);
+    }
 
-    protected abstract void removeWaitingOn(TxnId txnId, Timestamp executeAt);
-    protected abstract boolean isWaitingOnDependency();
+    public boolean has(Status.Definition definition)
+    {
+        return known().definition.compareTo(definition) >= 0;
+    }
 
-    public boolean hasBeenWitnessed()
+    public boolean has(Status.Outcome outcome)
     {
-        return partialTxn() != null;
+        return known().outcome.compareTo(outcome) >= 0;
     }
 
-    @Override
-    public Iterable<TxnId> txnIds()
+    public boolean is(Status status)
     {
-        return Collections.singleton(txnId());
+        return status() == status;
     }
 
-    @Override
-    public Seekables<?, ?> keys()
+    public final CommandListener asListener()
     {
-        // TODO (expected, consider): when do we need this, and will it always be sufficient?
-        return partialTxn().keys();
+        return listener(txnId());
     }
 
-    public void setDurability(SafeCommandStore safeStore, Durability durability, RoutingKey homeKey, @Nullable Timestamp executeAt)
+    public final boolean isWitnessed()
     {
-        updateHomeKey(safeStore, homeKey);
-        if (executeAt != null && hasBeen(PreCommitted) && !this.executeAt().equals(executeAt))
-            safeStore.agent().onInconsistentTimestamp(this, this.executeAt(), executeAt);
-        setDurability(durability);
+        boolean result = status().hasBeen(Status.PreAccepted);
+        Invariants.checkState(result == (this instanceof PreAccepted));
+        return result;
     }
 
-    public enum AcceptOutcome
+    public final PreAccepted asWitnessed()
     {
-        Success, Redundant, RejectedBallot
+        return (PreAccepted) this;
     }
 
-    public AcceptOutcome preaccept(SafeCommandStore safeStore, PartialTxn partialTxn, Route<?> route, @Nullable RoutingKey progressKey)
+    public final boolean isAccepted()
     {
-        return preacceptOrRecover(safeStore, partialTxn, route, progressKey, Ballot.ZERO);
+        boolean result = status().hasBeen(Status.AcceptedInvalidate);
+        Invariants.checkState(result == (this instanceof Accepted));
+        return result;
     }
 
-    public AcceptOutcome recover(SafeCommandStore safeStore, PartialTxn partialTxn, Route<?> route, @Nullable RoutingKey progressKey, Ballot ballot)
+    public final Accepted asAccepted()
     {
-        return preacceptOrRecover(safeStore, partialTxn, route, progressKey, ballot);
+        return (Accepted) this;
     }
 
-    private AcceptOutcome preacceptOrRecover(SafeCommandStore safeStore, PartialTxn partialTxn, Route<?> route, @Nullable RoutingKey progressKey, Ballot ballot)
+    public final boolean isCommitted()
     {
-        int compareBallots = promised().compareTo(ballot);
-        if (compareBallots > 0)
-        {
-            logger.trace("{}: skipping preaccept - higher ballot witnessed ({})", txnId(), promised());
-            return AcceptOutcome.RejectedBallot;
-        }
-        else if (compareBallots < 0)
-        {
-            // save the new ballot as a promise
-            setPromised(ballot);
-        }
+        boolean result = status().hasBeen(Status.Committed);
+        Invariants.checkState(result == (this instanceof Committed));
+        return result;
+    }
 
-        if (known().definition.isKnown())
-        {
-            Invariants.checkState(status() == Invalidated || executeAt() != null);
-            logger.trace("{}: skipping preaccept - already known ({})", txnId(), status());
-            // in case of Ballot.ZERO, we must either have a competing recovery coordinator or have late delivery of the
-            // preaccept; in the former case we should abandon coordination, and in the latter we have already completed
-            return ballot.equals(Ballot.ZERO) ? AcceptOutcome.Redundant : AcceptOutcome.Success;
-        }
+    public final Committed asCommitted()
+    {
+        return (Committed) this;
+    }
 
-        Ranges coordinateRanges = coordinateRanges(safeStore);
-        Invariants.checkState(!coordinateRanges.isEmpty());
-        ProgressShard shard = progressShard(safeStore, route, progressKey, coordinateRanges);
-        if (!validate(Ranges.EMPTY, coordinateRanges, shard, route, Set, partialTxn, Set, null, Ignore))
-            throw new IllegalStateException();
+    public final boolean isExecuted()
+    {
+        boolean result = status().hasBeen(Status.PreApplied);
+        Invariants.checkState(result == (this instanceof Executed));
+        return result;
+    }
 
-        if (executeAt() == null)
-        {
-            TxnId txnId = txnId();
-            // unlike in the Accord paper, we partition shards within a node, so that to ensure a total order we must either:
-            //  - use a global logical clock to issue new timestamps; or
-            //  - assign each shard _and_ process a unique id, and use both as components of the timestamp
-            // if we are performing recovery (i.e. non-zero ballot), do not permit a fast path decision as we want to
-            // invalidate any transactions that were not completed by their initial coordinator
-            if (ballot.equals(Ballot.ZERO)) setExecuteAt(safeStore.preaccept(txnId, partialTxn.keys()));
-            else setExecuteAt(safeStore.time().uniqueNow(txnId));
+    public final Executed asExecuted()
+    {
+        return (Executed) this;
+    }
 
-            if (status() == NotWitnessed)
-                setStatus(PreAccepted);
-            safeStore.progressLog().preaccepted(this, shard);
-        }
-        else
-        {
-            // TODO (expected, ?): in the case that we are pre-committed but had not been preaccepted/accepted, should we inform progressLog?
-            setSaveStatus(SaveStatus.enrich(saveStatus(), DefinitionOnly));
-        }
-        set(safeStore, Ranges.EMPTY, coordinateRanges, shard, route, partialTxn, Set, null, Ignore);
+    public abstract Command updateAttributes(CommonAttributes attrs, Ballot promised);
 
-        notifyListeners(safeStore);
-        return AcceptOutcome.Success;
+    public final Command updateAttributes(CommonAttributes attrs)
+    {
+        return updateAttributes(attrs, promised());
     }
 
-    public boolean preacceptInvalidate(Ballot ballot)
+    public final Command updatePromised(Ballot promised)
     {
-        if (promised().compareTo(ballot) > 0)
-        {
-            logger.trace("{}: skipping preacceptInvalidate - witnessed higher ballot ({})", txnId(), promised());
-            return false;
-        }
-        setPromised(ballot);
-        return true;
+        return updateAttributes(this, promised);
     }
 
-    public AcceptOutcome accept(SafeCommandStore safeStore, Ballot ballot, PartialRoute<?> route, Seekables<?, ?> keys, @Nullable RoutingKey progressKey, Timestamp executeAt, PartialDeps partialDeps)
+    public static final class NotWitnessed extends AbstractCommand
     {
-        if (this.promised().compareTo(ballot) > 0)
+        NotWitnessed(TxnId txnId, SaveStatus status, Status.Durability durability, RoutingKey homeKey, RoutingKey progressKey, Route<?> route, Ballot promised, Listeners.Immutable listeners)
         {
-            logger.trace("{}: skipping accept - witnessed higher ballot ({} > {})", txnId(), promised(), ballot);
-            return AcceptOutcome.RejectedBallot;
+            super(txnId, status, durability, homeKey, progressKey, route, promised, listeners);
         }
 
-        if (hasBeen(PreCommitted))
+        NotWitnessed(CommonAttributes common, SaveStatus status, Ballot promised)
         {
-            logger.trace("{}: skipping accept - already committed ({})", txnId(), status());
-            return AcceptOutcome.Redundant;
+            super(common, status, promised);
         }
 
-        TxnId txnId = txnId();
-        Ranges coordinateRanges = coordinateRanges(safeStore);
-        Ranges acceptRanges = txnId.epoch() == executeAt.epoch() ? coordinateRanges : safeStore.ranges().between(txnId.epoch(), executeAt.epoch());
-        Invariants.checkState(!acceptRanges.isEmpty());
-        ProgressShard shard = progressShard(safeStore, route, progressKey, coordinateRanges);
-
-        if (!validate(coordinateRanges, Ranges.EMPTY, shard, route, Ignore, null, Ignore, partialDeps, Set))
-            throw new AssertionError("Invalid response from validate function");
-
-        setExecuteAt(executeAt);
-        setPromised(ballot);
-        setAccepted(ballot);
+        @Override
+        public Command updateAttributes(CommonAttributes attrs, Ballot promised)
+        {
+            return new NotWitnessed(attrs, saveStatus(), promised);
+        }
 
-        // TODO (desired, clarity/efficiency): we don't need to set the route here, and perhaps we don't even need to
-        //  distributed partialDeps at all, since all we gain is not waiting for these transactions to commit during
-        //  recovery. We probably don't want to directly persist a Route in any other circumstances, either, to ease persistence.
-        set(safeStore, coordinateRanges, acceptRanges, shard, route, null, Ignore, partialDeps, Set);
+        public static NotWitnessed notWitnessed(CommonAttributes common, Ballot promised)
+        {
+            return new NotWitnessed(common, SaveStatus.NotWitnessed, promised);
+        }
 
-        // set only registers by transaction keys, which we mightn't already have received
-        if (!known().isDefinitionKnown())
-            safeStore.register(keys, acceptRanges, this);
+        public static NotWitnessed notWitnessed(TxnId txnId)
+        {
+            return new NotWitnessed(txnId, SaveStatus.NotWitnessed, NotDurable, null, null, null, Ballot.ZERO, null);
+        }
 
-        setStatus(Accepted);
-        safeStore.progressLog().accepted(this, shard);
-        notifyListeners(safeStore);
+        public static NotWitnessed notWitnessed(NotWitnessed command, CommonAttributes common, Ballot promised)
+        {
+            checkSameClass(command, NotWitnessed.class, "Cannot update");
+            Invariants.checkArgument(command.txnId().equals(common.txnId()));
+            return new NotWitnessed(common, command.saveStatus(), promised);
+        }
 
-        return AcceptOutcome.Success;
-    }
+        @Override
+        public Timestamp executeAt()
+        {
+            return null;
+        }
 
-    public AcceptOutcome acceptInvalidate(SafeCommandStore safeStore, Ballot ballot)
-    {
-        if (this.promised().compareTo(ballot) > 0)
+        @Override
+        public Ballot promised()
         {
-            logger.trace("{}: skipping accept invalidated - witnessed higher ballot ({} > {})", txnId(), promised(), ballot);
-            return AcceptOutcome.RejectedBallot;
+            return Ballot.ZERO;
         }
 
-        if (hasBeen(PreCommitted))
+        @Override
+        public Ballot accepted()
         {
-            logger.trace("{}: skipping accept invalidated - already committed ({})", txnId(), status());
-            return AcceptOutcome.Redundant;
+            return Ballot.ZERO;
         }
 
-        setPromised(ballot);
-        setAccepted(ballot);
-        setStatus(AcceptedInvalidate);
-        setPartialDeps(null);
-        logger.trace("{}: accepted invalidated", txnId());
+        @Override
+        public PartialTxn partialTxn()
+        {
+            return null;
+        }
 
-        notifyListeners(safeStore);
-        return AcceptOutcome.Success;
+        @Override
+        public @Nullable PartialDeps partialDeps()
+        {
+            return null;
+        }
     }
 
-    public enum CommitOutcome { Success, Redundant, Insufficient }
-
-    // relies on mutual exclusion for each key
-    public CommitOutcome commit(SafeCommandStore safeStore, Route<?> route, @Nullable RoutingKey progressKey, @Nullable PartialTxn partialTxn, Timestamp executeAt, PartialDeps partialDeps)
+    public static class PreAccepted extends AbstractCommand
     {
-        if (hasBeen(PreCommitted))
-        {
-            logger.trace("{}: skipping commit - already committed ({})", txnId(), status());
-            if (!executeAt.equals(executeAt()) || status() == Invalidated)
-                safeStore.agent().onInconsistentTimestamp(this, (status() == Invalidated ? Timestamp.NONE : this.executeAt()), executeAt);
+        private final Timestamp executeAt;
+        private final PartialTxn partialTxn;
+        private final @Nullable PartialDeps partialDeps;
 
-            if (hasBeen(Committed))
-                return CommitOutcome.Redundant;
+        private PreAccepted(CommonAttributes common, SaveStatus status, Timestamp executeAt, Ballot promised)
+        {
+            super(common, status, promised);
+            this.executeAt = executeAt;
+            this.partialTxn = common.partialTxn();
+            this.partialDeps = common.partialDeps();
         }
 
-        Ranges coordinateRanges = coordinateRanges(safeStore);
-        // TODO (expected, consider): consider ranges between coordinateRanges and executeRanges? Perhaps don't need them
-        Ranges executeRanges = executeRanges(safeStore, executeAt);
-        ProgressShard shard = progressShard(safeStore, route, progressKey, coordinateRanges);
-
-        if (!validate(coordinateRanges, executeRanges, shard, route, Check, partialTxn, Add, partialDeps, Set))
-            return CommitOutcome.Insufficient;
-
-        setExecuteAt(executeAt);
-        set(safeStore, coordinateRanges, executeRanges, shard, route, partialTxn, Add, partialDeps, Set);
+        @Override
+        public Command updateAttributes(CommonAttributes attrs, Ballot promised)
+        {
+            return new PreAccepted(attrs, saveStatus(), executeAt(), promised);
+        }
 
-        setStatus(Committed);
-        logger.trace("{}: committed with executeAt: {}, deps: {}", txnId(), executeAt, partialDeps);
-        populateWaitingOn(safeStore);
+        @Override
+        public boolean equals(Object o)
+        {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            if (!super.equals(o)) return false;
+            PreAccepted that = (PreAccepted) o;
+            return executeAt.equals(that.executeAt)
+                    && Objects.equals(partialTxn, that.partialTxn)
+                    && Objects.equals(partialDeps, that.partialDeps);
+        }
 
-        safeStore.progressLog().committed(this, shard);
+        @Override
+        public int hashCode()
+        {
+            int hash = super.hashCode();
+            hash = 31 * hash + Objects.hashCode(executeAt);
+            hash = 31 * hash + Objects.hashCode(partialTxn);
+            hash = 31 * hash + Objects.hashCode(partialDeps);
+            return hash;
+        }
 
-        // TODO (expected, safety): introduce intermediate status to avoid reentry when notifying listeners (which might notify us)
-        maybeExecute(safeStore, shard, true, true);
-        return CommitOutcome.Success;
-    }
+        public static PreAccepted preAccepted(CommonAttributes common, Timestamp executeAt, Ballot promised)
+        {
+            return new PreAccepted(common, SaveStatus.PreAccepted, executeAt, promised);
+        }
+        public static PreAccepted preAccepted(PreAccepted command, CommonAttributes common, Ballot promised)
+        {
+            checkPromised(command, promised);
+            checkSameClass(command, PreAccepted.class, "Cannot update");
+            Invariants.checkArgument(command.getClass() == PreAccepted.class);
+            return preAccepted(common, command.executeAt(), promised);
+        }
 
-    // relies on mutual exclusion for each key
-    public void precommit(SafeCommandStore safeStore, Timestamp executeAt)
-    {
-        if (hasBeen(PreCommitted))
+        @Override
+        public Timestamp executeAt()
         {
-            logger.trace("{}: skipping precommit - already committed ({})", txnId(), status());
-            if (executeAt.equals(executeAt()) && status() != Invalidated)
-                return;
+            return executeAt;
+        }
 
-            safeStore.agent().onInconsistentTimestamp(this, (status() == Invalidated ? Timestamp.NONE : this.executeAt()), executeAt);
+        @Override
+        public Ballot accepted()
+        {
+            return Ballot.ZERO;
         }
 
-        setExecuteAt(executeAt);
-        setStatus(PreCommitted);
-        notifyListeners(safeStore);
-        logger.trace("{}: precommitted with executeAt: {}", txnId(), executeAt);
-    }
+        @Override
+        public PartialTxn partialTxn()
+        {
+            return partialTxn;
+        }
 
-    protected void populateWaitingOn(SafeCommandStore safeStore)
-    {
-        Ranges ranges = safeStore.ranges().since(executeAt().epoch());
-        if (ranges != null)
-        {
-            partialDeps().forEach(ranges, txnId -> {
-                Command command = safeStore.ifLoaded(txnId);
-                if (command == null)
-                {
-                    addWaitingOnCommit(txnId);
-                    safeStore.addAndInvokeListener(txnId, this);
-                }
-                else
-                {
-                    switch (command.status())
-                    {
-                        default:
-                            throw new IllegalStateException();
-                        case NotWitnessed:
-                        case PreAccepted:
-                        case Accepted:
-                        case AcceptedInvalidate:
-                            // we don't know when these dependencies will execute, and cannot execute until we do
-                            command.addListener(this);
-                            addWaitingOnCommit(command.txnId());
-                            break;
-                        case PreCommitted:
-                        case Committed:
-                            // TODO (desired, efficiency): split into ReadyToRead and ReadyToWrite;
-                            //                             the distributed read can be performed as soon as those keys are ready,
-                            //                             and in parallel with any other reads. the client can even ACK immediately after;
-                            //                             only the write needs to be postponed until other in-progress reads complete
-                        case ReadyToExecute:
-                        case PreApplied:
-                        case Applied:
-                            command.addListener(this);
-                            insertPredecessor(command);
-                        case Invalidated:
-                            break;
-                    }
-                }
-            });
+        @Override
+        public @Nullable PartialDeps partialDeps()
+        {
+            return partialDeps;
         }
     }
 
-    // TODO (expected, ?): commitInvalidate may need to update cfks _if_ possible
-    public void commitInvalidate(SafeCommandStore safeStore)
+    public static class Accepted extends PreAccepted
     {
-        if (hasBeen(PreCommitted))
-        {
-            logger.trace("{}: skipping commit invalidated - already committed ({})", txnId(), status());
-            if (!hasBeen(Invalidated))
-                safeStore.agent().onInconsistentTimestamp(this, Timestamp.NONE, executeAt());
+        private final Ballot accepted;
 
-            return;
+        Accepted(CommonAttributes common, SaveStatus status, Timestamp executeAt, Ballot promised, Ballot accepted)
+        {
+            super(common, status, executeAt, promised);
+            this.accepted = accepted;
         }
 
-        ProgressShard shard = progressShard(safeStore);
-        safeStore.progressLog().invalidated(this, shard);
-        setExecuteAt(txnId());
-        if (partialDeps() == null)
-            setPartialDeps(PartialDeps.NONE);
-        setStatus(Invalidated);
-        logger.trace("{}: committed invalidated", txnId());
+        @Override
+        public Command updateAttributes(CommonAttributes attrs, Ballot promised)
+        {
+            return new Accepted(attrs, saveStatus(), executeAt(), promised, accepted());
+        }
 
-        notifyListeners(safeStore);
-    }
+        @Override
+        public boolean equals(Object o)
+        {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            if (!super.equals(o)) return false;
+            Accepted that = (Accepted) o;
+            return Objects.equals(accepted, that.accepted);
+        }
 
-    public enum ApplyOutcome { Success, Redundant, Insufficient }
+        @Override
+        public int hashCode()
+        {
+            int hash = super.hashCode();
+            hash = 31 * hash + Objects.hashCode(accepted);
+            return hash;
+        }
 
-    public ApplyOutcome apply(SafeCommandStore safeStore, long untilEpoch, Route<?> route, Timestamp executeAt, @Nullable PartialDeps partialDeps, Writes writes, Result result)
-    {
-        if (hasBeen(PreApplied) && executeAt.equals(this.executeAt()))
+        static Accepted accepted(CommonAttributes common, SaveStatus status, Timestamp executeAt, Ballot promised, Ballot accepted)
         {
-            logger.trace("{}: skipping apply - already executed ({})", txnId(), status());
-            return ApplyOutcome.Redundant;
+            return new Accepted(common, status, executeAt, promised, accepted);
         }
-        else if (hasBeen(PreCommitted) && !executeAt.equals(this.executeAt()))
+        static Accepted accepted(Accepted command, CommonAttributes common, SaveStatus status, Ballot promised)
         {
-            safeStore.agent().onInconsistentTimestamp(this, this.executeAt(), executeAt);
+            checkPromised(command, promised);
+            checkSameClass(command, Accepted.class, "Cannot update");
+            return new Accepted(common, status, command.executeAt(), promised, command.accepted());
         }
-
-        Ranges coordinateRanges = coordinateRanges(safeStore);
-        Ranges executeRanges = executeRanges(safeStore, executeAt);
-        if (untilEpoch < safeStore.latestEpoch())
+        static Accepted accepted(Accepted command, CommonAttributes common, Ballot promised)
         {
-            Ranges expectedRanges = safeStore.ranges().between(executeAt.epoch(), untilEpoch);
-            Invariants.checkState(expectedRanges.containsAll(executeRanges));
+            return accepted(command, common, command.saveStatus(), promised);
         }
-        ProgressShard shard = progressShard(safeStore, route, coordinateRanges);
-
-        if (!validate(coordinateRanges, executeRanges, shard, route, Check, null, Check, partialDeps, hasBeen(Committed) ? Add : TrySet))
-            return ApplyOutcome.Insufficient; // TODO (expected, consider): this should probably be an assertion failure if !TrySet
-
-        setWrites(writes);
-        setResult(result);
-        setExecuteAt(executeAt);
-        set(safeStore, coordinateRanges, executeRanges, shard, route, null, Check, partialDeps, hasBeen(Committed) ? Add : TrySet);
-
-        if (!hasBeen(Committed))
-            populateWaitingOn(safeStore);
-        setStatus(PreApplied);
-        logger.trace("{}: apply, status set to Executed with executeAt: {}, deps: {}", txnId(), executeAt, partialDeps);
-
-        safeStore.progressLog().executed(this, shard);
 
-        maybeExecute(safeStore, shard, true, true);
-        return ApplyOutcome.Success;
+        @Override
+        public Ballot accepted()
+        {
+            return accepted;
+        }
     }
 
-    @Override
-    public PreLoadContext listenerPreLoadContext(TxnId caller)
+    public static class Committed extends Accepted
     {
-        return PreLoadContext.contextFor(listOf(txnId(), caller));
-    }
+        private final ImmutableSortedSet<TxnId> waitingOnCommit;
+        private final ImmutableSortedMap<Timestamp, TxnId> waitingOnApply;
 
-    @Override
-    public void onChange(SafeCommandStore safeStore, Command command)
-    {
-        logger.trace("{}: updating as listener in response to change on {} with status {} ({})",
-                     txnId(), command.txnId(), command.status(), command);
-        switch (command.status())
+        private Committed(CommonAttributes common, SaveStatus status, Timestamp executeAt, Ballot promised, Ballot accepted, ImmutableSortedSet<TxnId> waitingOnCommit, ImmutableSortedMap<Timestamp, TxnId> waitingOnApply)
         {
-            default:
-                throw new IllegalStateException();
-            case NotWitnessed:
-            case PreAccepted:
-            case Accepted:
-            case AcceptedInvalidate:
-                break;
-
-            case PreCommitted:
-            case Committed:
-            case ReadyToExecute:
-            case PreApplied:
-            case Applied:
-            case Invalidated:
-                updatePredecessor(command);
-                maybeExecute(safeStore, progressShard(safeStore), false, true);
-                break;
+            super(common, status, executeAt, promised, accepted);
+            this.waitingOnCommit = waitingOnCommit;
+            this.waitingOnApply = waitingOnApply;
         }
-    }
 
-    protected void postApply(SafeCommandStore safeStore)
-    {
-        logger.trace("{} applied, setting status to Applied and notifying listeners", txnId());
-        setStatus(Applied);
-        notifyListeners(safeStore);
-    }
+        private Committed(CommonAttributes common, SaveStatus status, Timestamp executeAt, Ballot promised, Ballot accepted, WaitingOn waitingOn)
+        {
+            this(common, status, executeAt, promised, accepted, waitingOn.waitingOnCommit, waitingOn.waitingOnApply);
+        }
 
-    private static Function<SafeCommandStore, Void> callPostApply(TxnId txnId)
-    {
-        return safeStore -> {
-            safeStore.command(txnId).postApply(safeStore);
-            return null;
-        };
-    }
+        @Override
+        public Command updateAttributes(CommonAttributes attrs, Ballot promised)
+        {
+            return new Committed(attrs, saveStatus(), executeAt(), promised, accepted(), waitingOnCommit(), waitingOnApply());
+        }
 
-    protected Future<Void> apply(SafeCommandStore safeStore)
-    {
-        // important: we can't include a reference to *this* in the lambda, since the C* implementation may evict
-        // the command instance from memory between now and the write completing (and post apply being called)
-        CommandStore unsafeStore = safeStore.commandStore();
-        return writes().apply(safeStore).flatMap(unused ->
-            unsafeStore.submit(this, callPostApply(txnId()))
-        );
-    }
+        @Override
+        public boolean equals(Object o)
+        {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            if (!super.equals(o)) return false;
+            Committed committed = (Committed) o;
+            return Objects.equals(waitingOnCommit, committed.waitingOnCommit)
+                    && Objects.equals(waitingOnApply, committed.waitingOnApply);
+        }
 
-    public Future<Data> read(SafeCommandStore safeStore)
-    {
-        return partialTxn().read(safeStore, this);
-    }
+        @Override
+        public int hashCode()
+        {
+            int hash = super.hashCode();
+            hash = 31 * hash + Objects.hashCode(waitingOnCommit);
+            hash = 31 * hash + Objects.hashCode(waitingOnApply);
+            return hash;
+        }
 
-    // TODO (expected, API consistency): maybe split into maybeExecute and maybeApply?
-    private boolean maybeExecute(SafeCommandStore safeStore, ProgressShard shard, boolean alwaysNotifyListeners, boolean notifyWaitingOn)
-    {
-        if (logger.isTraceEnabled())
-            logger.trace("{}: Maybe executing with status {}. Will notify listeners on noop: {}", txnId(), status(), alwaysNotifyListeners);
+        private static Committed committed(Committed command, CommonAttributes common, Ballot promised, SaveStatus status, ImmutableSortedSet<TxnId> waitingOnCommit, ImmutableSortedMap<Timestamp, TxnId> waitingOnApply)
+        {
+            checkPromised(command, promised);
+            checkSameClass(command, Committed.class, "Cannot update");
+            return new Committed(common, status, command.executeAt(), promised, command.accepted(), waitingOnCommit, waitingOnApply);
+        }
 
-        if (status() != Committed && status() != PreApplied)
+        static Committed committed(Committed command, CommonAttributes common, Ballot promised)
         {
-            if (alwaysNotifyListeners)
-                notifyListeners(safeStore);
-            return false;
+            return committed(command, common, promised, command.saveStatus(), command.waitingOnCommit(), command.waitingOnApply());
         }
 
-        if (isWaitingOnDependency())
+        static Committed committed(Committed command, CommonAttributes common, SaveStatus status)
         {
-            if (alwaysNotifyListeners)
-                notifyListeners(safeStore);
+            return committed(command, common, command.promised(), status, command.waitingOnCommit(), command.waitingOnApply());
+        }
 
-            if (notifyWaitingOn)
-                new NotifyWaitingOn(this).accept(safeStore);
-            return false;
+        static Committed committed(Committed command, CommonAttributes common, WaitingOn waitingOn)
+        {
+            return committed(command, common, command.promised(), command.saveStatus(), waitingOn.waitingOnCommit, waitingOn.waitingOnApply);
         }
 
-        switch (status())
+        static Committed committed(CommonAttributes common, SaveStatus status, Timestamp executeAt, Ballot promised, Ballot accepted, ImmutableSortedSet<TxnId> waitingOnCommit, ImmutableSortedMap<Timestamp, TxnId> waitingOnApply)
         {
-            case Committed:
-                // TODO (desirable, efficiency): maintain distinct ReadyToRead and ReadyToWrite states
-                setStatus(ReadyToExecute);
-                logger.trace("{}: set to ReadyToExecute", txnId());
-                safeStore.progressLog().readyToExecute(this, shard);
-                notifyListeners(safeStore);
-                break;
+            return new Committed(common, status, executeAt, promised, accepted, waitingOnCommit, waitingOnApply);
+        }
 
-            case PreApplied:
-                Ranges executeRanges = executeRanges(safeStore, executeAt());
-                boolean intersects = writes().keys.intersects(executeRanges);
-
-                if (intersects)
-                {
-                    logger.trace("{}: applying", txnId());
-                    apply(safeStore);
-                }
-                else
-                {
-                    // TODO (desirable, performance): This could be performed immediately upon Committed
-                    //      but: if we later support transitive dependency elision this could be dangerous
-                    logger.trace("{}: applying no-op", txnId());
-                    setStatus(Applied);
-                    notifyListeners(safeStore);
-                }
-        }
-        return true;
-    }
+        public AsyncChain<Data> read(SafeCommandStore safeStore)
+        {
+            return partialTxn().read(safeStore, this);
+        }
 
-    /**
-     * @param dependency is either committed or invalidated
-     * @return true iff {@code maybeExecute} might now have a different outcome
-     */
-    private boolean updatePredecessor(Command dependency)
-    {
-        Invariants.checkState(dependency.hasBeen(PreCommitted));
-        if (dependency.hasBeen(Invalidated))
+        public WaitingOn waitingOn()
         {
-            logger.trace("{}: {} is invalidated. Stop listening and removing from waiting on commit set.", txnId(), dependency.txnId());
-            dependency.removeListener(this);
-            removeWaitingOnCommit(dependency.txnId());
-            return true;
+            return new WaitingOn(waitingOnCommit, waitingOnApply);
         }
-        else if (dependency.executeAt().compareTo(executeAt()) > 0)
+
+        public ImmutableSortedSet<TxnId> waitingOnCommit()
         {
-            // dependency cannot be a predecessor if it executes later
-            logger.trace("{}: {} executes after us. Stop listening and removing from waiting on apply set.", txnId(), dependency.txnId());
-            removeWaitingOn(dependency.txnId(), dependency.executeAt());
-            dependency.removeListener(this);
-            return true;
+            return waitingOnCommit;
         }
-        else if (dependency.hasBeen(Applied))
+
+        public boolean isWaitingOnCommit()
         {
-            logger.trace("{}: {} has been applied. Stop listening and removing from waiting on apply set.", txnId(), dependency.txnId());
-            removeWaitingOn(dependency.txnId(), dependency.executeAt());
-            dependency.removeListener(this);
-            return true;
+            return waitingOnCommit != null && !waitingOnCommit.isEmpty();
         }
-        else if (isWaitingOnDependency())
+
+        public TxnId firstWaitingOnCommit()
         {
-            logger.trace("{}: adding {} to waiting on apply set.", txnId(), dependency.txnId());
-            addWaitingOnApplyIfAbsent(dependency.txnId(), dependency.executeAt());
-            removeWaitingOnCommit(dependency.txnId());
-            return false;
+            return isWaitingOnCommit() ? waitingOnCommit.first() : null;
         }
-        else
+
+        public ImmutableSortedMap<Timestamp, TxnId> waitingOnApply()
         {
-            throw new IllegalStateException();
+            return waitingOnApply;
         }
-    }
 
-    private void insertPredecessor(Command dependency)
-    {
-        Invariants.checkState(dependency.hasBeen(PreCommitted));
-        if (dependency.hasBeen(Invalidated))
+        public boolean isWaitingOnApply()
         {
-            logger.trace("{}: {} is invalidated. Do not insert.", txnId(), dependency.txnId());
+            return waitingOnApply != null && !waitingOnApply.isEmpty();
         }
-        else if (dependency.executeAt().compareTo(executeAt()) > 0)
+
+        public TxnId firstWaitingOnApply()
         {
-            // dependency cannot be a predecessor if it executes later
-            logger.trace("{}: {} executes after us. Do not insert.", txnId(), dependency.txnId());
+            return isWaitingOnApply() ? waitingOnApply.firstEntry().getValue() : null;
         }
-        else if (dependency.hasBeen(Applied))
+
+        public boolean hasBeenWitnessed()
         {
-            logger.trace("{}: {} has been applied. Do not insert.", txnId(), dependency.txnId());
+            return partialTxn() != null;

Review Comment:
   Perhaps use `Known`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org