You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by if...@apache.org on 2020/01/31 13:44:52 UTC

[cassandra] branch cassandra-3.0 updated (cd82046 -> a270929)

This is an automated email from the ASF dual-hosted git repository.

ifesdjeen pushed a change to branch cassandra-3.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git.


    from cd82046  Merge branch 'cassandra-2.2' into cassandra-3.0
     new b2f2c70  Add message interceptors to in-jvm dtests
     new a270929  Merge branch 'cassandra-2.2' into cassandra-3.0

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/cassandra/distributed/api/IMessage.java |   8 +-
 .../cassandra/distributed/api/IMessageFilters.java |  28 ++-
 .../distributed/impl/AbstractCluster.java          |  17 +-
 .../distributed/impl/IInvokableInstance.java       |   1 -
 .../cassandra/distributed/impl/Instance.java       | 187 ++++++++++--------
 .../cassandra/distributed/impl/MessageFilters.java |  79 ++++----
 .../distributed/test/MessageFiltersTest.java       | 210 +++++++++++++++++++++
 7 files changed, 395 insertions(+), 135 deletions(-)
 create mode 100644 test/distributed/org/apache/cassandra/distributed/test/MessageFiltersTest.java


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[cassandra] 01/01: Merge branch 'cassandra-2.2' into cassandra-3.0

Posted by if...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ifesdjeen pushed a commit to branch cassandra-3.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit a27092926b42a83ec0a1e6188677329737c5a3f4
Merge: cd82046 b2f2c70
Author: Alex Petrov <ol...@gmail.com>
AuthorDate: Fri Jan 31 14:37:25 2020 +0100

    Merge branch 'cassandra-2.2' into cassandra-3.0

 .../apache/cassandra/distributed/api/IMessage.java |   8 +-
 .../cassandra/distributed/api/IMessageFilters.java |  28 ++-
 .../distributed/impl/AbstractCluster.java          |  17 +-
 .../distributed/impl/IInvokableInstance.java       |   1 -
 .../cassandra/distributed/impl/Instance.java       | 187 ++++++++++--------
 .../cassandra/distributed/impl/MessageFilters.java |  79 ++++----
 .../distributed/test/MessageFiltersTest.java       | 210 +++++++++++++++++++++
 7 files changed, 395 insertions(+), 135 deletions(-)

