You are viewing a plain text version of this content. The canonical link for it is here.
Posted to pr@cassandra.apache.org by "bdeggleston (via GitHub)" <gi...@apache.org> on 2023/03/14 21:41:34 UTC

[GitHub] [cassandra-accord] bdeggleston commented on a diff in pull request #30: CEP-15: (Accord) Migrate Accord away from JDK random to a new interface RandomSource

bdeggleston commented on code in PR #30:
URL: https://github.com/apache/cassandra-accord/pull/30#discussion_r1136224771


##########
accord-core/src/main/java/accord/utils/JDKRandomAPI.java:
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.utils;
+
+import java.util.Random;
+
+public interface JDKRandomAPI
+{

Review Comment:
   why is this a separate interface from RandomSource? It seems like they could be merged



##########
accord-core/src/test/java/accord/burn/TopologyUpdates.java:
##########
@@ -185,46 +186,47 @@ private static Collection<Node.Id> allNodesFor(Txn txn, Topology... topologies)
         return result;
     }
 
-    private static Stream<MessageTask> syncEpochCommands(Node node, long srcEpoch, Ranges ranges, Function<CommandSync, Collection<Node.Id>> recipients, long trgEpoch, boolean committedOnly) throws ExecutionException
+    private static AsyncChain<Stream<MessageTask>> syncEpochCommands(Node node, long srcEpoch, Ranges ranges, Function<CommandSync, Collection<Node.Id>> recipients, long trgEpoch, boolean committedOnly)
     {
         Map<TxnId, CheckStatusOk> syncMessages = new ConcurrentHashMap<>();
         Consumer<Command> commandConsumer = command -> syncMessages.merge(command.txnId(), new CheckStatusOk(node, command), CheckStatusOk::merge);
+        AsyncChain<Void> start;
         if (committedOnly)
-            getUninterruptibly(node.commandStores().forEach(commandStore -> InMemoryCommandStore.inMemory(commandStore).forCommittedInEpoch(ranges, srcEpoch, commandConsumer)));
+            start = node.commandStores().forEach(commandStore -> InMemoryCommandStore.inMemory(commandStore).forCommittedInEpoch(ranges, srcEpoch, commandConsumer));
         else
-            getUninterruptibly(node.commandStores().forEach(commandStore -> InMemoryCommandStore.inMemory(commandStore).forEpochCommands(ranges, srcEpoch, commandConsumer)));
+            start = node.commandStores().forEach(commandStore -> InMemoryCommandStore.inMemory(commandStore).forEpochCommands(ranges, srcEpoch, commandConsumer));
 
-        return syncMessages.entrySet().stream().map(e -> {
+        return start.map(ignore -> syncMessages.entrySet().stream().map(e -> {
             CommandSync sync = new CommandSync(e.getKey(), e.getValue(), srcEpoch, trgEpoch);
             return MessageTask.of(node, recipients.apply(sync), sync.toString(), sync::process);
-        });
+        }));
     }
 
     private static final boolean PREACCEPTED = false;
     private static final boolean COMMITTED_ONLY = true;
 
-    /**
-     * Syncs all replicated commands. Overkill, but useful for confirming issues in optimizedSync
-     */
-    private static Stream<MessageTask> thoroughSync(Node node, long syncEpoch) throws ExecutionException
-    {
-        Topology syncTopology = node.configService().getTopologyForEpoch(syncEpoch);
-        Topology localTopology = syncTopology.forNode(node.id());
-        Function<CommandSync, Collection<Node.Id>> allNodes = cmd -> node.topology().withUnsyncedEpochs(cmd.route, syncEpoch + 1).nodes();
-
-        Ranges ranges = localTopology.ranges();
-        Stream<MessageTask> messageStream = Stream.empty();
-        for (long epoch=1; epoch<=syncEpoch; epoch++)
-        {
-            messageStream = Stream.concat(messageStream, syncEpochCommands(node, epoch, ranges, allNodes, syncEpoch, COMMITTED_ONLY));
-        }
-        return messageStream;
-    }
+//    /**
+//     * Syncs all replicated commands. Overkill, but useful for confirming issues in optimizedSync
+//     */
+//    private static Stream<MessageTask> thoroughSync(Node node, long syncEpoch) throws ExecutionException
+//    {
+//        Topology syncTopology = node.configService().getTopologyForEpoch(syncEpoch);
+//        Topology localTopology = syncTopology.forNode(node.id());
+//        Function<CommandSync, Collection<Node.Id>> allNodes = cmd -> node.topology().withUnsyncedEpochs(cmd.route, syncEpoch + 1).nodes();
+//
+//        Ranges ranges = localTopology.ranges();
+//        Stream<MessageTask> messageStream = Stream.empty();
+//        for (long epoch=1; epoch<=syncEpoch; epoch++)
+//        {
+//            messageStream = Stream.concat(messageStream, syncEpochCommands(node, epoch, ranges, allNodes, syncEpoch, COMMITTED_ONLY));
+//        }
+//        return messageStream;
+//    }

Review Comment:
   why is this commented out?



##########
accord-core/src/main/java/accord/utils/async/AsyncChainCombiner.java:
##########
@@ -165,4 +165,26 @@ static <V> boolean canAppendTo(AsyncChain<? extends V> chain, BiFunction<V, V, V
             return reduce.reducer == reducer;
         }
     }
+
+    static class ReduceWithZero<A, B> extends AsyncChainCombiner<A, B>
+    {
+        private final B zero;
+        private final BiFunction<B, ? super A, B> reducer;
+
+        protected ReduceWithZero(List<? extends AsyncChain<? extends A>> inputs, B zero, BiFunction<B, ? super A, B> reducer)

Review Comment:
   assuming this isn't removed before commit, could we call this ReduceWithInitialValue, or maybe ReduceWithIdentiity? 



##########
accord-core/src/main/java/accord/utils/async/AsyncChains.java:
##########
@@ -423,6 +424,16 @@ public static <V> AsyncChain<V> reduce(AsyncChain<V> a, AsyncChain<V> b, BiFunct
         return new Reduce<>(Lists.newArrayList(a, b), reducer);
     }
 
+    public static <A, B> AsyncChain<B> reduce(List<? extends AsyncChain<? extends A>> chains, B zero, BiFunction<B, ? super A, B> reducer)
+    {
+        switch (chains.size())
+        {
+            case 0: return AsyncChains.success(zero);
+            case 1: return chains.get(0).map(a -> reducer.apply(zero, a));
+        }
+        return new ReduceWithZero<>(chains, zero, reducer);

Review Comment:
   I don't think this is needed. The empty case is already handled above, and the only usage of this doesn't need a special initial value



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org