Posted to pr@cassandra.apache.org by GitBox <gi...@apache.org> on 2022/10/12 23:27:28 UTC

[GitHub] [cassandra] dcapwell commented on a diff in pull request #1845: accord metadata persistence

dcapwell commented on code in PR #1845:
URL: https://github.com/apache/cassandra/pull/1845#discussion_r993945136


##########
simulator.sh:
##########
@@ -0,0 +1,92 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#

Review Comment:
   This is *not* common in our current scripts, but I find it much safer when working with bash. Can we add:
   
   ```
   #set -o xtrace
   set -o errexit
   set -o pipefail
   set -o nounset
   ```
   
   `xtrace` is commented out and is only there for when debugging is needed; it prints every command as it runs.
   `errexit` exits the script with an error if any command fails, so missing error checks don't silently produce bad results.
   `pipefail` extends `errexit`: when piping `a | b`, if any part of the pipe fails then the whole pipeline fails.
   `nounset` detects when a variable is *used* before it's *defined*; this is an issue in this patch.
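   
   A minimal sketch of what each option changes (not part of the patch). Every probe runs in a child `bash -c`, an assumption made only so that a failure doesn't end the demo itself, and `never_defined` is a made-up variable name:
   
   ```shell
   #!/bin/bash
   # Each probe runs in a child bash so a failure does not end this demo script.

   # errexit: the child stops at the first failing command, so nothing is printed.
   errexit_out=$(bash -c 'set -o errexit; false; echo "still running"' || true)

   # pipefail: the pipeline reports failure if any stage fails.
   pipefail_off=$(bash -c 'false | true; echo $?')
   pipefail_on=$(bash -c 'set -o pipefail; false | true; echo $?')

   # nounset: expanding an unset variable is an error and the child exits non-zero.
   nounset_status=0
   bash -c 'set -o nounset; echo "$never_defined"' 2>/dev/null || nounset_status=$?

   echo "errexit output: '${errexit_out}'"
   echo "pipefail off: ${pipefail_off}, on: ${pipefail_on}"
   echo "nounset exit status: ${nounset_status}"
   ```
   
   With `pipefail` off, `false | true` reports the status of `true`; with it on, the failing stage decides the result.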
   



##########
simulator.sh:
##########
@@ -0,0 +1,92 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+#ant jar simulator-jars
+
+DIR=`pwd`
+JVM_OPTS="$JVM_OPTS -Dcassandra.config=file://$DIR/test/conf/cassandra.yaml"

Review Comment:
   If you add `nounset` you would need to change the first `$JVM_OPTS` to `${JVM_OPTS:-}`; you don't need to do this if you don't set it.
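   
   Roughly what that looks like, as a sketch rather than the final script (the two flags are just taken from the list above, and the `unset` is only there so the demo starts from a clean state):
   
   ```shell
   #!/bin/bash
   set -o nounset

   # Make sure the variable really is unset for this demonstration.
   unset JVM_OPTS

   # "${JVM_OPTS:-}" expands to an empty string when JVM_OPTS is unset or empty,
   # so nounset does not abort on the first append.
   JVM_OPTS="${JVM_OPTS:-} -ea"

   # From here on JVM_OPTS is defined, so a plain "$JVM_OPTS" is safe.
   JVM_OPTS="$JVM_OPTS -Xmx8G"
   echo "$JVM_OPTS"
   ```
   
   Only the first append needs the default; a caller who exports `JVM_OPTS` beforehand still gets their value prepended.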



##########
simulator.sh:
##########
@@ -0,0 +1,92 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+#ant jar simulator-jars
+
+DIR=`pwd`
+JVM_OPTS="$JVM_OPTS -Dcassandra.config=file://$DIR/test/conf/cassandra.yaml"
+JVM_OPTS="$JVM_OPTS -Dlogback.configurationFile=file://$DIR/test/conf/logback-simulator.xml"
+JVM_OPTS="$JVM_OPTS -Dcassandra.logdir=$DIR/build/test/logs"
+#JVM_OPTS="$JVM_OPTS -Djava.library.path=$DIR/lib/sigar-bin"
+JVM_OPTS="$JVM_OPTS -Dlegacy-sstable-root=$DIR/test/data/legacy-sstables"
+JVM_OPTS="$JVM_OPTS -Dinvalid-legacy-sstable-root=$DIR/test/data/invalid-legacy-sstables"
+JVM_OPTS="$JVM_OPTS -Dcassandra.ring_delay_ms=1000"
+JVM_OPTS="$JVM_OPTS -Dcassandra.skip_sync=true"
+JVM_OPTS="$JVM_OPTS -ea"
+JVM_OPTS="$JVM_OPTS -XX:MaxMetaspaceSize=1G"
+JVM_OPTS="$JVM_OPTS -XX:SoftRefLRUPolicyMSPerMB=0"
+JVM_OPTS="$JVM_OPTS -Dcassandra.strict.runtime.checks=true"
+JVM_OPTS="$JVM_OPTS -javaagent:$DIR/build/test/lib/jars/simulator-asm.jar"
+JVM_OPTS="$JVM_OPTS -Xbootclasspath/a:$DIR/build/test/lib/jars/simulator-bootstrap.jar"
+JVM_OPTS="$JVM_OPTS -XX:ActiveProcessorCount=4"
+JVM_OPTS="$JVM_OPTS -XX:-TieredCompilation"
+JVM_OPTS="$JVM_OPTS -XX:Tier4CompileThreshold=1000"
+JVM_OPTS="$JVM_OPTS -XX:ReservedCodeCacheSize=256M"
+JVM_OPTS="$JVM_OPTS -Xmx8G"
+JVM_OPTS="$JVM_OPTS -Dcassandra.test.simulator.determinismcheck=strict"
+JVM_OPTS="$JVM_OPTS -Dcassandra.debugrefcount=false"
+JVM_OPTS="$JVM_OPTS -Dcassandra.skip_sync=true"
+JVM_OPTS="$JVM_OPTS -Dcassandra.tolerate_sstable_size=true"
+JVM_OPTS="$JVM_OPTS -Dcassandra.test.simulator.debug=true"
+JVM_OPTS="$JVM_OPTS -Dcassandra.test.simulator.determinismcheck=strict"
+echo $JVM_OPTS
+
+CLASSPATH="$DIR"/build/test/classes
+for dir in "$DIR"/build/classes/*; do
+    CLASSPATH="$CLASSPATH:$jar"

Review Comment:
   `jar` doesn't exist yet; this should be `dir`. If you add `set -o nounset` you would detect this and fail earlier.
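   
   A sketch of the corrected loop. The temporary directory and the `main`/`stress` subdirectories are hypothetical stand-ins so the snippet runs outside a checkout; the real script uses ``DIR=`pwd` ``:
   
   ```shell
   #!/bin/bash
   set -o nounset

   # Hypothetical stand-in for the checkout directory.
   DIR=$(mktemp -d)
   mkdir -p "$DIR/build/classes/main" "$DIR/build/classes/stress"

   CLASSPATH="$DIR/build/test/classes"
   for dir in "$DIR"/build/classes/*; do
     # The loop variable is "dir"; referencing "$jar" here would abort under nounset.
     CLASSPATH="$CLASSPATH:$dir"
   done
   echo "$CLASSPATH"

   # Clean up the scratch directory.
   rm -rf "$DIR"
   ```
   
   Without `nounset`, the original loop would quietly append an empty entry for each directory instead of failing.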



##########
simulator.sh:
##########
@@ -0,0 +1,92 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+#ant jar simulator-jars
+
+DIR=`pwd`
+JVM_OPTS="$JVM_OPTS -Dcassandra.config=file://$DIR/test/conf/cassandra.yaml"
+JVM_OPTS="$JVM_OPTS -Dlogback.configurationFile=file://$DIR/test/conf/logback-simulator.xml"
+JVM_OPTS="$JVM_OPTS -Dcassandra.logdir=$DIR/build/test/logs"
+#JVM_OPTS="$JVM_OPTS -Djava.library.path=$DIR/lib/sigar-bin"
+JVM_OPTS="$JVM_OPTS -Dlegacy-sstable-root=$DIR/test/data/legacy-sstables"
+JVM_OPTS="$JVM_OPTS -Dinvalid-legacy-sstable-root=$DIR/test/data/invalid-legacy-sstables"
+JVM_OPTS="$JVM_OPTS -Dcassandra.ring_delay_ms=1000"
+JVM_OPTS="$JVM_OPTS -Dcassandra.skip_sync=true"
+JVM_OPTS="$JVM_OPTS -ea"
+JVM_OPTS="$JVM_OPTS -XX:MaxMetaspaceSize=1G"
+JVM_OPTS="$JVM_OPTS -XX:SoftRefLRUPolicyMSPerMB=0"
+JVM_OPTS="$JVM_OPTS -Dcassandra.strict.runtime.checks=true"
+JVM_OPTS="$JVM_OPTS -javaagent:$DIR/build/test/lib/jars/simulator-asm.jar"
+JVM_OPTS="$JVM_OPTS -Xbootclasspath/a:$DIR/build/test/lib/jars/simulator-bootstrap.jar"
+JVM_OPTS="$JVM_OPTS -XX:ActiveProcessorCount=4"
+JVM_OPTS="$JVM_OPTS -XX:-TieredCompilation"
+JVM_OPTS="$JVM_OPTS -XX:Tier4CompileThreshold=1000"
+JVM_OPTS="$JVM_OPTS -XX:ReservedCodeCacheSize=256M"
+JVM_OPTS="$JVM_OPTS -Xmx8G"
+JVM_OPTS="$JVM_OPTS -Dcassandra.test.simulator.determinismcheck=strict"
+JVM_OPTS="$JVM_OPTS -Dcassandra.debugrefcount=false"
+JVM_OPTS="$JVM_OPTS -Dcassandra.skip_sync=true"
+JVM_OPTS="$JVM_OPTS -Dcassandra.tolerate_sstable_size=true"
+JVM_OPTS="$JVM_OPTS -Dcassandra.test.simulator.debug=true"
+JVM_OPTS="$JVM_OPTS -Dcassandra.test.simulator.determinismcheck=strict"
+echo $JVM_OPTS
+
+CLASSPATH="$DIR"/build/test/classes
+for dir in "$DIR"/build/classes/*; do
+    CLASSPATH="$CLASSPATH:$jar"
+done
+
+for jar in "$DIR"/lib/*.jar; do
+  if [[ $jar != *"logback-classic"* ]]; then

Review Comment:
   Can we remove this `if`? We *need* `logback-classic`, otherwise all logging is disabled... right now we fail silently rather than telling others why we failed.



##########
simulator.sh:
##########
@@ -0,0 +1,92 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+#ant jar simulator-jars
+
+DIR=`pwd`
+JVM_OPTS="$JVM_OPTS -Dcassandra.config=file://$DIR/test/conf/cassandra.yaml"
+JVM_OPTS="$JVM_OPTS -Dlogback.configurationFile=file://$DIR/test/conf/logback-simulator.xml"
+JVM_OPTS="$JVM_OPTS -Dcassandra.logdir=$DIR/build/test/logs"
+#JVM_OPTS="$JVM_OPTS -Djava.library.path=$DIR/lib/sigar-bin"
+JVM_OPTS="$JVM_OPTS -Dlegacy-sstable-root=$DIR/test/data/legacy-sstables"
+JVM_OPTS="$JVM_OPTS -Dinvalid-legacy-sstable-root=$DIR/test/data/invalid-legacy-sstables"
+JVM_OPTS="$JVM_OPTS -Dcassandra.ring_delay_ms=1000"
+JVM_OPTS="$JVM_OPTS -Dcassandra.skip_sync=true"
+JVM_OPTS="$JVM_OPTS -ea"
+JVM_OPTS="$JVM_OPTS -XX:MaxMetaspaceSize=1G"
+JVM_OPTS="$JVM_OPTS -XX:SoftRefLRUPolicyMSPerMB=0"
+JVM_OPTS="$JVM_OPTS -Dcassandra.strict.runtime.checks=true"
+JVM_OPTS="$JVM_OPTS -javaagent:$DIR/build/test/lib/jars/simulator-asm.jar"
+JVM_OPTS="$JVM_OPTS -Xbootclasspath/a:$DIR/build/test/lib/jars/simulator-bootstrap.jar"
+JVM_OPTS="$JVM_OPTS -XX:ActiveProcessorCount=4"
+JVM_OPTS="$JVM_OPTS -XX:-TieredCompilation"
+JVM_OPTS="$JVM_OPTS -XX:Tier4CompileThreshold=1000"
+JVM_OPTS="$JVM_OPTS -XX:ReservedCodeCacheSize=256M"
+JVM_OPTS="$JVM_OPTS -Xmx8G"
+JVM_OPTS="$JVM_OPTS -Dcassandra.test.simulator.determinismcheck=strict"
+JVM_OPTS="$JVM_OPTS -Dcassandra.debugrefcount=false"
+JVM_OPTS="$JVM_OPTS -Dcassandra.skip_sync=true"
+JVM_OPTS="$JVM_OPTS -Dcassandra.tolerate_sstable_size=true"
+JVM_OPTS="$JVM_OPTS -Dcassandra.test.simulator.debug=true"
+JVM_OPTS="$JVM_OPTS -Dcassandra.test.simulator.determinismcheck=strict"
+echo $JVM_OPTS
+
+CLASSPATH="$DIR"/build/test/classes
+for dir in "$DIR"/build/classes/*; do
+    CLASSPATH="$CLASSPATH:$jar"
+done
+
+for jar in "$DIR"/lib/*.jar; do
+  if [[ $jar != *"logback-classic"* ]]; then
+    CLASSPATH="$CLASSPATH:$jar"
+  fi
+done
+for jar in "$DIR"/build/*.jar; do
+  if [[ $jar != *"logback-classic"* ]]; then
+    CLASSPATH="$CLASSPATH:$jar"
+  fi
+done
+for jar in "$DIR"/build/lib/jars/*.jar; do
+  if [[ $jar != *"logback-classic"* ]]; then
+    CLASSPATH="$CLASSPATH:$jar"
+  fi
+done
+for jar in "$DIR"/build/test/lib/jars/*.jar; do
+  if [[ $jar != *"logback-classic"* ]]; then
+    CLASSPATH="$CLASSPATH:$jar"
+  fi
+done
+
+CLASS="org.apache.cassandra.simulator.paxos.AccordSimulationRunner"
+OPTS="run -n 3..6 -t 1000 --cluster-action-limit -1 -c 2 -s 30"
+
+echo "java -cp <...> $CLASS $OPTS $@"
+
+echo ""
+java -cp $CLASSPATH $JVM_OPTS $CLASS $OPTS $@
+while true
+do
+  echo ""
+  java -cp $CLASSPATH $JVM_OPTS $CLASS $OPTS $@
+  status=$?

Review Comment:
   If you set `errexit` then this logic becomes simpler, as you can just do:
   
   ```
   while true; do
     echo ""
     java -cp $CLASSPATH $JVM_OPTS $CLASS $OPTS $@
   done
   ```
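   
   To see why no explicit status check is needed, here is a small sketch. `run_once` is a hypothetical stand-in for the `java` invocation that fails on its third call, and the loop runs in a child `bash -c` only so the demo can report the exit status afterwards:
   
   ```shell
   #!/bin/bash
   # run_once is a hypothetical stand-in for the java invocation: it succeeds
   # twice and fails on the third call.
   output=$(bash -c '
     set -o errexit
     count=0
     run_once() {
       count=$((count + 1))
       echo "run $count"
       [ "$count" -lt 3 ]
     }
     while true; do
       run_once
     done
   ') && status=0 || status=$?

   echo "$output"
   echo "loop exited with status $status"
   ```
   
   The loop keeps going while runs succeed, and the first failing run ends the script with that run's exit status.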



##########
src/java/org/apache/cassandra/concurrent/SingleThreadExecutorPlus.java:
##########
@@ -25,6 +25,7 @@ public class SingleThreadExecutorPlus extends ThreadPoolExecutorPlus implements
 {
     public static class AtLeastOnce extends AtomicBoolean implements AtLeastOnceTrigger, Runnable
     {
+        private static final long serialVersionUID = 0;  // for simulator support

Review Comment:
   Not for this patch, but if simulator needs all `Serializable` types to declare `serialVersionUID`, *should* we detect this in our build? It's super easy to miss, and I'm not sure how easy it is to tell that simulator didn't like it being missing...



##########
simulator.sh:
##########
@@ -0,0 +1,92 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+#ant jar simulator-jars
+
+DIR=`pwd`
+JVM_OPTS="$JVM_OPTS -Dcassandra.config=file://$DIR/test/conf/cassandra.yaml"
+JVM_OPTS="$JVM_OPTS -Dlogback.configurationFile=file://$DIR/test/conf/logback-simulator.xml"
+JVM_OPTS="$JVM_OPTS -Dcassandra.logdir=$DIR/build/test/logs"
+#JVM_OPTS="$JVM_OPTS -Djava.library.path=$DIR/lib/sigar-bin"
+JVM_OPTS="$JVM_OPTS -Dlegacy-sstable-root=$DIR/test/data/legacy-sstables"
+JVM_OPTS="$JVM_OPTS -Dinvalid-legacy-sstable-root=$DIR/test/data/invalid-legacy-sstables"
+JVM_OPTS="$JVM_OPTS -Dcassandra.ring_delay_ms=1000"
+JVM_OPTS="$JVM_OPTS -Dcassandra.skip_sync=true"
+JVM_OPTS="$JVM_OPTS -ea"
+JVM_OPTS="$JVM_OPTS -XX:MaxMetaspaceSize=1G"
+JVM_OPTS="$JVM_OPTS -XX:SoftRefLRUPolicyMSPerMB=0"
+JVM_OPTS="$JVM_OPTS -Dcassandra.strict.runtime.checks=true"
+JVM_OPTS="$JVM_OPTS -javaagent:$DIR/build/test/lib/jars/simulator-asm.jar"
+JVM_OPTS="$JVM_OPTS -Xbootclasspath/a:$DIR/build/test/lib/jars/simulator-bootstrap.jar"
+JVM_OPTS="$JVM_OPTS -XX:ActiveProcessorCount=4"
+JVM_OPTS="$JVM_OPTS -XX:-TieredCompilation"
+JVM_OPTS="$JVM_OPTS -XX:Tier4CompileThreshold=1000"
+JVM_OPTS="$JVM_OPTS -XX:ReservedCodeCacheSize=256M"
+JVM_OPTS="$JVM_OPTS -Xmx8G"
+JVM_OPTS="$JVM_OPTS -Dcassandra.test.simulator.determinismcheck=strict"
+JVM_OPTS="$JVM_OPTS -Dcassandra.debugrefcount=false"
+JVM_OPTS="$JVM_OPTS -Dcassandra.skip_sync=true"
+JVM_OPTS="$JVM_OPTS -Dcassandra.tolerate_sstable_size=true"
+JVM_OPTS="$JVM_OPTS -Dcassandra.test.simulator.debug=true"
+JVM_OPTS="$JVM_OPTS -Dcassandra.test.simulator.determinismcheck=strict"
+echo $JVM_OPTS
+
+CLASSPATH="$DIR"/build/test/classes
+for dir in "$DIR"/build/classes/*; do
+    CLASSPATH="$CLASSPATH:$jar"
+done
+
+for jar in "$DIR"/lib/*.jar; do
+  if [[ $jar != *"logback-classic"* ]]; then
+    CLASSPATH="$CLASSPATH:$jar"
+  fi
+done
+for jar in "$DIR"/build/*.jar; do
+  if [[ $jar != *"logback-classic"* ]]; then
+    CLASSPATH="$CLASSPATH:$jar"
+  fi
+done
+for jar in "$DIR"/build/lib/jars/*.jar; do
+  if [[ $jar != *"logback-classic"* ]]; then
+    CLASSPATH="$CLASSPATH:$jar"
+  fi
+done
+for jar in "$DIR"/build/test/lib/jars/*.jar; do
+  if [[ $jar != *"logback-classic"* ]]; then
+    CLASSPATH="$CLASSPATH:$jar"
+  fi
+done
+
+CLASS="org.apache.cassandra.simulator.paxos.AccordSimulationRunner"
+OPTS="run -n 3..6 -t 1000 --cluster-action-limit -1 -c 2 -s 30"
+
+echo "java -cp <...> $CLASS $OPTS $@"
+
+echo ""
+java -cp $CLASSPATH $JVM_OPTS $CLASS $OPTS $@

Review Comment:
   This isn't needed; we don't check the status, so it's just a wasted exec.



##########
src/java/org/apache/cassandra/config/DatabaseDescriptor.java:
##########
@@ -497,6 +497,9 @@ else if (conf.disk_access_mode == Config.DiskAccessMode.mmap_index_only)
         if (conf.concurrent_counter_writes < 2)
             throw new ConfigurationException("concurrent_counter_writes must be at least 2, but was " + conf.concurrent_counter_writes, false);
 
+        if (conf.concurrent_accord_operations < 1)

Review Comment:
   Not for this patch: we really should make this simpler in our config... I always hate changing DD to do our validation, as it then gets skipped when the value is changed in other contexts (such as updates through our vtable or JMX).



##########
src/java/org/apache/cassandra/db/filter/ClusteringIndexSliceFilter.java:
##########
@@ -20,6 +20,7 @@
 import java.io.IOException;
 import java.util.List;
 import java.nio.ByteBuffer;
+import java.util.Objects;

Review Comment:
   unused import



##########
src/java/org/apache/cassandra/db/filter/AbstractClusteringIndexFilter.java:
##########
@@ -18,6 +18,7 @@
 package org.apache.cassandra.db.filter;
 
 import java.io.IOException;
+import java.util.Objects;

Review Comment:
   unused import



##########
src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java:
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.accord;
+
+import java.util.Map;
+
+import accord.api.Key;
+import accord.local.Node;
+import accord.primitives.Deps;
+import accord.primitives.Keys;
+import accord.primitives.Timestamp;
+import accord.primitives.TxnId;
+import accord.txn.Txn;
+import accord.txn.Writes;
+import org.apache.cassandra.service.accord.api.AccordKey;
+import org.apache.cassandra.service.accord.db.AccordQuery;
+import org.apache.cassandra.service.accord.db.AccordRead;
+import org.apache.cassandra.service.accord.db.AccordUpdate;
+import org.apache.cassandra.service.accord.db.AccordWrite;
+import org.apache.cassandra.utils.ObjectSizes;
+
+public class AccordObjectSizes
+{
+    public static long key(Key key)
+    {
+        return ((AccordKey.PartitionKey) key).estimatedSizeOnHeap();

Review Comment:
   Feels weird to only accept `PartitionKey` here when we also have token and sentinel keys... but I'm not seeing a non-partition key reach this... I remember us dealing with that in the serializer...



##########
src/java/org/apache/cassandra/service/accord/AccordMessageSink.java:
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.accord;
+
+import java.util.EnumMap;
+import java.util.Map;
+
+import com.google.common.base.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import accord.api.MessageSink;
+import accord.local.Node;
+import accord.messages.Callback;
+import accord.messages.MessageType;
+import accord.messages.Reply;
+import accord.messages.ReplyContext;
+import accord.messages.Request;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.Verb;
+
+import static org.apache.cassandra.service.accord.EndpointMapping.getEndpoint;
+
+public class AccordMessageSink implements MessageSink
+{
+    private static final Logger logger = LoggerFactory.getLogger(AccordMessageSink.class);
+
+    private static class VerbMapping
+    {
+        private static final VerbMapping instance = new VerbMapping();
+
+        private final Map<MessageType, Verb> mapping = new EnumMap<>(MessageType.class);
+
+        private VerbMapping()
+        {
+            mapping.put(MessageType.PREACCEPT_REQ,          Verb.ACCORD_PREACCEPT_REQ);
+            mapping.put(MessageType.PREACCEPT_RSP,          Verb.ACCORD_PREACCEPT_RSP);
+            mapping.put(MessageType.ACCEPT_REQ,             Verb.ACCORD_ACCEPT_REQ);
+            mapping.put(MessageType.ACCEPT_RSP,             Verb.ACCORD_ACCEPT_RSP);
+            mapping.put(MessageType.ACCEPT_INVALIDATE_REQ,  Verb.ACCORD_ACCEPT_INVALIDATE_REQ);
+            mapping.put(MessageType.COMMIT_REQ,             Verb.ACCORD_COMMIT_REQ);
+            mapping.put(MessageType.COMMIT_INVALIDATE,      Verb.ACCORD_COMMIT_INVALIDATE_REQ);
+            mapping.put(MessageType.APPLY_REQ,              Verb.ACCORD_APPLY_REQ);
+            mapping.put(MessageType.APPLY_RSP,              Verb.ACCORD_APPLY_RSP);
+            mapping.put(MessageType.APPLY_AND_CHECK_REQ,    Verb.ACCORD_APPLY_AND_CHECK_REQ);
+            mapping.put(MessageType.APPLY_AND_CHECK_RSP,    Verb.ACCORD_APPLY_AND_CHECK_RSP);
+            mapping.put(MessageType.READ_REQ,               Verb.ACCORD_READ_REQ);
+            mapping.put(MessageType.READ_RSP,               Verb.ACCORD_READ_RSP);
+            mapping.put(MessageType.BEGIN_RECOVER_REQ,      Verb.ACCORD_RECOVER_REQ);
+            mapping.put(MessageType.BEGIN_RECOVER_RSP,      Verb.ACCORD_RECOVER_RSP);
+            mapping.put(MessageType.BEGIN_INVALIDATE_REQ,   Verb.ACCORD_BEGIN_INVALIDATE_REQ);
+            mapping.put(MessageType.BEGIN_INVALIDATE_RSP,   Verb.ACCORD_BEGIN_INVALIDATE_RSP);
+            mapping.put(MessageType.WAIT_ON_COMMIT_REQ,     Verb.ACCORD_WAIT_COMMIT_REQ);
+            mapping.put(MessageType.WAIT_ON_COMMIT_RSP,     Verb.ACCORD_WAIT_COMMIT_RSP);
+            mapping.put(MessageType.INFORM_REQ,             Verb.ACCORD_INFORM_OF_TXN_REQ);
+            mapping.put(MessageType.INFORM_RSP,             Verb.ACCORD_INFORM_OF_TXN_RSP);
+            mapping.put(MessageType.INFORM_PERSISTED_REQ,   Verb.ACCORD_INFORM_OF_PERSIETENCE_REQ);
+            mapping.put(MessageType.INFORM_PERSISTED_RSP,   Verb.ACCORD_INFORM_OF_PERSISTENCE_RSP);
+            mapping.put(MessageType.CHECK_STATUS_REQ,       Verb.ACCORD_CHECK_STATUS_REQ);
+            mapping.put(MessageType.CHECK_STATUS_RSP,       Verb.ACCORD_CHECK_STATUS_RSP);
+        }
+    }
+
+    private static Verb getVerb(MessageType type)
+    {
+        return VerbMapping.instance.mapping.get(type);
+    }
+
+    @Override
+    public void send(Node.Id to, Request request)
+    {
+        Verb verb = getVerb(request.type());
+        Preconditions.checkArgument(verb != null);

Review Comment:
   there is `checkNotNull` and `java.util.Objects#requireNonNull(T)`



##########
src/java/org/apache/cassandra/service/accord/AccordPartialCommand.java:
##########
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.accord;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Consumer;
+
+import accord.local.Listener;
+import accord.local.PartialCommand;
+import accord.local.Status;
+import accord.primitives.Deps;
+import accord.primitives.Timestamp;
+import accord.primitives.TxnId;
+import accord.txn.Txn;
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.accord.async.AsyncContext;
+import org.apache.cassandra.service.accord.serializers.CommandSerializers;
+
+import static org.apache.cassandra.service.accord.serializers.NullableSerializer.deserializeNullable;
+import static org.apache.cassandra.service.accord.serializers.NullableSerializer.serializeNullable;
+import static org.apache.cassandra.service.accord.serializers.NullableSerializer.serializedSizeNullable;
+
+public class AccordPartialCommand implements PartialCommand
+{
+    public static final PartialCommandSerializer<PartialCommand> serializer = new PartialCommandSerializer<PartialCommand>()
+    {
+        @Override
+        public PartialCommand getCachedFull(TxnId txnId, AsyncContext context)
+        {
+            return context.commands.get(txnId);
+        }
+
+        @Override
+        public void addToContext(PartialCommand command, AsyncContext context)
+        {
+            context.commands.addPartialCommand((AccordPartialCommand) command);
+        }
+
+        @Override
+        public PartialCommand deserializeBody(TxnId txnId, Txn txn, Timestamp executeAt, Status status, DataInputPlus in, Version version) throws IOException
+        {
+            return new AccordPartialCommand(txnId, txn, executeAt, status);
+        }
+    };
+
+    private final TxnId txnId;
+    private final Txn txn;
+    private final Timestamp executeAt;
+    private final Status status;
+    private List<Listener> removedListeners = null;
+
+    public AccordPartialCommand(TxnId txnId, Txn txn, Timestamp executeAt, Status status)
+    {
+        this.txnId = txnId;
+        this.txn = txn;
+        this.executeAt = executeAt;
+        this.status = status;
+    }
+
+    @Override
+    public TxnId txnId()
+    {
+        return txnId;
+    }
+
+    @Override
+    public Txn txn()
+    {
+        return txn;
+    }
+
+    @Override
+    public Timestamp executeAt()
+    {
+        return executeAt;
+    }
+
+    @Override
+    public Status status()
+    {
+        return status;
+    }
+
+    @Override
+    public void removeListener(Listener listener)
+    {
+        if (removedListeners == null)
+            removedListeners = new ArrayList<>();
+        removedListeners.add(listener);
+    }
+
+    public boolean hasRemovedListeners()
+    {
+        return removedListeners != null && !removedListeners.isEmpty();
+    }
+
+    public void forEachRemovedListener(Consumer<Listener> consumer)
+    {
+        removedListeners.forEach(consumer);

Review Comment:
   `removedListeners` is only allocated in `removeListener`, so it can still be null here; this should have a null check.



##########
src/java/org/apache/cassandra/service/accord/AccordService.java:
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.accord;
+
+import java.util.Arrays;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import accord.impl.SimpleProgressLog;
+import accord.local.Node;
+import accord.messages.Reply;
+import accord.messages.Request;
+import org.apache.cassandra.concurrent.Shutdownable;
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.service.accord.api.AccordAgent;
+import org.apache.cassandra.service.accord.api.AccordScheduler;
+import org.apache.cassandra.utils.Clock;
+import org.apache.cassandra.utils.ExecutorUtils;
+import org.apache.cassandra.utils.FBUtilities;
+
+public class AccordService implements Shutdownable
+{
+    public final Node node;
+    private final Shutdownable nodeShutdown;
+    private final AccordMessageSink messageSink;
+    public final AccordConfigurationService configService;
+    private final AccordScheduler scheduler;
+    private final AccordVerbHandler verbHandler;
+
+    private static class Handle
+    {
+        public static final AccordService instance = new AccordService();
+    }
+
+    public static AccordService instance()
+    {
+        return Handle.instance;
+    }
+
+    public static long uniqueNow()
+    {
+        return TimeUnit.MILLISECONDS.toMicros(Clock.Global.currentTimeMillis());

Review Comment:
   nit: the clock doesn't have to move forward and can go backwards, so this isn't `unique`; in normal operation this is just `java.lang.System#currentTimeMillis`.



##########
src/java/org/apache/cassandra/service/accord/db/AbstractKeyIndexed.java:
##########
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.accord.db;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Objects;
+import java.util.TreeMap;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.db.marshal.ByteBufferAccessor;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.accord.api.AccordKey.PartitionKey;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+/**
+ * Contains a map of objects serialized to byte buffers
+ * @param <T>
+ */
+public abstract class AbstractKeyIndexed<T>
+{
+    final NavigableMap<PartitionKey, ByteBuffer> serialized;
+
+    abstract void serialize(T t, DataOutputPlus out, int version) throws IOException;
+    abstract T deserialize(DataInputPlus in, int version) throws IOException;
+    abstract long serializedSize(T t, int version);
+    abstract long emptySizeOnHeap();
+
+    protected ByteBuffer serialize(T item)
+    {
+        int version = MessagingService.current_version;

Review Comment:
   This could become an issue in the future: the version can change and a cluster can be in mixed mode, so relying on the current messaging version without knowing which peer we're talking to may be a problem down the road.



##########
src/java/org/apache/cassandra/db/partitions/Partition.java:
##########
@@ -21,10 +21,12 @@
 
 import javax.annotation.Nullable;
 
+import org.apache.cassandra.schema.TableId;

Review Comment:
   unused import



##########
src/java/org/apache/cassandra/net/ResponseVerbHandler.java:
##########
@@ -26,7 +26,7 @@
 import static java.util.concurrent.TimeUnit.NANOSECONDS;
 import static org.apache.cassandra.utils.MonotonicClock.Global.approxTime;
 
-class ResponseVerbHandler implements IVerbHandler
+public class ResponseVerbHandler implements IVerbHandler

Review Comment:
   The only usage is in `Verb`, which is in the same package; why do we need this?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