diff --cc test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index 3de5ed8,0647198..5a4dcf4
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@@ -192,7 -192,9 +193,10 @@@ public class Instance extends IsolatedE
      {
          BiConsumer<InetAddressAndPort, IMessage> deliverToInstance = (to, message) -> cluster.get(to).receiveMessage(message);
          BiConsumer<InetAddressAndPort, IMessage> deliverToInstanceIfNotFiltered = (to, message) -> {
-             if (cluster.filters().permit(this, cluster.get(to), message.verb()))
+             int fromNum = config().num();
+             int toNum = cluster.get(to).config().num();
++
+             if (cluster.filters().permit(fromNum, toNum, message))
                  deliverToInstance.accept(to, message);
          };
  
@@@ -242,46 -265,35 +267,34 @@@
  
          public boolean allowOutgoingMessage(MessageOut messageOut, int id, InetAddress to)
          {
-             try (DataOutputBuffer out = new DataOutputBuffer(1024))
+             InetAddressAndPort from = broadcastAddressAndPort();
 -            InetAddressAndPort toFull = lookupAddressAndPort.apply(to);
+             assert from.equals(lookupAddressAndPort.apply(messageOut.from));
 -
 -            IMessage serialized = serializeMessage(messageOut, id, broadcastAddressAndPort(), lookupAddressAndPort.apply(messageOut.from));
++            InetAddressAndPort toFull = lookupAddressAndPort.apply(to);
++            IMessage message = serializeMessage(messageOut, id, from, toFull);
+ 
+             // Tracing logic - similar to org.apache.cassandra.net.OutboundTcpConnection.writeConnected
+             byte[] sessionBytes = (byte[]) messageOut.parameters.get(Tracing.TRACE_HEADER);
+             if (sessionBytes != null)
              {
-                 InetAddressAndPort from = broadcastAddressAndPort();
-                 assert from.equals(lookupAddressAndPort.apply(messageOut.from));
-                 InetAddressAndPort toFull = lookupAddressAndPort.apply(to);
-                 int version = MessagingService.instance().getVersion(to);
- 
-                 // Tracing logic - similar to org.apache.cassandra.net.OutboundTcpConnection.writeConnected
-                 byte[] sessionBytes = (byte[]) messageOut.parameters.get(Tracing.TRACE_HEADER);
-                 if (sessionBytes != null)
+                 UUID sessionId = UUIDGen.getUUID(ByteBuffer.wrap(sessionBytes));
+                 TraceState state = Tracing.instance.get(sessionId);
 -                String message = String.format("Sending %s message to %s", messageOut.verb, to);
++                String traceMessage = String.format("Sending %s message to %s", messageOut.verb, toFull.address);
+                 // session may have already finished; see CASSANDRA-5668
+                 if (state == null)
                  {
-                     UUID sessionId = UUIDGen.getUUID(ByteBuffer.wrap(sessionBytes));
-                     TraceState state = Tracing.instance.get(sessionId);
-                     String message = String.format("Sending %s message to %s", messageOut.verb, toFull.address);
-                     // session may have already finished; see CASSANDRA-5668
-                     if (state == null)
-                     {
-                         byte[] traceTypeBytes = (byte[]) messageOut.parameters.get(Tracing.TRACE_TYPE);
-                         Tracing.TraceType traceType = traceTypeBytes == null ? Tracing.TraceType.QUERY : Tracing.TraceType.deserialize(traceTypeBytes[0]);
-                         TraceState.mutateWithTracing(ByteBuffer.wrap(sessionBytes), message, -1, traceType.getTTL());
-                     }
-                     else
-                     {
-                         state.trace(message);
-                         if (messageOut.verb == MessagingService.Verb.REQUEST_RESPONSE)
-                             Tracing.instance.doneWithNonLocalSession(state);
-                     }
+                     byte[] traceTypeBytes = (byte[]) messageOut.parameters.get(Tracing.TRACE_TYPE);
+                     Tracing.TraceType traceType = traceTypeBytes == null ? Tracing.TraceType.QUERY : Tracing.TraceType.deserialize(traceTypeBytes[0]);
 -                    TraceState.mutateWithTracing(ByteBuffer.wrap(sessionBytes), message, -1, traceType.getTTL());
++                    TraceState.mutateWithTracing(ByteBuffer.wrap(sessionBytes), traceMessage, -1, traceType.getTTL());
+                 }
+                 else
+                 {
 -                    state.trace(message);
++                    state.trace(traceMessage);
+                     if (messageOut.verb == MessagingService.Verb.REQUEST_RESPONSE)
+                         Tracing.instance.doneWithNonLocalSession(state);
                  }
- 
-                 out.writeInt(MessagingService.PROTOCOL_MAGIC);
-                 out.writeInt(id);
-                 long timestamp = System.currentTimeMillis();
-                 out.writeInt((int) timestamp);
-                 messageOut.serialize(out, version);
-                 deliver.accept(toFull, new Message(messageOut.verb.ordinal(), out.toByteArray(), id, version, from));
-             }
-             catch (IOException e)
-             {
-                 throw new RuntimeException(e);
              }
+ 
 -            deliver.accept(toFull, serialized);
++            deliver.accept(toFull, message);
              return false;
          }
  
@@@ -292,50 -304,46 +305,47 @@@
          }
      }
  
-     public void receiveMessage(IMessage imessage)
 -    public static Pair<MessageIn<Object>, Integer> deserializeMessage(IMessage msg)
