You are viewing a plain text version of this content. The canonical link for it is here.
Posted to pr@cassandra.apache.org by GitBox <gi...@apache.org> on 2022/11/02 22:33:48 UTC

[GitHub] [cassandra] aweisberg commented on a diff in pull request #1951: Partial replication

aweisberg commented on code in PR #1951:
URL: https://github.com/apache/cassandra/pull/1951#discussion_r1012323742


##########
src/java/org/apache/cassandra/dht/RandomPartitioner.java:
##########
@@ -276,6 +276,14 @@ public Token increaseSlightly()
             return new BigIntegerToken(token.add(BigInteger.ONE));
         }
 
+        @Override
+        public Token decreaseSlightly()
+        {
+            if (token.equals(MINIMUM.token))

Review Comment:
   Increase doesn't have this protection and there is a maximum value



##########
src/java/org/apache/cassandra/service/accord/api/AccordKey.java:
##########
@@ -229,37 +79,38 @@ public boolean equals(Object o)
             if (this == o) return true;

Review Comment:
   Remove this equals method because the super class has it?



##########
src/java/org/apache/cassandra/service/accord/AccordCommand.java:
##########
@@ -524,22 +535,29 @@ public Timestamp executeAt()
     }
 
     @Override
-    public void executeAt(Timestamp timestamp)
+    public Txn.Kind kind()
+    {
+        PartialTxn txn = partialTxn.get();
+        return txn == null ? null : txn.kind();

Review Comment:
   Huh, this is allowed to return `null`? Nullable annotation?



##########
src/java/org/apache/cassandra/service/accord/db/AccordUpdate.java:
##########
@@ -896,20 +941,46 @@ private static <T> ByteBuffer serialize(T item, IVersionedSerializer<T> serializ
         }
     }
 
-    private static <T> ByteBuffer[] serialize(List<T> items, IVersionedSerializer<T> serializer)
+    private static <T> ByteBuffer[] serialize(Keys keys, List<T> items, Function<? super T, ? extends Key> toKey, IVersionedSerializer<T> serializer)

Review Comment:
   A bit odd this is called serialize when it's really populating the arrays and the arrays contain empty byte buffers when the same key appears multiple times.



##########
src/java/org/apache/cassandra/service/accord/AccordCommandStore.java:
##########
@@ -251,15 +284,51 @@ public void processBlocking(Runnable runnable)
     }
 
     @Override
-    public <T> Future<T> submit(PreLoadContext loadCtx, Function<? super CommandStore, T> function)
+    public <T> Future<T> submit(PreLoadContext loadCtx, Function<? super SafeCommandStore, T> function)
     {
         AsyncOperation<T> operation = AsyncOperation.create(this, loadCtx, function);
         executor.execute(operation);
         return operation;
     }
 
     @Override
-    public Future<Void> execute(PreLoadContext preLoadContext, Consumer<? super CommandStore> consumer)
+    public Agent agent()
+    {
+        return agent;
+    }
+
+    @Override
+    public ProgressLog progressLog()
+    {
+        return progressLog;
+    }
+
+    @Override
+    public RangesForEpoch ranges()
+    {
+        return rangesForEpoch;
+    }
+
+    @Override
+    public long latestEpoch()
+    {
+        return time.epoch();
+    }
+
+    @Override
+    public Timestamp maxConflict(Keys keys)

Review Comment:
   Isn't this identical to the function in InMemoryCommandStore? Does it need to be duplicated? A lot of these look identical to InMemoryCommandStore



##########
src/java/org/apache/cassandra/service/accord/AccordCommandStores.java:
##########
@@ -23,36 +23,45 @@
 import accord.api.Agent;
 import accord.api.DataStore;
 import accord.api.ProgressLog;
+import accord.local.AsyncCommandStores;
 import accord.local.CommandStore;
-import accord.local.CommandStores;
 import accord.local.Node;
+import accord.local.NodeTimeService;
 import org.apache.cassandra.concurrent.ExecutorFactory;
 import org.apache.cassandra.utils.ExecutorUtils;
 
