You are viewing a plain text version of this content. The canonical link for it is here.
Posted to pr@cassandra.apache.org by GitBox <gi...@apache.org> on 2020/02/26 12:40:05 UTC

[GitHub] [cassandra] ifesdjeen opened a new pull request #453: CASSANDRA-15539 3.0

ifesdjeen opened a new pull request #453: CASSANDRA-15539 3.0
URL: https://github.com/apache/cassandra/pull/453
 
 
   https://issues.apache.org/jira/browse/CASSANDRA-15539

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org


[GitHub] [cassandra] dcapwell commented on a change in pull request #453: CASSANDRA-15539 3.0

Posted by GitBox <gi...@apache.org>.
dcapwell commented on a change in pull request #453: CASSANDRA-15539 3.0
URL: https://github.com/apache/cassandra/pull/453#discussion_r399386546
 
 

 ##########
 File path: test/distributed/org/apache/cassandra/distributed/test/SimpleReadWriteTest.java
 ##########
 @@ -0,0 +1,276 @@
+package org.apache.cassandra.distributed.test;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.ICluster;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+
+import static org.junit.Assert.assertEquals;
+
+import static org.apache.cassandra.distributed.shared.AssertUtils.*;
+
+// TODO: this test should be removed after running in-jvm dtests is set up via the shared API repository
+public class SimpleReadWriteTest extends SharedClusterTestBase
 
 Review comment:
   Was this back-ported again?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org


[GitHub] [cassandra] dcapwell commented on a change in pull request #453: CASSANDRA-15539 3.0

Posted by GitBox <gi...@apache.org>.
dcapwell commented on a change in pull request #453: CASSANDRA-15539 3.0
URL: https://github.com/apache/cassandra/pull/453#discussion_r399383502
 
 

 ##########
 File path: test/distributed/org/apache/cassandra/distributed/impl/Instance.java
 ##########
 @@ -189,65 +215,128 @@ public void schemaChangeInternal(String query)
         }).run();
     }
 