++
++    public static Pair<MessageIn<Object>, Integer> deserializeMessage(IMessage imessage)
      {
-         sync(() -> {
-             // Based on org.apache.cassandra.net.IncomingTcpConnection.receiveMessage
-             try (DataInputBuffer input = new DataInputBuffer(imessage.bytes()))
+         // Based on org.apache.cassandra.net.IncomingTcpConnection.receiveMessage
 -        try (DataInputStream input = new DataInputStream(new ByteArrayInputStream(msg.bytes())))
++        try (DataInputBuffer input = new DataInputBuffer(imessage.bytes()))
+         {
 -            int version = msg.version();
++            int version = imessage.version();
+             if (version > MessagingService.current_version)
              {
-                 int version = imessage.version();
-                 if (version > MessagingService.current_version)
-                 {
-                     throw new IllegalStateException(String.format("Node%d received message version %d but current version is %d",
-                                                                   this.config.num(),
-                                                                   version,
-                                                                   MessagingService.current_version));
-                 }
+                 throw new IllegalStateException(String.format("Received message version %d but current version is %d",
+                                                               version,
+                                                               MessagingService.current_version));
+             }
  
-                 MessagingService.validateMagic(input.readInt());
-                 int id;
-                 if (version < MessagingService.VERSION_20)
-                     id = Integer.parseInt(input.readUTF());
-                 else
-                     id = input.readInt();
+             MessagingService.validateMagic(input.readInt());
+             int id;
+             if (version < MessagingService.VERSION_20)
+                 id = Integer.parseInt(input.readUTF());
+             else
+                 id = input.readInt();
 -            if (msg.id() != id)
 -                throw new IllegalStateException(String.format("Message id mismatch: %d != %d", msg.id(), id));
++            if (imessage.id() != id)
++                throw new IllegalStateException(String.format("Message id mismatch: %d != %d", imessage.id(), id));
  
-                 long timestamp = System.currentTimeMillis();
-                 boolean isCrossNodeTimestamp = false;
-                 // make sure to readInt, even if cross_node_to is not enabled
-                 int partial = input.readInt();
-                 if (DatabaseDescriptor.hasCrossNodeTimeout())
-                 {
-                     long crossNodeTimestamp = (timestamp & 0xFFFFFFFF00000000L) | (((partial & 0xFFFFFFFFL) << 2) >> 2);
-                     isCrossNodeTimestamp = (timestamp != crossNodeTimestamp);
-                     timestamp = crossNodeTimestamp;
-                 }
+             // make sure to readInt, even if cross_node_to is not enabled
+             int partial = input.readInt();
  
-                 MessageIn message = MessageIn.read(input, version, id);
-                 if (message == null)
-                 {
-                     // callback expired; nothing to do
-                     return;
-                 }
-                 if (version <= MessagingService.current_version)
-                 {
-                     MessagingService.instance().receive(message, id, timestamp, isCrossNodeTimestamp);
-                 }
-                 // else ignore message
+             return Pair.create(MessageIn.read(input, version, id), partial);
+         }
+         catch (IOException e)
+         {
+             throw new RuntimeException();
+         }
+     }
+ 
+     public void receiveMessage(IMessage imessage)
+     {
+         sync(() -> {
+             Pair<MessageIn<Object>, Integer> deserialized = null;
+             try
+             {
+                 deserialized = deserializeMessage(imessage);
              }
              catch (Throwable t)
              {
diff --cc test/distributed/org/apache/cassandra/distributed/impl/MessageFilters.java
index c1607f8,c92553f..833677b
--- a/test/distributed/org/apache/cassandra/distributed/impl/MessageFilters.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/MessageFilters.java
@@@ -19,16 -19,11 +19,14 @@@
  package org.apache.cassandra.distributed.impl;
  
  import java.util.Arrays;
- import java.util.Set;
- import java.util.concurrent.CopyOnWriteArraySet;
- import java.util.function.BiConsumer;
+ import java.util.List;
+ import java.util.concurrent.CopyOnWriteArrayList;
++import java.util.function.Supplier;
++
++import com.google.common.annotations.VisibleForTesting;
  
- import org.apache.cassandra.distributed.api.IInstance;
  import org.apache.cassandra.distributed.api.IMessage;
  import org.apache.cassandra.distributed.api.IMessageFilters;
- import org.apache.cassandra.distributed.api.ICluster;
- import org.apache.cassandra.locator.InetAddressAndPort;
- import org.apache.cassandra.net.MessagingService;
  
  public class MessageFilters implements IMessageFilters
  {
diff --cc test/distributed/org/apache/cassandra/distributed/test/MessageFiltersTest.java
index 0000000,96974d8..07e7428
mode 000000,100644..100644
--- a/test/distributed/org/apache/cassandra/distributed/test/MessageFiltersTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/MessageFiltersTest.java
@@@ -1,0 -1,210 +1,210 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.cassandra.distributed.test;
+ 
+ import java.util.Arrays;
 -import java.util.HashSet;
+ import java.util.Set;
+ import java.util.concurrent.atomic.AtomicInteger;
+ 
++import com.google.common.collect.Sets;
+ import org.junit.Assert;
+ import org.junit.Test;
+ 
+ import org.apache.cassandra.db.ConsistencyLevel;
+ import org.apache.cassandra.distributed.Cluster;
+ import org.apache.cassandra.distributed.api.IIsolatedExecutor;
+ import org.apache.cassandra.distributed.api.IMessage;
+ import org.apache.cassandra.distributed.api.IMessageFilters;
+ import org.apache.cassandra.distributed.impl.Instance;
+ import org.apache.cassandra.distributed.impl.MessageFilters;
+ import org.apache.cassandra.locator.InetAddressAndPort;
+ import org.apache.cassandra.net.MessageIn;
+ import org.apache.cassandra.net.MessagingService;
+ 
+ public class MessageFiltersTest extends DistributedTestBase
+ {
+     @Test
+     public void simpleFiltersTest() throws Throwable
+     {
+         int VERB1 = MessagingService.Verb.READ.ordinal();
+         int VERB2 = MessagingService.Verb.REQUEST_RESPONSE.ordinal();
+         int VERB3 = MessagingService.Verb.READ_REPAIR.ordinal();
+         int i1 = 1;
+         int i2 = 2;
+         int i3 = 3;
+         String MSG1 = "msg1";
+         String MSG2 = "msg2";
+ 
+         MessageFilters filters = new MessageFilters();
+         MessageFilters.Filter filter = filters.allVerbs().from(1).drop();
+ 
+         Assert.assertFalse(filters.permit(i1, i2, msg(VERB1, MSG1)));
+         Assert.assertFalse(filters.permit(i1, i2, msg(VERB2, MSG1)));
+         Assert.assertFalse(filters.permit(i1, i2, msg(VERB3, MSG1)));
+         Assert.assertTrue(filters.permit(i2, i1, msg(VERB1, MSG1)));
+         filter.off();
+         Assert.assertTrue(filters.permit(i1, i2, msg(VERB1, MSG1)));
+         filters.reset();
+ 
+         filters.verbs(VERB1).from(1).to(2).drop();
+         Assert.assertFalse(filters.permit(i1, i2, msg(VERB1, MSG1)));
+         Assert.assertTrue(filters.permit(i1, i2, msg(VERB2, MSG1)));
+         Assert.assertTrue(filters.permit(i2, i1, msg(VERB1, MSG1)));
+         Assert.assertTrue(filters.permit(i2, i3, msg(VERB2, MSG1)));
+ 
+         filters.reset();
+         AtomicInteger counter = new AtomicInteger();
+         filters.verbs(VERB1).from(1).to(2).messagesMatching((from, to, msg) -> {
+             counter.incrementAndGet();
+             return Arrays.equals(msg.bytes(), MSG1.getBytes());
+         }).drop();
+         Assert.assertFalse(filters.permit(i1, i2, msg(VERB1, MSG1)));
+         Assert.assertEquals(counter.get(), 1);
+         Assert.assertTrue(filters.permit(i1, i2, msg(VERB1, MSG2)));
+         Assert.assertEquals(counter.get(), 2);
+ 
+         // filter chain gets interrupted because a higher level filter returns no match
+         Assert.assertTrue(filters.permit(i2, i1, msg(VERB1, MSG1)));
+         Assert.assertEquals(counter.get(), 2);
+         Assert.assertTrue(filters.permit(i2, i1, msg(VERB2, MSG1)));
+         Assert.assertEquals(counter.get(), 2);
+         filters.reset();
+ 
+         filters.allVerbs().from(3, 2).to(2, 1).drop();
+         Assert.assertFalse(filters.permit(i3, i1, msg(VERB1, MSG1)));
+         Assert.assertFalse(filters.permit(i3, i2, msg(VERB1, MSG1)));
+         Assert.assertFalse(filters.permit(i2, i1, msg(VERB1, MSG1)));
+         Assert.assertTrue(filters.permit(i2, i3, msg(VERB1, MSG1)));
+         Assert.assertTrue(filters.permit(i1, i2, msg(VERB1, MSG1)));
+         Assert.assertTrue(filters.permit(i1, i3, msg(VERB1, MSG1)));
+         filters.reset();
+ 
+         counter.set(0);
+         filters.allVerbs().from(1).to(2).messagesMatching((from, to, msg) -> {
+             counter.incrementAndGet();
+             return false;
+         }).drop();
+         Assert.assertTrue(filters.permit(i1, i2, msg(VERB1, MSG1)));
+         Assert.assertTrue(filters.permit(i1, i3, msg(VERB1, MSG1)));
+         Assert.assertTrue(filters.permit(i1, i2, msg(VERB1, MSG1)));
+         Assert.assertEquals(2, counter.get());
+     }
+ 
+     IMessage msg(int verb, String msg)
+     {
+         return new IMessage()
+         {
+             public int verb() { return verb; }
+             public byte[] bytes() { return msg.getBytes(); }
+             public int id() { return 0; }
+             public int version() { return 0;  }
+             public InetAddressAndPort from() { return null; }
+         };
+     }
+ 
+     @Test
+     public void testFilters() throws Throwable
+     {
+         String read = "SELECT * FROM " + KEYSPACE + ".tbl";
+         String write = "INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)";
+ 
+         try (Cluster cluster = Cluster.create(2))
+         {
+             cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': " + cluster.size() + "};");
+             cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
+ 
+             // Reads and writes are going to time out in both directions
+             cluster.filters().allVerbs().from(1).to(2).drop();
+             for (int i : new int[]{ 1, 2 })
+                 assertTimeOut(() -> cluster.coordinator(i).execute(read, ConsistencyLevel.ALL));
+             for (int i : new int[]{ 1, 2 })
+                 assertTimeOut(() -> cluster.coordinator(i).execute(write, ConsistencyLevel.ALL));
+ 
+             cluster.filters().reset();
+             // Reads are going to timeout only when 1 serves as a coordinator
+             cluster.verbs(MessagingService.Verb.RANGE_SLICE).from(1).to(2).drop();
+             assertTimeOut(() -> cluster.coordinator(1).execute(read, ConsistencyLevel.ALL));
+             cluster.coordinator(2).execute(read, ConsistencyLevel.ALL);
+ 
+             // Writes work in both directions
+             for (int i : new int[]{ 1, 2 })
+                 cluster.coordinator(i).execute(write, ConsistencyLevel.ALL);
+         }
+     }
+ 
+     @Test
+     public void testMessageMatching() throws Throwable
+     {
+         String read = "SELECT * FROM " + KEYSPACE + ".tbl";
+         String write = "INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)";
+ 
+         try (Cluster cluster = Cluster.create(2))
+         {
+             cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': " + cluster.size() + "};");
+             cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
+ 
+             AtomicInteger counter = new AtomicInteger();
+ 
 -            Set<Integer> verbs = new HashSet<>(Arrays.asList(MessagingService.Verb.RANGE_SLICE.ordinal(),
 -                                                             MessagingService.Verb.MUTATION.ordinal()));
++            Set<Integer> verbs = Sets.newHashSet(Arrays.asList(MessagingService.Verb.RANGE_SLICE.ordinal(),
++                                                               MessagingService.Verb.MUTATION.ordinal()));
+ 
+             // Reads and writes are going to time out in both directions
+             IMessageFilters.Filter filter = cluster.filters()
+                                                    .allVerbs()
+                                                    .from(1)
+                                                    .to(2)
+                                                    .messagesMatching((from, to, msg) -> {
+                                                        // Decode and verify message on instance; return the result back here
+                                                        Integer id = cluster.get(1).callsOnInstance((IIsolatedExecutor.SerializableCallable<Integer>) () -> {
+                                                            MessageIn decoded = Instance.deserializeMessage(msg).left;
+                                                            if (decoded != null)
+                                                                return (Integer) decoded.verb.ordinal();
+                                                            return -1;
+                                                        }).call();
+                                                        if (id > 0)
+                                                            Assert.assertTrue(verbs.contains(id));
+                                                        counter.incrementAndGet();
+                                                        return false;
+                                                    }).drop();
+ 
+             for (int i : new int[]{ 1, 2 })
+                 cluster.coordinator(i).execute(read, ConsistencyLevel.ALL);
+             for (int i : new int[]{ 1, 2 })
+                 cluster.coordinator(i).execute(write, ConsistencyLevel.ALL);
+ 
+             filter.off();
+             Assert.assertEquals(4, counter.get());
+         }
+     }
+ 
+     private static void assertTimeOut(Runnable r)
+     {
+         try
+         {
+             r.run();
+             Assert.fail("Should have timed out");
+         }
+         catch (Throwable t)
+         {
+             if (!t.toString().contains("TimeoutException"))
+                 throw t;
+             // ignore
+         }
+     }
+ }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org