-public class AccordCommandStores extends CommandStores
+public class AccordCommandStores extends AsyncCommandStores
 {
     private final ExecutorService[] executors;
 
     public AccordCommandStores(int numShards, Node node, Agent agent, DataStore store,
                                ProgressLog.Factory progressLogFactory)
     {
-        super(numShards, node, agent, store, progressLogFactory);
-        this.executors = new ExecutorService[numShards];
-        for (int i=0; i<numShards; i++)
-        {
-            executors[i] = ExecutorFactory.Global.executorFactory().sequential(CommandStore.class.getSimpleName() + '[' + node + ':' + i + ']');
-        }
+        this(numShards, node, agent, store, progressLogFactory, executors(node, numShards));
     }
 
-    @Override
-    protected CommandStore createCommandStore(int generation, int index, int numShards, Node node, Agent agent, DataStore store, ProgressLog.Factory progressLogFactory, CommandStore.RangesForEpoch rangesForEpoch)
+    private AccordCommandStores(int numShards, NodeTimeService time, Agent agent, DataStore store,
+                                ProgressLog.Factory progressLogFactory, ExecutorService[] executors)
+    {
+        super(numShards, time, agent, store, progressLogFactory,
+              (id, generation, index, numShards1, node1, agent1, store1, progressLogFactory1, rangesForEpoch)
+                -> new AccordCommandStore(id, generation, index, numShards1, node1, agent1, store1, progressLogFactory1, rangesForEpoch, executors[index]));

Review Comment:
   Should some of these parameters be bound into the lambda instead of being passed to the factory if they are already known at this point?



##########
src/java/org/apache/cassandra/db/marshal/ValueAccessor.java:
##########
@@ -315,6 +315,24 @@ default boolean getBoolean(V value, int offset)
     long toLong(V value);
     /** returns a long from offset {@param offset} */
     long getLong(V value, int offset);
+
+    default long getUpTo8Bytes(V value, int offset, int count)

Review Comment:
   Unit test? Is this endian agnostic or does it matter? I am a bit rusty.



##########
src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java:
##########
@@ -42,6 +52,27 @@ public static long key(Key key)
         return ((AccordKey.PartitionKey) key).estimatedSizeOnHeap();
     }
 
+    public static long key(RoutingKey key)
+    {
+        return ((AccordRoutingKey) key).estimatedSizeOnHeap();
+    }
+
+    private static final long EMPTY_KEY_RANGE_SIZE = ObjectSizes.measure(TokenRange.fullRange(TableId.generate()));
+    public static long range(KeyRange range)
+    {
+        return EMPTY_KEY_RANGE_SIZE + key(range.start()) + key(range.end());
+    }
+
+    private static final long EMPTY_KEY_RANGES_SIZE = ObjectSizes.measure(KeyRanges.of());
+    public static long ranges(KeyRanges ranges)
+    {
+        long size = EMPTY_KEY_RANGES_SIZE;
+        size += ObjectSizes.sizeOfReferenceArray(ranges.size());
+        for (int i = 0, mi = ranges.size() ; i < mi ; i++)

Review Comment:
   Worth/possible to optimize for fixed size key ranges where this can just be multiplication?



##########
src/java/org/apache/cassandra/service/accord/async/AsyncWriter.java:
##########
@@ -201,7 +201,7 @@ AccordState<K> getForDenormalization(K key,
             return item;
 
         item = cache.getOrNull(key);
-        if (item != null)
+        if (item != null && !cache.hasLoadFuture(key))

Review Comment:
   Is this necessary to ensure we don't write to something that will end up being replaced when the cache loads?



##########
src/java/org/apache/cassandra/service/accord/AccordPartialCommand.java:
##########
@@ -244,26 +129,23 @@ public int hashCode()
 
             public static Version fromByte(byte b)
             {
-                switch (b)
-                {
-                    case 0:
-                        return VERSION_0;
-                    default:
-                        throw new IllegalArgumentException();
-                }
+                if (b == 0)
+                    return VERSION_0;
+                throw new IllegalArgumentException();
             }
         }
 
-        public void serialize(T command, DataOutputPlus out, Version version) throws IOException
+        public void serialize(AccordPartialCommand command, DataOutputPlus out, Version version) throws IOException

Review Comment:
   Could be private



##########
src/java/org/apache/cassandra/service/accord/AccordCommandsForKey.java:
##########
@@ -364,13 +374,13 @@ public void updateSummaries(AccordCommand command)
             if (command.status.previous() == null || !command.status.previous().hasBeen(Status.Committed))
                 uncommitted.map.blindRemove(command.txnId());
 
-            ByteBuffer bb = AccordPartialCommand.WithDeps.serializer.serialize(command);
-            committedById.map.blindPut(command.txnId(), bb);
-            committedByExecuteAt.map.blindPut(command.executeAt(), bb);
+            ByteBuffer partialCommand = AccordPartialCommand.serializer.serialize(key, command);
+            committedById.map.blindPut(command.txnId(), partialCommand);

Review Comment:
   Will sharing the same command twice confuse memory accounting?



##########
src/java/org/apache/cassandra/service/accord/TokenRange.java:
##########
@@ -20,33 +20,21 @@
 
 import java.io.IOException;
 
-import com.google.common.base.Preconditions;
-
-import accord.api.Key;
+import accord.api.RoutingKey;
 import accord.primitives.KeyRange;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.service.accord.api.AccordKey;
-import org.apache.cassandra.service.accord.api.AccordKey.SentinelKey;
-import org.apache.cassandra.service.accord.api.AccordKey.TokenKey;
+import org.apache.cassandra.service.accord.api.AccordRoutingKey;
+import org.apache.cassandra.service.accord.api.AccordRoutingKey.SentinelKey;
 
 public class TokenRange extends KeyRange.EndInclusive

Review Comment:
   Should be called AccordTokenRange since there is also one in C*?



##########
src/java/org/apache/cassandra/db/marshal/ValueAccessor.java:
##########
@@ -315,6 +315,24 @@ default boolean getBoolean(V value, int offset)
     long toLong(V value);
     /** returns a long from offset {@param offset} */
     long getLong(V value, int offset);
+
+    default long getUpTo8Bytes(V value, int offset, int count)
+    {
+        switch (count)
+        {
+            default: throw new IllegalArgumentException();
+            case 0: return 0;
+            case 1: return getByte(value, offset);
+            case 2: return getShort(value, offset);
+            case 3: return (getShort(value, offset) << 8) | getByte(value, offset + 2);
+            case 4: return getInt(value, offset);
+            case 5: return ((long)getInt(value, offset) << 8) | getByte(value, offset + 4);
+            case 6: return ((long)getInt(value, offset) << 16) | getShort(value, offset + 4);
+            case 7: return ((long)getInt(value, offset) << 24) | (getShort(value, offset + 4) << 8) | getByte(value, offset + 5);

Review Comment:
   This gets two bytes from offset + 4 and then one byte from offset + 5, which overlaps; the last byte should presumably come from offset + 6.



##########
src/java/org/apache/cassandra/service/accord/AccordCommand.java:
##########
@@ -633,40 +652,42 @@ private Future<Void> applyWithCorrectScope()
         return promise;
     }
 
-    private Future<Void> apply(boolean canReschedule)
+    private Future<Void> apply(SafeCommandStore safeStore, boolean canReschedule)
     {
-        Future<Void> future = cache().getWriteFuture(txnId);
+        AccordStateCache.Instance<TxnId, AccordCommand> cache = ((AccordCommandStore) safeStore).commandCache();
+        Future<Void> future = cache.getWriteFuture(txnId);
         if (future != null)
             return future;
 
         // this can be called via a listener callback, in which case we won't
         // have the appropriate commandsForKey in scope, so start a new operation
         // with the correct scope and notify the caller when that completes
-        if (!canApplyWithCurrentScope())
+        if (!canApplyWithCurrentScope(safeStore))
         {
             Preconditions.checkArgument(canReschedule);
-            return applyWithCorrectScope();
+            return applyWithCorrectScope(safeStore.commandStore());
         }
 
-        future = super.apply();
-        cache().setWriteFuture(txnId, future);
+        future = super.apply(safeStore);
+        cache.setWriteFuture(txnId, future);
         return future;
     }
 
     @Override
-    public Future<Void> apply()
+    public Future<Void> apply(SafeCommandStore safeStore)
     {
-        return apply(true);
+        return apply(safeStore, true);
     }
 
     @Override
-    public Future<Data> read(Keys scope)
+    public Future<Data> read(SafeCommandStore safeStore)
     {
-        ReadFuture future = cache().getReadFuture(txnId);
+        AccordStateCache.Instance<TxnId, AccordCommand> cache = ((AccordCommandStore) safeStore).commandCache();
+        Future<Data> future = cache.getReadFuture(txnId);
         if (future != null)
-            return future.scope.equals(scope) ? future : super.read(scope);
-        future = new ReadFuture(scope, super.read(scope));
-        cache().setReadFuture(txnId, future);
+            return future;

Review Comment:
   Why can the scope check be omitted now?



##########
src/java/org/apache/cassandra/dht/ByteOrderedPartitioner.java:
##########
@@ -144,8 +144,51 @@ public double size(Token next)
         @Override
         public Token increaseSlightly()
         {
-            throw new UnsupportedOperationException(String.format("Token type %s does not support token allocation.",
-                                                                  getClass().getSimpleName()));
+            // find first byte we can increment
+            int i = token.length - 1;
+            while (i >= 0)
+            {
+                if (token[i] != -1)
+                    break;
+                --i;
+            }
+            if (i == -1)
+                return new BytesToken(Arrays.copyOf(token, token.length + 1));
+
+            // increment and fill remainder with zeros
+            byte[] newToken = token.clone();
+            ++newToken[i];
+            Arrays.fill(newToken, i + 1, newToken.length, (byte)0);
+            return new BytesToken(newToken);
+        }
+
+        @Override
+        public Token decreaseSlightly()
+        {
+            if (token.length == 0)
+                throw new IndexOutOfBoundsException("Cannot create a smaller token the MINIMUM");
+
+            // find first byte we can increment

Review Comment:
   ```suggestion
               // find first byte we can decrement
   ```



##########
src/java/org/apache/cassandra/service/accord/AccordKeyspace.java:
##########
@@ -692,7 +691,7 @@ private static DecoratedKey makeKey(AccordCommandsForKey cfk)
         return makeKey(cfk.commandStore(), cfk.key());
     }
 
-    public static Mutation getCommandsForKeyMutation(AccordCommandsForKey cfk, long timestampMicros)
+    public static Mutation getCommandsForKeyMutation(AccordCommandStore commandStore, AccordCommandsForKey cfk, long timestampMicros)

Review Comment:
   Unused?



##########
src/java/org/apache/cassandra/service/accord/db/AbstractKeyIndexed.java:
##########
@@ -89,99 +113,126 @@ public boolean equals(Object o)
         if (this == o) return true;
         if (o == null || getClass() != o.getClass()) return false;
         AbstractKeyIndexed<?> that = (AbstractKeyIndexed<?>) o;
-        return serialized.equals(that.serialized);
+        return Arrays.equals(serialized, that.serialized);
     }
 
     @Override
     public int hashCode()
     {
-        return Objects.hash(serialized);
+        return Arrays.hashCode(serialized);
     }
 
     @Override
     public String toString()
     {
-        return getClass().getSimpleName() + serialized.entrySet().stream()
-                         .map(e -> e.getKey() + "=" + deserialize(e.getValue()))
-                         .collect(Collectors.joining(", ", "{", "}"));
+        return getClass().getSimpleName() + IntStream.range(0, keys.size())
+                                                     .mapToObj(i -> keys.get(i) + "=" + deserialize(serialized[i]))
+                                                     .collect(Collectors.joining(", ", "{", "}"));
+    }
+
+    protected AbstractKeyIndexed(Keys keys, ByteBuffer[] serialized)
+    {
+        this.keys = keys;
+        this.serialized = serialized;
     }
 
     public AbstractKeyIndexed(List<T> items, Function<T, PartitionKey> keyFunction)
     {
-        this.serialized = new TreeMap<>();
+        Key[] keys = new Key[items.size()];
+        for (int i=0, mi=items.size(); i<mi; i++)
+            keys[i] = keyFunction.apply(items.get(i));
+        this.keys = Keys.of(keys);
+        this.serialized = new ByteBuffer[items.size()];
         for (int i=0, mi=items.size(); i<mi; i++)
+            serialized[this.keys.indexOf(keyFunction.apply(items.get(i)))] = serialize(items.get(i));
+    }
+
+    protected <V> V slice(KeyRanges ranges, BiFunction<Keys, ByteBuffer[], V> constructor)
+    {
+        // TODO: Routables patch permits us to do this more efficiently
+        Keys keys = this.keys.slice(ranges);
+        ByteBuffer[] serialized = new ByteBuffer[keys.size()];
+        int j = 0;
+        for (int i = 0 ; i < keys.size() ; ++i)
         {
-            T item = items.get(i);
-            PartitionKey key = keyFunction.apply(item);
-            // TODO: support multiple reads/writes per key
-            Preconditions.checkArgument(!this.serialized.containsKey(key));
-            this.serialized.put(key, serialize(item));
+            j = this.keys.findNext(keys.get(i), j);
+            serialized[i] = this.serialized[j++];
         }
+        return constructor.apply(keys, serialized);
     }
 
-    public AbstractKeyIndexed(NavigableMap<PartitionKey, ByteBuffer> serialized)
+    public <V> V merge(AbstractKeyIndexed<?> that, BiFunction<Keys, ByteBuffer[], V> constructor)
     {
-        this.serialized = serialized;
+        // TODO: special method for linear merging keyed and non-keyed lists simultaneously
+        Keys keys = this.keys.union(that.keys);
+        ByteBuffer[] serialized = new ByteBuffer[keys.size()];
+        int i = 0, j = 0, o = 0;
+        while (i < this.keys.size() && j < that.keys.size())
+        {
+            int c = this.keys.get(i).compareTo(that.keys.get(j));
+            if (c < 0) serialized[o++] = this.serialized[i++];
+            else if (c > 0) serialized[o++] = that.serialized[j++];
+            else { serialized[o++] = this.serialized[i++]; j++; }
+        }
+        while (i < this.keys.size())
+            serialized[o++] = this.serialized[i++];
+        while (j < that.keys.size())
+            serialized[o++] = that.serialized[j++];
+        return constructor.apply(keys, serialized);
     }
 
     public T getDeserialized(PartitionKey key)
     {
-        ByteBuffer bytes = serialized.get(key);
-        if (bytes == null)
-            return null;
-        return deserialize(bytes);
+        int i = keys.indexOf(key);
+        if (i < 0) return null;
+        return deserialize(serialized[i]);
     }
 
     public long estimatedSizeOnHeap()
     {
         long size = emptySizeOnHeap();
-        for (Map.Entry<PartitionKey, ByteBuffer> entry : serialized.entrySet())
-        {
-            size += entry.getKey().estimatedSizeOnHeap();
-            size += ByteBufferUtil.EMPTY_SIZE_ON_HEAP + ByteBufferAccessor.instance.size(entry.getValue());
-        }
+        for (Key key : keys) size += ((PartitionKey) key).estimatedSizeOnHeap();

Review Comment:
   Could this use something from `AccordObjectSizes`?



##########
src/java/org/apache/cassandra/service/accord/AccordCommand.java:
##########
@@ -116,53 +119,52 @@ boolean isReadOnly()
         }
     }
 
-    private final AccordCommandStore commandStore;
     private final TxnId txnId;
     private final int instanceCount = INSTANCE_COUNTER.getAndIncrement();
-    public final StoredValue<Key> homeKey;
-    public final StoredValue<Key> progressKey;
-    public final StoredValue<Txn> txn;
+    public final StoredValue<AbstractRoute> route;
+    public final StoredValue<RoutingKey> homeKey;
+    public final StoredValue<RoutingKey> progressKey;
+    public final StoredValue<PartialTxn> partialTxn;
     public final StoredValue<Ballot> promised;
     public final StoredValue<Ballot> accepted;
     public final StoredValue<Timestamp> executeAt;
-    public final StoredValue<Deps> deps;
+    public final StoredValue<PartialDeps> partialDeps;
     public final StoredValue<Writes> writes;
     public final StoredValue<Result> result;
 
     public final StoredValue.HistoryPreserving<Status> status;
-    public final StoredBoolean isGloballyPersistent;
+    public final StoredValue<Durability> durability;
 
-    public final StoredNavigableMap<TxnId, ByteBuffer> waitingOnCommit;
-    public final StoredNavigableMap<TxnId, ByteBuffer> waitingOnApply;
+    public final StoredSet.Navigable<TxnId> waitingOnCommit;
+    public final StoredNavigableMap<Timestamp, TxnId> waitingOnApply;
+    public final StoredSet.Navigable<TxnId> blockingCommitOn;
+    public final StoredSet.Navigable<TxnId> blockingApplyOn;
 
     public final StoredSet.DeterministicIdentity<ListenerProxy> storedListeners;
     private final Listeners transientListeners;
 
-    public final StoredSet.Navigable<TxnId> blockingCommitOn;
-    public final StoredSet.Navigable<TxnId> blockingApplyOn;
-
-    public AccordCommand(AccordCommandStore commandStore, TxnId txnId)
+    public AccordCommand(TxnId txnId)
     {
         logger.trace("Instantiating new command {} @ {}", txnId, instanceHash());
-        this.commandStore = commandStore;
         this.txnId = txnId;
-        homeKey = new StoredValue<>(kind());
-        progressKey = new StoredValue<>(kind());
-        txn = new StoredValue<>(kind());
-        promised = new StoredValue<>(kind());
-        accepted = new StoredValue<>(kind());
-        executeAt = new StoredValue<>(kind());
-        deps = new StoredValue<>(kind());
-        writes = new StoredValue<>(kind());
-        result = new StoredValue<>(kind());
-        status = new StoredValue.HistoryPreserving<>(kind());
-        isGloballyPersistent = new StoredBoolean(kind());
-        waitingOnCommit = new StoredNavigableMap<>(kind());
-        waitingOnApply = new StoredNavigableMap<>(kind());
-        storedListeners = new StoredSet.DeterministicIdentity<>(kind());
+        homeKey = new StoredValue<>(rw());

Review Comment:
   Why hardcoding to `rw()`?



##########
src/java/org/apache/cassandra/service/accord/AccordCommandStore.java:
##########
@@ -203,7 +203,10 @@ public Command ifLoaded(TxnId txnId)
     {
         AccordCommand command = commandCache.getOrNull(txnId);
         if (command != null && command.isLoaded())
+        {
+            getContext().commands.add(command);

Review Comment:
   What issue is this fixing? The context otherwise wouldn't be populated along with the cache?



##########
src/java/org/apache/cassandra/service/accord/db/AbstractKeyIndexed.java:
##########
@@ -89,99 +113,126 @@ public boolean equals(Object o)
         if (this == o) return true;
         if (o == null || getClass() != o.getClass()) return false;
         AbstractKeyIndexed<?> that = (AbstractKeyIndexed<?>) o;
-        return serialized.equals(that.serialized);
+        return Arrays.equals(serialized, that.serialized);
     }
 
     @Override
     public int hashCode()
     {
-        return Objects.hash(serialized);
+        return Arrays.hashCode(serialized);
     }
 
     @Override
     public String toString()
     {
-        return getClass().getSimpleName() + serialized.entrySet().stream()
-                         .map(e -> e.getKey() + "=" + deserialize(e.getValue()))
-                         .collect(Collectors.joining(", ", "{", "}"));
+        return getClass().getSimpleName() + IntStream.range(0, keys.size())
+                                                     .mapToObj(i -> keys.get(i) + "=" + deserialize(serialized[i]))
+                                                     .collect(Collectors.joining(", ", "{", "}"));
+    }
+
+    protected AbstractKeyIndexed(Keys keys, ByteBuffer[] serialized)
+    {
+        this.keys = keys;
+        this.serialized = serialized;
     }
 
     public AbstractKeyIndexed(List<T> items, Function<T, PartitionKey> keyFunction)
     {
-        this.serialized = new TreeMap<>();
+        Key[] keys = new Key[items.size()];
+        for (int i=0, mi=items.size(); i<mi; i++)
+            keys[i] = keyFunction.apply(items.get(i));
+        this.keys = Keys.of(keys);
+        this.serialized = new ByteBuffer[items.size()];
         for (int i=0, mi=items.size(); i<mi; i++)
+            serialized[this.keys.indexOf(keyFunction.apply(items.get(i)))] = serialize(items.get(i));
+    }
+
+    protected <V> V slice(KeyRanges ranges, BiFunction<Keys, ByteBuffer[], V> constructor)
+    {
+        // TODO: Routables patch permits us to do this more efficiently
+        Keys keys = this.keys.slice(ranges);
+        ByteBuffer[] serialized = new ByteBuffer[keys.size()];
+        int j = 0;
+        for (int i = 0 ; i < keys.size() ; ++i)
         {
-            T item = items.get(i);
-            PartitionKey key = keyFunction.apply(item);
-            // TODO: support multiple reads/writes per key
-            Preconditions.checkArgument(!this.serialized.containsKey(key));
-            this.serialized.put(key, serialize(item));
+            j = this.keys.findNext(keys.get(i), j);
+            serialized[i] = this.serialized[j++];
         }
+        return constructor.apply(keys, serialized);
     }
 
-    public AbstractKeyIndexed(NavigableMap<PartitionKey, ByteBuffer> serialized)
+    public <V> V merge(AbstractKeyIndexed<?> that, BiFunction<Keys, ByteBuffer[], V> constructor)
     {
-        this.serialized = serialized;
+        // TODO: special method for linear merging keyed and non-keyed lists simultaneously
+        Keys keys = this.keys.union(that.keys);
+        ByteBuffer[] serialized = new ByteBuffer[keys.size()];
+        int i = 0, j = 0, o = 0;
+        while (i < this.keys.size() && j < that.keys.size())
+        {
+            int c = this.keys.get(i).compareTo(that.keys.get(j));
+            if (c < 0) serialized[o++] = this.serialized[i++];
+            else if (c > 0) serialized[o++] = that.serialized[j++];
+            else { serialized[o++] = this.serialized[i++]; j++; }
+        }
+        while (i < this.keys.size())
+            serialized[o++] = this.serialized[i++];
+        while (j < that.keys.size())
+            serialized[o++] = that.serialized[j++];
+        return constructor.apply(keys, serialized);
     }
 
     public T getDeserialized(PartitionKey key)
     {
-        ByteBuffer bytes = serialized.get(key);
-        if (bytes == null)
-            return null;
-        return deserialize(bytes);
+        int i = keys.indexOf(key);
+        if (i < 0) return null;
+        return deserialize(serialized[i]);
     }
 
     public long estimatedSizeOnHeap()
     {
         long size = emptySizeOnHeap();
-        for (Map.Entry<PartitionKey, ByteBuffer> entry : serialized.entrySet())
-        {
-            size += entry.getKey().estimatedSizeOnHeap();
-            size += ByteBufferUtil.EMPTY_SIZE_ON_HEAP + ByteBufferAccessor.instance.size(entry.getValue());
-        }
+        for (Key key : keys) size += ((PartitionKey) key).estimatedSizeOnHeap();
+        for (ByteBuffer buffer : serialized) size += ByteBufferUtil.EMPTY_SIZE_ON_HEAP + ByteBufferAccessor.instance.size(buffer);

Review Comment:
   `ObjectSizes.sizeOnHeapOf`
   Also I think this is missing the reference array?
   
   I think there may even be a helper for size of an array of ByteBuffers.



##########
src/java/org/apache/cassandra/service/accord/db/AccordUpdate.java:
##########
@@ -689,22 +695,13 @@ public long serializedBodySize(IncrementingUpdate update, int version)
         };
     }
 
-    private static Keys keysFrom(List<AbstractUpdate> updates, List<UpdatePredicate> predicates)
-    {
-        Set<Key> keys = new HashSet<>();
-        for (AbstractUpdate update : updates)
-            keys.add(update.partitionKey());
-        for (UpdatePredicate predicate : predicates)
-            keys.add(new PartitionKey(predicate.table.id, predicate.key));
-
-        return new Keys(keys);
-    }
-
     public AccordUpdate(List<AbstractUpdate> updates, List<UpdatePredicate> predicates)

Review Comment:
   Modifies its inputs, which for efficiency is of course good. It's only used from `AccordTxnBuilder` so seems fine.



##########
src/java/org/apache/cassandra/service/accord/async/AsyncWriter.java:
##########
@@ -226,21 +227,38 @@ private void denormalize(AccordCommand command, AsyncContext context, Object cal
 
         // notify commands we're waiting on that they need to update the summaries in our maps
         if (command.waitingOnCommit.hasModifications())
+        {
             denormalizeBlockedOn(command, context, cmd -> cmd.waitingOnCommit, cmd -> cmd.blockingCommitOn);
+        }
         if (command.waitingOnApply.hasModifications())
-            denormalizeBlockedOn(command, context, cmd -> cmd.waitingOnApply, cmd -> cmd.blockingApplyOn);
+        {
+            denormalizeBlockedOn(command, context, cmd -> new StoredSet.Changes<TxnId>()
+            {
+                @Override
+                public void forEachAddition(Consumer<TxnId> consumer)
+                {
+                    cmd.waitingOnApply.forEachAddition((ignore, txnId) -> consumer.accept(txnId));
+                }
+
+                @Override
+                public void forEachDeletion(Consumer<TxnId> consumer)
+                {
+                    cmd.waitingOnApply.forEachDeletion((ignore, txnId) -> consumer.accept(txnId));
+
+                }
+            }, cmd -> cmd.blockingApplyOn);
+        }
 
         if (command.shouldUpdateDenormalizedWaitingOn())
         {
-            ByteBuffer summary = AccordPartialCommand.serializer.serialize(command);
-            denormalizeWaitingOnSummaries(command, context, summary, cmd -> cmd.waitingOnCommit, cmd -> cmd.blockingCommitOn);
-            denormalizeWaitingOnSummaries(command, context, summary, cmd -> cmd.waitingOnApply, cmd -> cmd.blockingApplyOn);
+            denormalizeWaitingOnSummaries(command, context, cmd -> (txnId, ignore) -> cmd.waitingOnCommit.blindAdd(txnId), cmd -> cmd.blockingCommitOn);

Review Comment:
   I don't even know what it means to have multiple `->` chained. A lambda that returns a lambda?



##########
src/java/org/apache/cassandra/service/accord/AccordPartialCommand.java:
##########
@@ -244,26 +129,23 @@ public int hashCode()
 
             public static Version fromByte(byte b)
             {
-                switch (b)
-                {
-                    case 0:
-                        return VERSION_0;
-                    default:
-                        throw new IllegalArgumentException();
-                }
+                if (b == 0)
+                    return VERSION_0;
+                throw new IllegalArgumentException();
             }
         }
 
-        public void serialize(T command, DataOutputPlus out, Version version) throws IOException
+        public void serialize(AccordPartialCommand command, DataOutputPlus out, Version version) throws IOException
         {
             out.write(version.version);
             CommandSerializers.txnId.serialize(command.txnId(), out, version.msg_version);
-            CommandSerializers.status.serialize(command.status(), out, version.msg_version);
-            serializeNullable(command.txn(), out, version.msg_version, CommandSerializers.txn);
             serializeNullable(command.executeAt(), out, version.msg_version, CommandSerializers.timestamp);
+            CommandSerializers.status.serialize(command.status(), out, version.msg_version);
+            serializeNullable(command.kind(), out, version.msg_version, CommandSerializers.kind);
+            CollectionSerializer.serializeCollection(CommandSerializers.txnId, command.deps, out, version.msg_version);
         }
 
-        public ByteBuffer serialize(T command)
+        public ByteBuffer serialize(AccordPartialCommand command)

Review Comment:
   Could be private



##########
src/java/org/apache/cassandra/service/accord/serializers/EnumSerializer.java:
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.accord.serializers;
+
+import java.io.IOException;
+
+import accord.messages.SimpleReply;
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+
+public class EnumSerializer<E extends Enum<E>> implements IVersionedSerializer<E>
+{
+    public static final EnumSerializer<SimpleReply> simpleReply = new EnumSerializer<>(SimpleReply.class);
+
+    // TODO: should use something other than ordinal for ser/deser
+    final E[] values;
+
+    public EnumSerializer(Class<E> clazz)
+    {
+        this.values = clazz.getEnumConstants();
+    }
+
+    @Override
+    public void serialize(E t, DataOutputPlus out, int version) throws IOException
+    {
+        out.writeUnsignedVInt(t.ordinal());
+    }
+
+    @Override
+    public E deserialize(DataInputPlus in, int version) throws IOException
+    {
+        return values[(int)in.readUnsignedVInt()];
+    }
+
+    @Override
+    public long serializedSize(E t, int version)
+    {
+        return TypeSizes.sizeofUnsignedVInt(t.ordinal());

Review Comment:
   In practice most enums are 1 byte and this could be hard coded for many enums.



##########
src/java/org/apache/cassandra/service/accord/AccordKeyspace.java:
##########
@@ -312,20 +320,20 @@ private static Map<ByteBuffer, ByteBuffer> serializeWaitingOn(Map<TxnId, ByteBuf
         return result;
     }
 
-    private static NavigableMap<TxnId, ByteBuffer> deserializeWaitingOn(Map<ByteBuffer, ByteBuffer> serialized)
+    private static NavigableMap<Timestamp, TxnId> deserializeWaitingOnApply(Map<ByteBuffer, ByteBuffer> serialized)
     {
         if (serialized == null || serialized.isEmpty())
             return new TreeMap<>();
 
-        NavigableMap<TxnId, ByteBuffer> result = new TreeMap<>();
+        NavigableMap<Timestamp, TxnId> result = new TreeMap<>();
         for (Map.Entry<ByteBuffer, ByteBuffer> entry : serialized.entrySet())
-            result.put(deserializeTimestampOrNull(entry.getKey(), TxnId::new), entry.getValue());
+            result.put(deserializeTimestampOrNull(entry.getKey(), Timestamp::new), deserializeTimestampOrNull(entry.getValue(), TxnId::new));
         return result;
     }
 
-    private static NavigableMap<TxnId, ByteBuffer> deserializeWaitingOn(UntypedResultSet.Row row, String name)
+    private static NavigableMap<Timestamp, TxnId> deserializeWaitingOnApply(UntypedResultSet.Row row, String name)

Review Comment:
   Do we need the wrapper method if it is the only caller?



##########
src/java/org/apache/cassandra/utils/ByteArrayUtil.java:
##########
@@ -195,6 +195,53 @@ public static void putDouble(byte[] b, int off, double val) {
         putLong(b, off, Double.doubleToLongBits(val));
     }
 
+    /**
+     * An efficient way to write the type {@code bytes} of a long
+     *
+     * @param register - the long value to be written
+     * @param bytes - the number of bytes the register occupies. Valid values are between 1 and 8 inclusive.
+     * @throws IOException
+     */
+    public static void putBytes(byte[] b, int off, long register, int bytes)

Review Comment:
   Can we add a unit test showing that it works in all 9 cases (`bytes` from 0 through 8)?



##########
src/java/org/apache/cassandra/service/accord/db/AccordUpdate.java:
##########
@@ -689,22 +695,13 @@ public long serializedBodySize(IncrementingUpdate update, int version)
         };
     }
 
-    private static Keys keysFrom(List<AbstractUpdate> updates, List<UpdatePredicate> predicates)
-    {
-        Set<Key> keys = new HashSet<>();
-        for (AbstractUpdate update : updates)
-            keys.add(update.partitionKey());
-        for (UpdatePredicate predicate : predicates)
-            keys.add(new PartitionKey(predicate.table.id, predicate.key));
-
-        return new Keys(keys);
-    }
-
     public AccordUpdate(List<AbstractUpdate> updates, List<UpdatePredicate> predicates)
     {
-        this.keys = keysFrom(updates, predicates);
-        this.updates = serialize(updates, AbstractUpdate.serializer);
-        this.predicates = serialize(predicates, predicateSerializer);
+        updates.sort(Comparator.comparing(AbstractUpdate::partitionKey));
+        predicates.sort(Comparator.comparing(UpdatePredicate::partitionKey));
+        this.keys = Keys.of(updates, AbstractUpdate::partitionKey).union(Keys.of(predicates, UpdatePredicate::partitionKey));

Review Comment:
   Could this be done as a linear merge, without the copy?



##########
src/java/org/apache/cassandra/service/accord/db/AccordData.java:
##########
@@ -47,35 +49,50 @@
 
 public class AccordData extends AbstractKeyIndexed<FilteredPartition> implements Data, Result, Iterable<FilteredPartition>
 {
-    private static final long EMPTY_SIZE = ObjectSizes.measureDeep(new AccordData());
+    private static final long EMPTY_SIZE = ObjectSizes.measureDeep(new AccordData(Collections.emptyList()));
 
     private static PartitionKey getKey(FilteredPartition partition)
     {
         return new PartitionKey(partition.metadata().id, partition.partitionKey());
     }
 
-    public AccordData()
+    public static class Builder

Review Comment:
   Unused?



##########
src/java/org/apache/cassandra/utils/ByteArrayUtil.java:
##########
@@ -195,6 +195,53 @@ public static void putDouble(byte[] b, int off, double val) {
         putLong(b, off, Double.doubleToLongBits(val));
     }
 
+    /**
+     * An efficient way to write the type {@code bytes} of a long
+     *
+     * @param register - the long value to be written
+     * @param bytes - the number of bytes the register occupies. Valid values are between 1 and 8 inclusive.
+     * @throws IOException
+     */
+    public static void putBytes(byte[] b, int off, long register, int bytes)
+    {
+        switch (bytes)
+        {
+            case 0:
+                break;
+            case 1:
+                b[off] = (byte)(register >>> 56);
+                break;
+            case 2:
+                putShort(b, off, (short)(register >> 48));
+                break;
+            case 3:
+                putShort(b, off, (short)(register >> 48));
+                b[off + 2] = (byte)(register >> 40);
+                break;
+            case 4:
+                putInt(b, off, (int)(register >> 32));
+                break;
+            case 5:
+                putInt(b, off, (int)(register >> 32));
+                b[off + 4] = (byte)(register >> 24);
+                break;
+            case 6:
+                putInt(b, off, (int)(register >> 32));
+                putShort(b, off + 4, (short)(register >> 16));
+                break;
+            case 7:
+                putInt(b, off, (int)(register >> 32));
+                putShort(b, off + 4, (short)(register >> 16));

Review Comment:
   Does this have the same issue with the `off + 4` and `off + 5` offsets?



##########
src/java/org/apache/cassandra/utils/ByteBufferUtil.java:
##########
@@ -726,6 +726,53 @@ public static int getShortLength(ByteBuffer bb, int position)
         return getUnsignedShort(bb, position);
     }
 
+    /**
+     * An efficient way to write the type {@code bytes} of a long
+     *
+     * @param register - the long value to be written
+     * @param bytes - the number of bytes the register occupies. Valid values are between 1 and 8 inclusive.
+     * @throws IOException
+     */
+    public static void putBytes(ByteBuffer b, int off, long register, int bytes)

Review Comment:
   Can we add a unit test showing that it works in all 9 cases (`bytes` from 0 through 8)?



##########
src/java/org/apache/cassandra/service/accord/db/AccordUpdate.java:
##########
@@ -766,7 +769,54 @@ public Write apply(Data data)
         return new AccordWrite(new ArrayList<>(updateMap.values()));
     }
 
-    UpdatePredicate getPredicate(int i)
+    @Override
+    public Update slice(KeyRanges ranges)
+    {
+        Keys keys = this.keys.slice(ranges);
+        return new AccordUpdate(keys, select(this.keys, keys, updates), select(this.keys, keys, predicates));
+    }
+
+    private static ByteBuffer[] select(Keys in, Keys out, ByteBuffer[] from)
+    {
+        ByteBuffer[] result = new ByteBuffer[out.size()];
+        int j = 0;
+        for (int i = 0 ; i < in.size() ; ++i)
+        {
+            j = in.findNext(out.get(i), j);
+            result[i] = from[j];
+        }
+        return result;
+    }
+
+    @Override
+    public Update merge(Update update)
+    {
+        // TODO: special method for linear merging keyed and non-keyed lists simultaneously

Review Comment:
   I've seen this comment several times. What are non-keyed lists?



##########
src/java/org/apache/cassandra/service/accord/db/AccordUpdate.java:
##########
@@ -867,27 +913,26 @@ public AccordUpdate deserialize(DataInputPlus in, int version) throws IOExceptio
         public long serializedSize(AccordUpdate update, int version)
         {
             long size = KeySerializers.keys.serializedSize(update.keys, version);
-
-            size += TypeSizes.sizeof(update.updates.length);
             for (ByteBuffer buffer : update.updates)
                 size += ByteBufferUtil.serializedSizeWithVIntLength(buffer);
-
-            size += TypeSizes.sizeof(update.predicates.length);
             for (ByteBuffer buffer : update.predicates)
                 size += ByteBufferUtil.serializedSizeWithVIntLength(buffer);
-
             return size;
         }
     };
 
-    private static <T> ByteBuffer serialize(T item, IVersionedSerializer<T> serializer)
+    private static <T> ByteBuffer serialize(List<T> items, int start, int end, IVersionedSerializer<T> serializer, int version)
     {
-        int version = MessagingService.current_version;
-        long size = serializer.serializedSize(item, version) + TypeSizes.INT_SIZE;
+        long size = TypeSizes.sizeofUnsignedVInt(version) + TypeSizes.sizeofUnsignedVInt(end - start);

Review Comment:
   Version is another thing that is a constant size in practice.



##########
src/java/org/apache/cassandra/service/accord/store/StoredSet.java:
##########
@@ -179,11 +179,17 @@ public long estimatedSizeOnHeap(ToLongFunction<T> measure)
         return size;
     }
 
-    public static class Navigable<T extends Comparable<?>> extends StoredSet<T, NavigableSet<T>>
+    public interface Changes<T>

Review Comment:
   This is a pretty odd place to stash this interface, given that it is only used in AsyncWriter, for AsyncWriter-specific purposes.



##########
src/java/org/apache/cassandra/utils/ByteBufferUtil.java:
##########
@@ -726,6 +726,53 @@ public static int getShortLength(ByteBuffer bb, int position)
         return getUnsignedShort(bb, position);
     }
 
+    /**
+     * An efficient way to write the type {@code bytes} of a long
+     *
+     * @param register - the long value to be written
+     * @param bytes - the number of bytes the register occupies. Valid values are between 1 and 8 inclusive.
+     * @throws IOException
+     */
+    public static void putBytes(ByteBuffer b, int off, long register, int bytes)
+    {
+        switch (bytes)
+        {
+            case 0:
+                break;
+            case 1:
+                b.put(off, (byte)(register >>> 56));
+                break;
+            case 2:
+                b.putShort(off, (short)(register >> 48));
+                break;
+            case 3:
+                b.putShort(off, (short)(register >> 48));
+                b.put(off + 2, (byte)(register >> 40));
+                break;
+            case 4:
+                b.putInt(off, (int)(register >> 32));
+                break;
+            case 5:
+                b.putInt(off, (int)(register >> 32));
+                b.put(off + 4, (byte)(register >> 24));
+                break;
+            case 6:
+                b.putInt(off, (int)(register >> 32));
+                b.putShort(off + 4, (short)(register >> 16));
+                break;
+            case 7:
+                b.putInt(off, (int)(register >> 32));
+                b.putShort(off + 4, (short)(register >> 16));

Review Comment:
   This has the issue with the `off + 4` and `off + 5` offsets.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org