-    private void registerMockMessaging(ICluster cluster)
+    private void registerMockMessaging(ICluster<IInstance> cluster)
     {
-        BiConsumer<InetAddressAndPort, IMessage> deliverToInstance = (to, message) -> cluster.get(to).receiveMessage(message);
-        BiConsumer<InetAddressAndPort, IMessage> deliverToInstanceIfNotFiltered = (to, message) -> {
+        BiConsumer<InetSocketAddress, IMessage> deliverToInstance = (to, message) -> cluster.get(to).receiveMessage(message);
+        BiConsumer<InetSocketAddress, IMessage> deliverToInstanceIfNotFiltered = (to, message) -> {
             int fromNum = config().num();
             int toNum = cluster.get(to).config().num();
 
-            if (cluster.filters().permit(fromNum, toNum, message))
+            if (cluster.filters().permitOutbound(fromNum, toNum, message)
+                && cluster.filters().permitInbound(fromNum, toNum, message))
                 deliverToInstance.accept(to, message);
         };
 
-        Map<InetAddress, InetAddressAndPort> addressAndPortMap = new HashMap<>();
+        Map<InetAddress, InetSocketAddress> addressAndPortMap = new HashMap<>();
         cluster.stream().forEach(instance -> {
-            InetAddressAndPort addressAndPort = instance.broadcastAddressAndPort();
-            if (!addressAndPort.equals(instance.config().broadcastAddressAndPort()))
-                throw new IllegalStateException("addressAndPort mismatch: " + addressAndPort + " vs " + instance.config().broadcastAddressAndPort());
-            InetAddressAndPort prev = addressAndPortMap.put(addressAndPort.address, addressAndPort);
+            InetSocketAddress addressAndPort = instance.broadcastAddress();
+            if (!addressAndPort.equals(instance.config().broadcastAddress()))
+                throw new IllegalStateException("addressAndPort mismatch: " + addressAndPort + " vs " + instance.config().broadcastAddress());
+            InetSocketAddress prev = addressAndPortMap.put(addressAndPort.getAddress(),
+                                                                        addressAndPort);
             if (null != prev)
                 throw new IllegalStateException("This version of Cassandra does not support multiple nodes with the same InetAddress: " + addressAndPort + " vs " + prev);
         });
 
-        MessagingService.instance().addMessageSink(
-                new MessageDeliverySink(deliverToInstanceIfNotFiltered, addressAndPortMap::get));
+        MessagingService.instance().addMessageSink(new MessageDeliverySink(deliverToInstanceIfNotFiltered, addressAndPortMap::get));
     }
 
     // unnecessary if registerMockMessaging used
-    private void registerFilter(ICluster cluster)
+    private void registerFilters(ICluster cluster)
     {
         IInstance instance = this;
         MessagingService.instance().addMessageSink(new IMessageSink()
         {
             public boolean allowOutgoingMessage(MessageOut message, int id, InetAddress toAddress)
             {
                 // Port is not passed in, so take a best guess at the destination port from this instance
-                IInstance to = cluster.get(InetAddressAndPort.getByAddressOverrideDefaults(toAddress, instance.config().broadcastAddressAndPort().port));
+                IInstance to = cluster.get(NetworkTopology.addressAndPort(toAddress,
+                                                                          instance.config().broadcastAddress().getPort()));
                 int fromNum = config().num();
                 int toNum = to.config().num();
-                return cluster.filters().permit(fromNum, toNum, serializeMessage(message, id, broadcastAddressAndPort(), to.broadcastAddressAndPort()));
+                return cluster.filters().permitOutbound(fromNum, toNum, serializeMessage(message, id,
+                                                                                 broadcastAddress(),
+                                                                                 to.broadcastAddress()));
             }
 
             public boolean allowIncomingMessage(MessageIn message, int id)
             {
-                return true;
+                // Port is not passed in, so take a best guess at the destination port from this instance
+                IInstance from = cluster.get(NetworkTopology.addressAndPort(message.from,
+                                                                            instance.config().broadcastAddress().getPort()));
+                int fromNum = from.config().num();
+                int toNum = config().num();
+
+
+                IMessage msg = serializeMessage(message, id, from.broadcastAddress(), broadcastAddress());
+
+                return cluster.filters().permitInbound(fromNum, toNum, msg);
             }
         });
     }
 
-    public static IMessage serializeMessage(MessageOut messageOut, int id, InetAddressAndPort from, InetAddressAndPort to)
+    public static IMessage serializeMessage(MessageOut messageOut, int id, InetSocketAddress from, InetSocketAddress to)
     {
         try (DataOutputBuffer out = new DataOutputBuffer(1024))
         {
-            int version = MessagingService.instance().getVersion(to.address);
+            int version = MessagingService.instance().getVersion(to.getAddress());
 
             out.writeInt(MessagingService.PROTOCOL_MAGIC);
             out.writeInt(id);
             long timestamp = System.currentTimeMillis();
             out.writeInt((int) timestamp);
             messageOut.serialize(out, version);
-            return new Message(messageOut.verb.ordinal(), out.toByteArray(), id, version, from);
+            return new MessageImpl(messageOut.verb.ordinal(), out.toByteArray(), id, version, from);
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public static IMessage serializeMessage(MessageIn<?> messageIn, int id, InetSocketAddress from, InetSocketAddress to)
+    {
+        try (DataOutputBuffer out = new DataOutputBuffer(1024))
+        {
+            // Serialize header
+            int version = MessagingService.instance().getVersion(to.getAddress());
+
+            out.writeInt(MessagingService.PROTOCOL_MAGIC);
+            out.writeInt(id);
+            long timestamp = System.currentTimeMillis();
+            out.writeInt((int) timestamp);
 
 Review comment:
   This should use `java.lang.Math#toIntExact`; otherwise the `(int)` cast can silently overflow and the value may be read back as a negative number.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org