You are viewing a plain text version of this content. The canonical link for it is here.
Posted to pr@cassandra.apache.org by "belliottsmith (via GitHub)" <gi...@apache.org> on 2023/02/08 16:02:28 UTC

[GitHub] [cassandra] belliottsmith commented on a diff in pull request #2144: CEP-15 Accord: Immutable state integration

belliottsmith commented on code in PR #2144:
URL: https://github.com/apache/cassandra/pull/2144#discussion_r1099286274


##########
src/java/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializer.java:
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.accord.serializers;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import accord.impl.CommandsForKey.CommandLoader;
+import accord.local.Command;
+import accord.local.SaveStatus;
+import accord.primitives.PartialDeps;
+import accord.primitives.Timestamp;
+import accord.primitives.TxnId;
+import accord.utils.Invariants;
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.db.marshal.ByteBufferAccessor;
+import org.apache.cassandra.db.marshal.ValueAccessor;
+import org.apache.cassandra.io.LocalVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.service.accord.AccordSerializerVersion;
+
+public class CommandsForKeySerializer
+{
+    private static final LocalVersionedSerializer<PartialDeps> depsSerializer = new LocalVersionedSerializer<>(AccordSerializerVersion.CURRENT, AccordSerializerVersion.serializer, DepsSerializer.partialDeps);
+    public static final CommandLoader<ByteBuffer> loader = new AccordCFKLoader();
+    private static class AccordCFKLoader implements CommandLoader<ByteBuffer>
+    {
+        private static final int HAS_DEPS = 0x01;
+
+        private static final long FIXED_SIZE;
+        private static final int FLAG_OFFSET;
+        private static final int STATUS_OFFSET;
+        private static final int TXNID_OFFSET;
+        private static final int EXECUTEAT_OFFSET;
+        private static final int DEPS_OFFSET;
+
+        static
+        {
+            long size = 0;
+
+            FLAG_OFFSET = (int) size;
+            size += TypeSizes.BYTE_SIZE;
+
+            STATUS_OFFSET = (int) size;
+            size += TypeSizes.BYTE_SIZE;
+
+            TXNID_OFFSET = (int) size;
+            size += CommandSerializers.txnId.serializedSize();
+
+            EXECUTEAT_OFFSET = (int) size;
+            size += CommandSerializers.timestamp.serializedSize();
+
+            DEPS_OFFSET = (int) size;
+            FIXED_SIZE = size;
+        }
+
+        private int serializedSize(Command command)
+        {
+            return (int) (FIXED_SIZE + (command.partialDeps() != null ? depsSerializer.serializedSize(command.partialDeps()) : 0));
+        }
+
+        private static final ValueAccessor<ByteBuffer> accessor = ByteBufferAccessor.instance;
+
+        private static byte toByte(int v)
+        {
+            Invariants.checkArgument(v < Byte.MAX_VALUE);
+            return (byte) v;
+        }
+
+        private AccordCFKLoader() {}
+
+        @Override
+        public ByteBuffer saveForCFK(Command command)
+        {
+            int flags = 0;
+
+            PartialDeps deps = command.partialDeps();
+            if (deps != null)
+                flags |= HAS_DEPS;
+
+            int size = serializedSize(command);
+            ByteBuffer buffer = accessor.allocate(size);
+            accessor.putByte(buffer, FLAG_OFFSET, toByte(flags));
+            accessor.putByte(buffer, STATUS_OFFSET, toByte(command.status().ordinal()));
+            CommandSerializers.txnId.serialize(command.txnId(), buffer, accessor, TXNID_OFFSET);
+            CommandSerializers.timestamp.serialize(command.executeAt(), buffer, accessor, EXECUTEAT_OFFSET);
+            if (deps != null)

Review Comment:
   This looks to be a significant performance regression? We don't need to persist the whole deps, only the txnId that are relevant for this key.



##########
src/java/org/apache/cassandra/service/accord/AccordCommands.java:
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.accord;
+
+import accord.local.Command;
+import accord.primitives.TxnId;
+import org.apache.cassandra.service.accord.AccordStateCache.ItemAccessor;
+
+public class AccordCommands
+{
+    public static final ItemAccessor<TxnId, Command> ITEM_ACCESSOR = new ItemAccessor<TxnId, Command>()

Review Comment:
   Perhaps declare in its own class, as this is otherwise empty?



##########
src/java/org/apache/cassandra/service/accord/AccordStateCache.java:
##########
@@ -366,10 +409,9 @@ private V getOrCreate(K key, boolean createIfAbsent)
             {
                 stats.misses++;
                 AccordStateCache.this.stats.misses++;
-                if (!createIfAbsent)
+                if (loadFunction == null)
                     return null;
-                V value = factory.apply(key);
-                node = new Node<>(value);
+                node = new Node<>(key, new PendingLoad<>(key, loadFunction), accessor);

Review Comment:
   eviction should happen in this branch, right? Not point calling `maybeEvict` if its already in cache? 



##########
src/java/org/apache/cassandra/service/accord/async/AsyncWriter.java:
##########
@@ -76,35 +71,26 @@ public AsyncWriter(AccordCommandStore commandStore)
         this.cfkCache = commandStore.commandsForKeyCache();
     }
 
-    private interface StateMutationFunction<K, V extends AccordState<K>>
+    private interface StateMutationFunction<V>
     {
-        Mutation apply(AccordCommandStore commandStore, V state, long timestamp);
+        Mutation apply(AccordCommandStore commandStore, V previous, V updated, long timestamp);
     }
 
-    private static <K, V extends AccordState<K>> List<Future<?>> dispatchWrites(AsyncContext.Group<K, V> ctxGroup,
-                                                                                AccordStateCache.Instance<K, V> cache,
-                                                                                StateMutationFunction<K, V> mutationFunction,
-                                                                                long timestamp,
-                                                                                AccordCommandStore commandStore,
-                                                                                List<Future<?>> futures,
-                                                                                Object callback)
+    private static <K, V extends ImmutableState> List<AsyncChain<Void>> dispatchWrites(ImmutableMap<K, ContextValue<V>> values,
+                                                                                       AccordStateCache.Instance<K, V> cache,
+                                                                                       StateMutationFunction<V> mutationFunction,
+                                                                                       long timestamp,
+                                                                                       AccordCommandStore commandStore,
+                                                                                       List<AsyncChain<Void>> results,
+                                                                                       Object callback)
     {
-        for (V item : ctxGroup.items.values())
-        {
-            if (!item.hasModifications())
-            {
-                if (logger.isTraceEnabled())
-                    logger.trace("No modifications for {} for {}, {}", item.key(), callback, item);
-                continue;
-            }
-
-            if (futures == null)
-                futures = new ArrayList<>();
-            K key = item.key();
-            Mutation mutation = mutationFunction.apply(commandStore, item, timestamp);
+        values.forEach((key, value) -> {
+            if (value.original() == value.current())
+                return;
+            Mutation mutation = mutationFunction.apply(commandStore, value.original(), value.current(), timestamp);
             if (logger.isTraceEnabled())
-                logger.trace("Dispatching mutation for {} for {}, {} -> {}", key, callback, item, mutation);
-            Future<?> future = Stage.MUTATION.submit(() -> {
+                logger.trace("Dispatching mutation for {} for {}, {} -> {}", key, callback, value.current(), mutation);
+            AsyncResult<Void> result = ofRunnable(Stage.MUTATION.executor(), () -> {

Review Comment:
   Same here: do we want to submit a separate thread for each mutation? Particularly for writes this seems unnecessary, since we only usually write to memory and probably don't gain much from parallelism?



##########
src/java/org/apache/cassandra/service/accord/AccordKeyspace.java:
##########
@@ -320,187 +343,194 @@ private static <T> T deserializeOrNull(ByteBuffer bytes, LocalVersionedSerialize
         return bytes != null && ! ByteBufferAccessor.instance.isEmpty(bytes) ? deserialize(bytes, serializer) : null;
     }
 
-    private static NavigableMap<Timestamp, TxnId> deserializeWaitingOnApply(Map<ByteBuffer, ByteBuffer> serialized)
+    private static ImmutableSortedMap<Timestamp, TxnId> deserializeWaitingOnApply(Map<ByteBuffer, ByteBuffer> serialized)
     {
         if (serialized == null || serialized.isEmpty())
-            return new TreeMap<>();
+            return ImmutableSortedMap.of();
 
         NavigableMap<Timestamp, TxnId> result = new TreeMap<>();
         for (Map.Entry<ByteBuffer, ByteBuffer> entry : serialized.entrySet())
             result.put(deserializeTimestampOrNull(entry.getKey(), Timestamp::fromBits), deserializeTimestampOrNull(entry.getValue(), TxnId::fromBits));
-        return result;
+        return ImmutableSortedMap.copyOf(result);
     }
 
-    private static <T extends Timestamp, S extends Set<T>> S deserializeTimestampSet(Set<ByteBuffer> serialized, Supplier<S> setFactory, TimestampFactory<T> timestampFactory)
+    private static <T extends Timestamp> ImmutableSortedSet<T> deserializeTimestampSet(Set<ByteBuffer> serialized, TimestampFactory<T> timestampFactory)
     {
-        S result = setFactory.get();
         if (serialized == null || serialized.isEmpty())
-            return result;
+            return ImmutableSortedSet.of();
 
+        NavigableSet<T> result = new TreeSet<>();
         for (ByteBuffer bytes : serialized)
             result.add(deserializeTimestampOrNull(bytes, timestampFactory));
 
-        return result;
+        return ImmutableSortedSet.copyOf(result);
     }
 
-    private static NavigableSet<TxnId> deserializeTxnIdNavigableSet(UntypedResultSet.Row row, String name)
+    private static ImmutableSortedSet<TxnId> deserializeTxnIdNavigableSet(UntypedResultSet.Row row, String name)
     {
-        return deserializeTimestampSet(row.getSet(name, BytesType.instance), TreeSet::new, TxnId::fromBits);
+        return deserializeTimestampSet(row.getSet(name, BytesType.instance), TxnId::fromBits);
     }
 
-    private static DeterministicIdentitySet<ListenerProxy> deserializeListeners(Set<ByteBuffer> serialized) throws IOException
+    private static ImmutableSet<CommandListener> deserializeListeners(Set<ByteBuffer> serialized) throws IOException
     {
         if (serialized == null || serialized.isEmpty())
-            return new DeterministicIdentitySet<>();
-        DeterministicIdentitySet<ListenerProxy> result = new DeterministicIdentitySet<>();
+            return ImmutableSet.of();
+        ImmutableSet.Builder<CommandListener> result = ImmutableSet.builder();
         for (ByteBuffer bytes : serialized)
         {
-            result.add(ListenerProxy.deserialize(bytes, ByteBufferAccessor.instance, 0));
+            result.add(deserialize(bytes, CommandsSerializers.listeners));
         }
-        return result;
+        return result.build();
     }
 
-    private static DeterministicIdentitySet<ListenerProxy> deserializeListeners(UntypedResultSet.Row row, String name) throws IOException
+    private static ImmutableSet<CommandListener> deserializeListeners(UntypedResultSet.Row row, String name) throws IOException
     {
         return deserializeListeners(row.getSet(name, BytesType.instance));
     }
 
-    private static <K extends Comparable<?>, V> void addStoredMapChanges(Row.Builder builder,
-                                                                         ColumnMetadata column,
-                                                                         long timestamp,
-                                                                         int nowInSec,
-                                                                         StoredNavigableMap<K, V> map,
-                                                                         Function<K, ByteBuffer> serializeKey,
-                                                                         Function<V, ByteBuffer> serializeVal)
+    private interface SerializeFunction<V>
     {
-        if (map.wasCleared())
-        {
-            if (!map.hasAdditions())
-            {
-                builder.addComplexDeletion(column, new DeletionTime(timestamp, nowInSec));
-                return;
-            }
-            else
-                builder.addComplexDeletion(column, new DeletionTime(timestamp - 1, nowInSec));
-        }
-
-        map.forEachAddition((k, v) -> builder.addCell(live(column, timestamp, serializeVal.apply(v), CellPath.create(serializeKey.apply(k)))));
-
-        if (!map.wasCleared())
-            map.forEachDeletion(k -> builder.addCell(tombstone(column, timestamp, nowInSec, CellPath.create(serializeKey.apply(k)))));
+        ByteBuffer apply(V v) throws IOException;
     }
 
-    private static <T extends Comparable<?>> void addStoredSetChanges(Row.Builder builder,
-                                                                      ColumnMetadata column,
-                                                                      long timestamp,
-                                                                      int nowInSec,
-                                                                      StoredSet<T, ?> map,
-                                                                      Function<T, ByteBuffer> serialize)
+    private static <C, V> boolean valueModified(Function<C, V> get, C original, C current)
     {
-        if (map.wasCleared())
-        {
-            if (!map.hasAdditions())
-            {
-                builder.addComplexDeletion(column, new DeletionTime(timestamp, nowInSec));
-                return;
-            }
-            else
-                builder.addComplexDeletion(column, new DeletionTime(timestamp - 1, nowInSec));
-        }
+        V prev = original != null ? get.apply(original) : null;
+        V value = get.apply(current);
 
-        map.forEachAddition(i -> builder.addCell(live(column, timestamp, EMPTY_BYTE_BUFFER, CellPath.create(serialize.apply(i)))));
+        return prev != value;
+    }
 
-        if (!map.wasCleared())
-            map.forEachDeletion(k -> builder.addCell(tombstone(column, timestamp, nowInSec, CellPath.create(serialize.apply(k)))));
+    private static <C, V> void addCellIfModified(ColumnMetadata column, Function<C, V> get, SerializeFunction<V> serialize, Row.Builder builder, long timestampMicros, C original, C current) throws IOException
+    {
+        if (valueModified(get, original, current))
+            builder.addCell(live(column, timestampMicros, serialize.apply(get.apply(current))));
     }
 
-    public static Mutation getCommandMutation(AccordCommandStore commandStore, AccordCommand command, long timestampMicros)
+    private static <C extends Command, V> void addCellIfModified(ColumnMetadata column, Function<C, V> get, LocalVersionedSerializer<V> serializer, Row.Builder builder, long timestampMicros, C original, C command) throws IOException
     {
-        try
-        {
-            Preconditions.checkArgument(command.hasModifications());
+        addCellIfModified(column, get, v -> serializeOrNull(v, serializer), builder, timestampMicros, original, command);
+    }
 
-            // TODO: convert to byte arrays
-            ValueAccessor<ByteBuffer> accessor = ByteBufferAccessor.instance;
+    private static <C extends Command, V> void addKeyCellIfModified(ColumnMetadata column, Function<C, V> get, Row.Builder builder, long timestampMicros, C original, C command) throws IOException
+    {
+        addCellIfModified(column, get, v -> serializeOrNull((AccordRoutingKey) v, CommandsSerializers.routingKey), builder, timestampMicros, original, command);
+    }
 
-            Row.Builder builder = BTreeRow.unsortedBuilder();
-            builder.newRow(Clustering.EMPTY);
-            int nowInSeconds = (int) TimeUnit.MICROSECONDS.toSeconds(timestampMicros);
+    private static <C extends Command, V extends Enum<V>> void addEnumCellIfModified(ColumnMetadata column, Function<C, V> get, Row.Builder builder, long timestampMicros, C original, C command) throws IOException
+    {
+        // TODO: convert to byte arrays
+        ValueAccessor<ByteBuffer> accessor = ByteBufferAccessor.instance;
+        addCellIfModified(column, get, v -> accessor.valueOf(v.ordinal()), builder, timestampMicros, original, command);
+    }
 
+    private static <C, V> void addSetChanges(ColumnMetadata column, Function<C, Set<V>> get, SerializeFunction<V> serialize, Row.Builder builder, long timestampMicros, int nowInSec, C original, C command) throws IOException
+    {
+        Set<V> prev = original != null ? get.apply(original) : Collections.emptySet();
+        if (prev == null) prev = Collections.emptySet();
+        Set<V> value = get.apply(command);
+        if (value == null) value = Collections.emptySet();
 
-            if (command.status.hasModifications())
-                builder.addCell(live(CommandsColumns.status, timestampMicros, accessor.valueOf(command.status.get().ordinal())));
+        if (value.isEmpty() && !prev.isEmpty())
+        {
+            builder.addComplexDeletion(column, new DeletionTime(timestampMicros, nowInSec));
+            return;
+        }
 
-            if (command.homeKey.hasModifications())
-                builder.addCell(live(CommandsColumns.home_key, timestampMicros, serializeOrNull((AccordRoutingKey) command.homeKey.get(), CommandsSerializers.routingKey)));
+        for (V item : Sets.difference(value, prev))
+            builder.addCell(live(column, timestampMicros, EMPTY_BYTE_BUFFER, CellPath.create(serialize.apply(item))));
 
-            if (command.progressKey.hasModifications())
-                builder.addCell(live(CommandsColumns.progress_key, timestampMicros, serializeOrNull((AccordRoutingKey) command.progressKey.get(), CommandsSerializers.routingKey)));
+        for (V item : Sets.difference(prev, value))
+            builder.addCell(tombstone(column, timestampMicros, nowInSec, CellPath.create(serialize.apply(item))));
+    }
 
-            if (command.route.hasModifications())
-                builder.addCell(live(CommandsColumns.route, timestampMicros, serializeOrNull(command.route.get(), CommandsSerializers.route)));
+    private static <C, K, V> void addMapChanges(ColumnMetadata column, Function<C, Map<K, V>> get, SerializeFunction<K> serializeKey, SerializeFunction<V> serializeVal, Row.Builder builder, long timestampMicros, int nowInSec, C original, C command) throws IOException
+    {
+        Map<K, V> prev = original != null ? get.apply(original) : Collections.emptyMap();
+        if (prev == null) prev = Collections.emptyMap();
+        Map<K, V> value = get.apply(command);
+        if (value == null) value = Collections.emptyMap();
 
-            if (command.durability.hasModifications())
-                builder.addCell(live(CommandsColumns.durability, timestampMicros, accessor.valueOf(command.durability.get().ordinal())));
+        if (value.isEmpty() && !prev.isEmpty())
+        {
+            builder.addComplexDeletion(column, new DeletionTime(timestampMicros, nowInSec));
+            return;
+        }
 
-            if (command.partialTxn.hasModifications())
-                builder.addCell(live(CommandsColumns.txn, timestampMicros, serializeOrNull(command.partialTxn.get(), CommandsSerializers.partialTxn)));
+        for (Map.Entry<K, V> entry : value.entrySet())
+        {
+            K key = entry.getKey();
+            V pVal = prev.get(key);
+            if (pVal != null && pVal.equals(entry.getValue()))
+                continue;
+            builder.addCell(live(column, timestampMicros, serializeVal.apply(entry.getValue()), CellPath.create(serializeKey.apply(key))));
+        }
+        for (K key : Sets.difference(prev.keySet(), value.keySet()))
+            builder.addCell(tombstone(column, timestampMicros, nowInSec, CellPath.create(serializeKey.apply(key))));
+    }
 
-            if (command.kind.hasModifications() && command.kind.get() != null) // initialize sets hasModification(), and don't want to persist null
-                builder.addCell(live(CommandsColumns.kind, timestampMicros, accessor.valueOf(command.kind.get().ordinal())));
+    private static <K, V> int estimateMapChanges(Map<K, V> prev, Map<K, V> value)

Review Comment:
   Is this complexity worth it, to avoid resizing the `PartitionUpdate.Builder`?



##########
src/java/org/apache/cassandra/service/accord/AccordCommandsForKeys.java:
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.accord;
+
+import accord.impl.CommandsForKey;
+import accord.primitives.RoutableKey;
+import org.apache.cassandra.service.accord.AccordStateCache.ItemAccessor;
+
+public class AccordCommandsForKeys
+{
+    public static final ItemAccessor<RoutableKey, CommandsForKey> ITEM_ACCESSOR = new ItemAccessor<RoutableKey, CommandsForKey>()

Review Comment:
   Declare in its own class, or perhaps in AccordStateCache?



##########
src/java/org/apache/cassandra/service/accord/AccordKeyspace.java:
##########
@@ -320,187 +343,194 @@ private static <T> T deserializeOrNull(ByteBuffer bytes, LocalVersionedSerialize
         return bytes != null && ! ByteBufferAccessor.instance.isEmpty(bytes) ? deserialize(bytes, serializer) : null;
     }
 
-    private static NavigableMap<Timestamp, TxnId> deserializeWaitingOnApply(Map<ByteBuffer, ByteBuffer> serialized)
+    private static ImmutableSortedMap<Timestamp, TxnId> deserializeWaitingOnApply(Map<ByteBuffer, ByteBuffer> serialized)
     {
         if (serialized == null || serialized.isEmpty())
-            return new TreeMap<>();
+            return ImmutableSortedMap.of();
 
         NavigableMap<Timestamp, TxnId> result = new TreeMap<>();
         for (Map.Entry<ByteBuffer, ByteBuffer> entry : serialized.entrySet())
             result.put(deserializeTimestampOrNull(entry.getKey(), Timestamp::fromBits), deserializeTimestampOrNull(entry.getValue(), TxnId::fromBits));
-        return result;
+        return ImmutableSortedMap.copyOf(result);
     }
 
-    private static <T extends Timestamp, S extends Set<T>> S deserializeTimestampSet(Set<ByteBuffer> serialized, Supplier<S> setFactory, TimestampFactory<T> timestampFactory)
+    private static <T extends Timestamp> ImmutableSortedSet<T> deserializeTimestampSet(Set<ByteBuffer> serialized, TimestampFactory<T> timestampFactory)
     {
-        S result = setFactory.get();
         if (serialized == null || serialized.isEmpty())
-            return result;
+            return ImmutableSortedSet.of();
 
+        NavigableSet<T> result = new TreeSet<>();
         for (ByteBuffer bytes : serialized)
             result.add(deserializeTimestampOrNull(bytes, timestampFactory));
 
-        return result;
+        return ImmutableSortedSet.copyOf(result);
     }
 
-    private static NavigableSet<TxnId> deserializeTxnIdNavigableSet(UntypedResultSet.Row row, String name)
+    private static ImmutableSortedSet<TxnId> deserializeTxnIdNavigableSet(UntypedResultSet.Row row, String name)
     {
-        return deserializeTimestampSet(row.getSet(name, BytesType.instance), TreeSet::new, TxnId::fromBits);
+        return deserializeTimestampSet(row.getSet(name, BytesType.instance), TxnId::fromBits);
     }
 
-    private static DeterministicIdentitySet<ListenerProxy> deserializeListeners(Set<ByteBuffer> serialized) throws IOException
+    private static ImmutableSet<CommandListener> deserializeListeners(Set<ByteBuffer> serialized) throws IOException
     {
         if (serialized == null || serialized.isEmpty())
-            return new DeterministicIdentitySet<>();
-        DeterministicIdentitySet<ListenerProxy> result = new DeterministicIdentitySet<>();
+            return ImmutableSet.of();
+        ImmutableSet.Builder<CommandListener> result = ImmutableSet.builder();
         for (ByteBuffer bytes : serialized)
         {
-            result.add(ListenerProxy.deserialize(bytes, ByteBufferAccessor.instance, 0));
+            result.add(deserialize(bytes, CommandsSerializers.listeners));
         }
-        return result;
+        return result.build();
     }
 
-    private static DeterministicIdentitySet<ListenerProxy> deserializeListeners(UntypedResultSet.Row row, String name) throws IOException
+    private static ImmutableSet<CommandListener> deserializeListeners(UntypedResultSet.Row row, String name) throws IOException
     {
         return deserializeListeners(row.getSet(name, BytesType.instance));
     }
 
-    private static <K extends Comparable<?>, V> void addStoredMapChanges(Row.Builder builder,
-                                                                         ColumnMetadata column,
-                                                                         long timestamp,
-                                                                         int nowInSec,
-                                                                         StoredNavigableMap<K, V> map,
-                                                                         Function<K, ByteBuffer> serializeKey,
-                                                                         Function<V, ByteBuffer> serializeVal)
+    private interface SerializeFunction<V>
     {
-        if (map.wasCleared())
-        {
-            if (!map.hasAdditions())
-            {
-                builder.addComplexDeletion(column, new DeletionTime(timestamp, nowInSec));
-                return;
-            }
-            else
-                builder.addComplexDeletion(column, new DeletionTime(timestamp - 1, nowInSec));
-        }
-
-        map.forEachAddition((k, v) -> builder.addCell(live(column, timestamp, serializeVal.apply(v), CellPath.create(serializeKey.apply(k)))));
-
-        if (!map.wasCleared())
-            map.forEachDeletion(k -> builder.addCell(tombstone(column, timestamp, nowInSec, CellPath.create(serializeKey.apply(k)))));
+        ByteBuffer apply(V v) throws IOException;
     }
 
-    private static <T extends Comparable<?>> void addStoredSetChanges(Row.Builder builder,
-                                                                      ColumnMetadata column,
-                                                                      long timestamp,
-                                                                      int nowInSec,
-                                                                      StoredSet<T, ?> map,
-                                                                      Function<T, ByteBuffer> serialize)
+    private static <C, V> boolean valueModified(Function<C, V> get, C original, C current)
     {
-        if (map.wasCleared())
-        {
-            if (!map.hasAdditions())
-            {
-                builder.addComplexDeletion(column, new DeletionTime(timestamp, nowInSec));
-                return;
-            }
-            else
-                builder.addComplexDeletion(column, new DeletionTime(timestamp - 1, nowInSec));
-        }
+        V prev = original != null ? get.apply(original) : null;
+        V value = get.apply(current);
 
-        map.forEachAddition(i -> builder.addCell(live(column, timestamp, EMPTY_BYTE_BUFFER, CellPath.create(serialize.apply(i)))));
+        return prev != value;
+    }
 
-        if (!map.wasCleared())
-            map.forEachDeletion(k -> builder.addCell(tombstone(column, timestamp, nowInSec, CellPath.create(serialize.apply(k)))));
+    private static <C, V> void addCellIfModified(ColumnMetadata column, Function<C, V> get, SerializeFunction<V> serialize, Row.Builder builder, long timestampMicros, C original, C current) throws IOException
+    {
+        if (valueModified(get, original, current))
+            builder.addCell(live(column, timestampMicros, serialize.apply(get.apply(current))));
     }
 
-    public static Mutation getCommandMutation(AccordCommandStore commandStore, AccordCommand command, long timestampMicros)
+    private static <C extends Command, V> void addCellIfModified(ColumnMetadata column, Function<C, V> get, LocalVersionedSerializer<V> serializer, Row.Builder builder, long timestampMicros, C original, C command) throws IOException
     {
-        try
-        {
-            Preconditions.checkArgument(command.hasModifications());
+        addCellIfModified(column, get, v -> serializeOrNull(v, serializer), builder, timestampMicros, original, command);
+    }
 
-            // TODO: convert to byte arrays
-            ValueAccessor<ByteBuffer> accessor = ByteBufferAccessor.instance;
+    private static <C extends Command, V> void addKeyCellIfModified(ColumnMetadata column, Function<C, V> get, Row.Builder builder, long timestampMicros, C original, C command) throws IOException
+    {
+        addCellIfModified(column, get, v -> serializeOrNull((AccordRoutingKey) v, CommandsSerializers.routingKey), builder, timestampMicros, original, command);
+    }
 
-            Row.Builder builder = BTreeRow.unsortedBuilder();
-            builder.newRow(Clustering.EMPTY);
-            int nowInSeconds = (int) TimeUnit.MICROSECONDS.toSeconds(timestampMicros);
+    private static <C extends Command, V extends Enum<V>> void addEnumCellIfModified(ColumnMetadata column, Function<C, V> get, Row.Builder builder, long timestampMicros, C original, C command) throws IOException
+    {
+        // TODO: convert to byte arrays
+        ValueAccessor<ByteBuffer> accessor = ByteBufferAccessor.instance;
+        addCellIfModified(column, get, v -> accessor.valueOf(v.ordinal()), builder, timestampMicros, original, command);
+    }
 
+    private static <C, V> void addSetChanges(ColumnMetadata column, Function<C, Set<V>> get, SerializeFunction<V> serialize, Row.Builder builder, long timestampMicros, int nowInSec, C original, C command) throws IOException
+    {
+        Set<V> prev = original != null ? get.apply(original) : Collections.emptySet();
+        if (prev == null) prev = Collections.emptySet();
+        Set<V> value = get.apply(command);
+        if (value == null) value = Collections.emptySet();
 
-            if (command.status.hasModifications())
-                builder.addCell(live(CommandsColumns.status, timestampMicros, accessor.valueOf(command.status.get().ordinal())));
+        if (value.isEmpty() && !prev.isEmpty())
+        {
+            builder.addComplexDeletion(column, new DeletionTime(timestampMicros, nowInSec));
+            return;
+        }
 
-            if (command.homeKey.hasModifications())
-                builder.addCell(live(CommandsColumns.home_key, timestampMicros, serializeOrNull((AccordRoutingKey) command.homeKey.get(), CommandsSerializers.routingKey)));
+        for (V item : Sets.difference(value, prev))
+            builder.addCell(live(column, timestampMicros, EMPTY_BYTE_BUFFER, CellPath.create(serialize.apply(item))));
 
-            if (command.progressKey.hasModifications())
-                builder.addCell(live(CommandsColumns.progress_key, timestampMicros, serializeOrNull((AccordRoutingKey) command.progressKey.get(), CommandsSerializers.routingKey)));
+        for (V item : Sets.difference(prev, value))
+            builder.addCell(tombstone(column, timestampMicros, nowInSec, CellPath.create(serialize.apply(item))));
+    }
 
-            if (command.route.hasModifications())
-                builder.addCell(live(CommandsColumns.route, timestampMicros, serializeOrNull(command.route.get(), CommandsSerializers.route)));
+    private static <C, K, V> void addMapChanges(ColumnMetadata column, Function<C, Map<K, V>> get, SerializeFunction<K> serializeKey, SerializeFunction<V> serializeVal, Row.Builder builder, long timestampMicros, int nowInSec, C original, C command) throws IOException
+    {
+        Map<K, V> prev = original != null ? get.apply(original) : Collections.emptyMap();
+        if (prev == null) prev = Collections.emptyMap();
+        Map<K, V> value = get.apply(command);
+        if (value == null) value = Collections.emptyMap();
 
-            if (command.durability.hasModifications())
-                builder.addCell(live(CommandsColumns.durability, timestampMicros, accessor.valueOf(command.durability.get().ordinal())));
+        if (value.isEmpty() && !prev.isEmpty())
+        {
+            builder.addComplexDeletion(column, new DeletionTime(timestampMicros, nowInSec));
+            return;
+        }
 
-            if (command.partialTxn.hasModifications())
-                builder.addCell(live(CommandsColumns.txn, timestampMicros, serializeOrNull(command.partialTxn.get(), CommandsSerializers.partialTxn)));
+        for (Map.Entry<K, V> entry : value.entrySet())
+        {
+            K key = entry.getKey();
+            V pVal = prev.get(key);
+            if (pVal != null && pVal.equals(entry.getValue()))
+                continue;
+            builder.addCell(live(column, timestampMicros, serializeVal.apply(entry.getValue()), CellPath.create(serializeKey.apply(key))));
+        }
+        for (K key : Sets.difference(prev.keySet(), value.keySet()))
+            builder.addCell(tombstone(column, timestampMicros, nowInSec, CellPath.create(serializeKey.apply(key))));
+    }
 
-            if (command.kind.hasModifications() && command.kind.get() != null) // initialize sets hasModification(), and don't want to persist null
-                builder.addCell(live(CommandsColumns.kind, timestampMicros, accessor.valueOf(command.kind.get().ordinal())));
+    private static <K, V> int estimateMapChanges(Map<K, V> prev, Map<K, V> value)
+    {
+        return Sets.difference(value.keySet(), prev.keySet()).size() + Sets.difference(prev.keySet(), value.keySet()).size();
+    }
 
-            if (command.executeAt.hasModifications())
-                builder.addCell(live(CommandsColumns.execute_at, timestampMicros, serializeTimestamp(command.executeAt.get())));
+    private static <C, K, V> int estimateMapChanges(Function<C, Map<K, V>> get, C original, C command)
+    {
+        Map<K, V> prev = original != null ? get.apply(original) : Collections.emptyMap();
+        if (prev == null) prev = Collections.emptyMap();
+        Map<K, V> value = get.apply(command);
+        if (value == null) value = Collections.emptyMap();
+        return estimateMapChanges(prev, value);
+    }
 
-            if (command.promised.hasModifications())
-                builder.addCell(live(CommandsColumns.promised_ballot, timestampMicros, serializeTimestamp(command.promised.get())));
 
-            if (command.accepted.hasModifications())
-                builder.addCell(live(CommandsColumns.accepted_ballot, timestampMicros, serializeTimestamp(command.accepted.get())));
+    public static Mutation getCommandMutation(AccordCommandStore commandStore, Command original, Command command, long timestampMicros)
+    {
+        try
+        {
+            Preconditions.checkArgument(original != command);
+            if (original != null)
+            {
+                original.checkIsSuperseded();
+                original.markCleaningUp();
+            }
+            command.checkIsActive();
 
-            if (command.partialDeps.hasModifications())
-                builder.addCell(live(CommandsColumns.dependencies, timestampMicros, serializeOrNull(command.partialDeps.get(), CommandsSerializers.partialDeps)));
+            Row.Builder builder = BTreeRow.unsortedBuilder();
+            builder.newRow(Clustering.EMPTY);
+            int nowInSeconds = (int) TimeUnit.MICROSECONDS.toSeconds(timestampMicros);
 
-            if (command.writes.hasModifications())
-                builder.addCell(live(CommandsColumns.writes, timestampMicros, serialize(command.writes.get(), CommandsSerializers.writes)));
+            addEnumCellIfModified(CommandsColumns.status, Command::saveStatus, builder, timestampMicros, original, command);
+            addKeyCellIfModified(CommandsColumns.home_key, Command::homeKey, builder, timestampMicros, original, command);
+            addKeyCellIfModified(CommandsColumns.progress_key, Command::progressKey, builder, timestampMicros, original, command);
+            addCellIfModified(CommandsColumns.route, Command::route, CommandsSerializers.route, builder, timestampMicros, original, command);
+            addEnumCellIfModified(CommandsColumns.durability, Command::durability, builder, timestampMicros, original, command);
+            addCellIfModified(CommandsColumns.txn, Command::partialTxn, CommandsSerializers.partialTxn, builder, timestampMicros, original, command);
 
-            if (command.result.hasModifications())
-                builder.addCell(live(CommandsColumns.result, timestampMicros, serialize((TxnData) command.result.get(), CommandsSerializers.result)));
+            addCellIfModified(CommandsColumns.execute_at, Command::executeAt, AccordKeyspace::serializeTimestamp, builder, timestampMicros, original, command);
+            addCellIfModified(CommandsColumns.promised_ballot, Command::promised, AccordKeyspace::serializeTimestamp, builder, timestampMicros, original, command);
+            addCellIfModified(CommandsColumns.accepted_ballot, Command::accepted, AccordKeyspace::serializeTimestamp, builder, timestampMicros, original, command);
 
-            if (command.waitingOnCommit.hasModifications())
-            {
-                addStoredSetChanges(builder, CommandsColumns.waiting_on_commit,
-                                    timestampMicros, nowInSeconds, command.waitingOnCommit,
-                                    AccordKeyspace::serializeTimestamp);
-            }
+            addCellIfModified(CommandsColumns.dependencies, Command::partialDeps, CommandsSerializers.partialDeps, builder, timestampMicros, original, command);
 
-            if (command.blockingCommitOn.hasModifications())
-            {
-                addStoredSetChanges(builder, CommandsColumns.blocking_commit_on,
-                                    timestampMicros, nowInSeconds, command.blockingApplyOn,
-                                    AccordKeyspace::serializeTimestamp);
-            }
+            addSetChanges(CommandsColumns.listeners, cmd -> Sets.filter(cmd.listeners(), l -> !l.isTransient()), v -> serialize(v, CommandsSerializers.listeners), builder, timestampMicros, nowInSeconds, original, command);
 
-            if (command.waitingOnApply.hasModifications())
+            if (command.isCommitted())
             {
-                addStoredMapChanges(builder, CommandsColumns.waiting_on_apply,
-                                    timestampMicros, nowInSeconds, command.waitingOnApply,
-                                    AccordKeyspace::serializeTimestamp, AccordKeyspace::serializeTimestamp);
+                Command.Committed committed = command.asCommitted();
+                Command.Committed originalCommitted = original != null && original.isCommitted() ? original.asCommitted() : null;
+                addSetChanges(CommandsColumns.waiting_on_commit, Command.Committed::waitingOnCommit, AccordKeyspace::serializeTimestamp, builder, timestampMicros, nowInSeconds, originalCommitted, committed);
+                addMapChanges(CommandsColumns.waiting_on_apply, Command.Committed::waitingOnApply, AccordKeyspace::serializeTimestamp, AccordKeyspace::serializeTimestamp, builder, timestampMicros, nowInSeconds, originalCommitted, committed);
             }
 
-            if (command.blockingApplyOn.hasModifications())
+            if (command.isExecuted())

Review Comment:
   nest in isCommitted?



##########
src/java/org/apache/cassandra/service/accord/AccordStateCache.java:
##########
@@ -357,7 +400,7 @@ private V getOrCreate(K key, boolean createIfAbsent)
                 stats.hits++;
                 AccordStateCache.this.stats.hits++;
                 node.references++;
-                return node.value;
+                return node;
             }
 
             node = (Node<K, V>) cache.remove(key);

Review Comment:
   What do we gain from shuffling entries between `cache` and `active`?



##########
src/java/org/apache/cassandra/service/accord/AccordStateCache.java:
##########
@@ -151,14 +191,13 @@ public NamedMap(String name)
 
     public final Map<Object, Node<?, ?>> active = new HashMap<>();
     private final Map<Object, Node<?, ?>> cache = new HashMap<>();
-    private final Map<Object, WriteOnlyGroup<?, ?>> pendingWriteOnly = new HashMap<>();
     private final Set<Instance<?, ?>> instances = new HashSet<>();
 
-    private final NamedMap<Object, Future<?>> loadFutures = new NamedMap<>("loadFutures");
-    private final NamedMap<Object, Future<?>> saveFutures = new NamedMap<>("saveFutures");
+    private final NamedMap<Object, Node<?, ?>> pendingLoads = new NamedMap<>("pendingLoads");

Review Comment:
   unused?



##########
src/java/org/apache/cassandra/service/accord/AccordKeyspace.java:
##########
@@ -320,187 +343,194 @@ private static <T> T deserializeOrNull(ByteBuffer bytes, LocalVersionedSerialize
         return bytes != null && ! ByteBufferAccessor.instance.isEmpty(bytes) ? deserialize(bytes, serializer) : null;
     }
 
-    private static NavigableMap<Timestamp, TxnId> deserializeWaitingOnApply(Map<ByteBuffer, ByteBuffer> serialized)
+    private static ImmutableSortedMap<Timestamp, TxnId> deserializeWaitingOnApply(Map<ByteBuffer, ByteBuffer> serialized)
     {
         if (serialized == null || serialized.isEmpty())
-            return new TreeMap<>();
+            return ImmutableSortedMap.of();
 
         NavigableMap<Timestamp, TxnId> result = new TreeMap<>();
         for (Map.Entry<ByteBuffer, ByteBuffer> entry : serialized.entrySet())
             result.put(deserializeTimestampOrNull(entry.getKey(), Timestamp::fromBits), deserializeTimestampOrNull(entry.getValue(), TxnId::fromBits));
-        return result;
+        return ImmutableSortedMap.copyOf(result);
     }
 
-    private static <T extends Timestamp, S extends Set<T>> S deserializeTimestampSet(Set<ByteBuffer> serialized, Supplier<S> setFactory, TimestampFactory<T> timestampFactory)
+    private static <T extends Timestamp> ImmutableSortedSet<T> deserializeTimestampSet(Set<ByteBuffer> serialized, TimestampFactory<T> timestampFactory)
     {
-        S result = setFactory.get();
         if (serialized == null || serialized.isEmpty())
-            return result;
+            return ImmutableSortedSet.of();
 
+        NavigableSet<T> result = new TreeSet<>();

Review Comment:
   use a List or Builder?



##########
src/java/org/apache/cassandra/service/accord/async/AsyncLoader.java:
##########
@@ -57,131 +59,94 @@
     private State state = State.INITIALIZED;
     private final AccordCommandStore commandStore;
 
-    private final Iterable<TxnId> txnIds;
-    private final Iterable<PartitionKey> keys;
+    private final List<TxnId> txnIds;
+    private final List<RoutableKey> keys;
 
-    protected Future<?> readFuture;
+    protected AsyncResult<?> readResult;
 
-    public AsyncLoader(AccordCommandStore commandStore, Iterable<TxnId> txnIds, Iterable<PartitionKey> keys)
+    public AsyncLoader(AccordCommandStore commandStore, Iterable<TxnId> txnIds, Iterable<RoutableKey> keys)
     {
         this.commandStore = commandStore;
-        this.txnIds = txnIds;
-        this.keys = keys;
+        this.txnIds = Lists.newArrayList(txnIds);
+        this.keys = Lists.newArrayList(keys);
     }
 
-    private <K, V extends AccordState<K>> Future<?> referenceAndDispatch(K key,
-                                                                         AccordStateCache.Instance<K, V> cache,
-                                                                         Map<K, V> context,
-                                                                         Function<V, Future<?>> readFunction,
-                                                                         Object callback)
-    {
-        V item;
-        Future<?> future = cache.getLoadFuture(key);
-        if (future != null)
-        {
-            // if a load future exists for this, it must be present in the cache
-            item = cache.getOrNull(key);
-            Preconditions.checkState(item != null);
-            context.put(key, item);
-            if (logger.isTraceEnabled())
-                logger.trace("Existing load future found for {} while loading for {}. ({})", item.key(), callback, item);
-            return future;
-        }
-
-        item = cache.getOrCreate(key);
-        context.put(key, item);
-        if (item.isLoaded())
-        {
-            if (logger.isTraceEnabled())
-                logger.trace("Cached item found for {} while loading for {}. ({})", item.key(), callback, item);
-            return null;
-        }
-
-        future = readFunction.apply(item);
-        cache.setLoadFuture(item.key(), future);
-        if (logger.isTraceEnabled())
-            logger.trace("Loading new item for {} while loading for {}. ({})", item.key(), callback, item);
-        return future;
-    }
-
-
-    private <K, V extends AccordState<K>> List<Future<?>> referenceAndDispatchReads(Iterable<K> keys,
+    private <K, V extends ImmutableState> List<AsyncChain<Void>> referenceAndDispatchReads(Iterable<K> keys,
                                                                                            AccordStateCache.Instance<K, V> cache,
-                                                                                           Map<K, V> context,
-                                                                                           Function<V, Future<?>> readFunction,
-                                                                                           List<Future<?>> futures,
+                                                                                           LoadFunction<K, V> loadFunction,
+                                                                                           List<AsyncChain<Void>> results,
                                                                                            Object callback)
     {
         for (K key : keys)
         {
-            Future<?> future = referenceAndDispatch(key, cache, context, readFunction, callback);
-            if (future == null)
+            AsyncResult<Void> result = cache.referenceAndLoad(key, loadFunction);
+            if (result == null)
                 continue;
 
-            if (futures == null)
-                futures = new ArrayList<>();
+            if (results == null)
+                results = new ArrayList<>();
 
-            futures.add(future);
+            results.add(result);
         }
 
-        return futures;
+        return results;
     }
 
     @VisibleForTesting
-    Function<AccordCommand, Future<?>> loadCommandFunction(Object callback)
+    LoadFunction<TxnId, Command> loadCommandFunction(Object callback)

Review Comment:
   Are we sure we want to parallelise the loading? It reduces latency, but potentially also reduces throughput. I feel like we probably want to bound the parallelism at least, so we don't end up processing one accord transaction per node at a time...



##########
src/java/org/apache/cassandra/service/accord/async/AsyncLoader.java:
##########
@@ -57,131 +59,94 @@
     private State state = State.INITIALIZED;
     private final AccordCommandStore commandStore;
 
-    private final Iterable<TxnId> txnIds;
-    private final Iterable<PartitionKey> keys;
+    private final List<TxnId> txnIds;
+    private final List<RoutableKey> keys;
 
-    protected Future<?> readFuture;
+    protected AsyncResult<?> readResult;
 
-    public AsyncLoader(AccordCommandStore commandStore, Iterable<TxnId> txnIds, Iterable<PartitionKey> keys)
+    public AsyncLoader(AccordCommandStore commandStore, Iterable<TxnId> txnIds, Iterable<RoutableKey> keys)
     {
         this.commandStore = commandStore;
-        this.txnIds = txnIds;
-        this.keys = keys;
+        this.txnIds = Lists.newArrayList(txnIds);
+        this.keys = Lists.newArrayList(keys);
     }
 
-    private <K, V extends AccordState<K>> Future<?> referenceAndDispatch(K key,
-                                                                         AccordStateCache.Instance<K, V> cache,
-                                                                         Map<K, V> context,
-                                                                         Function<V, Future<?>> readFunction,
-                                                                         Object callback)
-    {
-        V item;
-        Future<?> future = cache.getLoadFuture(key);
-        if (future != null)
-        {
-            // if a load future exists for this, it must be present in the cache
-            item = cache.getOrNull(key);
-            Preconditions.checkState(item != null);
-            context.put(key, item);
-            if (logger.isTraceEnabled())
-                logger.trace("Existing load future found for {} while loading for {}. ({})", item.key(), callback, item);
-            return future;
-        }
-
-        item = cache.getOrCreate(key);
-        context.put(key, item);
-        if (item.isLoaded())
-        {
-            if (logger.isTraceEnabled())
-                logger.trace("Cached item found for {} while loading for {}. ({})", item.key(), callback, item);
-            return null;
-        }
-
-        future = readFunction.apply(item);
-        cache.setLoadFuture(item.key(), future);
-        if (logger.isTraceEnabled())
-            logger.trace("Loading new item for {} while loading for {}. ({})", item.key(), callback, item);
-        return future;
-    }
-
-
-    private <K, V extends AccordState<K>> List<Future<?>> referenceAndDispatchReads(Iterable<K> keys,
+    private <K, V extends ImmutableState> List<AsyncChain<Void>> referenceAndDispatchReads(Iterable<K> keys,
                                                                                            AccordStateCache.Instance<K, V> cache,
-                                                                                           Map<K, V> context,
-                                                                                           Function<V, Future<?>> readFunction,
-                                                                                           List<Future<?>> futures,
+                                                                                           LoadFunction<K, V> loadFunction,
+                                                                                           List<AsyncChain<Void>> results,
                                                                                            Object callback)

Review Comment:
   unused?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org