You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by if...@apache.org on 2020/01/31 13:44:54 UTC

[cassandra] branch cassandra-3.11 updated (ffab2b8 -> f09e1be)

This is an automated email from the ASF dual-hosted git repository.

ifesdjeen pushed a change to branch cassandra-3.11
in repository https://gitbox.apache.org/repos/asf/cassandra.git.


    from ffab2b8  C* 3.0 sstables w/ UDTs are corrupted in C* 3.11 and 4.0
     new b2f2c70  Add message interceptors to in-jvm dtests
     new a270929  Merge branch 'cassandra-2.2' into cassandra-3.0
     new f09e1be  Merge branch 'cassandra-3.0' into cassandra-3.11

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/cassandra/distributed/api/IMessage.java |   8 +-
 .../cassandra/distributed/api/IMessageFilters.java |  28 ++-
 .../distributed/impl/AbstractCluster.java          |  17 +-
 .../distributed/impl/IInvokableInstance.java       |   1 -
 .../cassandra/distributed/impl/Instance.java       | 142 ++++++++------
 .../cassandra/distributed/impl/MessageFilters.java |  76 ++++----
 .../distributed/test/MessageFiltersTest.java       | 210 +++++++++++++++++++++
 7 files changed, 366 insertions(+), 116 deletions(-)
 create mode 100644 test/distributed/org/apache/cassandra/distributed/test/MessageFiltersTest.java


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[cassandra] 01/01: Merge branch 'cassandra-3.0' into cassandra-3.11

Posted by if...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ifesdjeen pushed a commit to branch cassandra-3.11
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit f09e1be19e5368bf18b4a81c9bed3bf0fc6643eb
Merge: ffab2b8 a270929
Author: Alex Petrov <ol...@gmail.com>
AuthorDate: Fri Jan 31 14:39:52 2020 +0100

    Merge branch 'cassandra-3.0' into cassandra-3.11

 .../apache/cassandra/distributed/api/IMessage.java |   8 +-
 .../cassandra/distributed/api/IMessageFilters.java |  28 ++-
 .../distributed/impl/AbstractCluster.java          |  17 +-
 .../distributed/impl/IInvokableInstance.java       |   1 -
 .../cassandra/distributed/impl/Instance.java       | 142 ++++++++------
 .../cassandra/distributed/impl/MessageFilters.java |  76 ++++----
 .../distributed/test/MessageFiltersTest.java       | 210 +++++++++++++++++++++
 7 files changed, 366 insertions(+), 116 deletions(-)

diff --cc test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index 986c864,5a4dcf4..bf4889e
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@@ -236,6 -259,6 +260,7 @@@ public class Instance extends IsolatedE
      {
          private final BiConsumer<InetAddressAndPort, IMessage> deliver;
          private final Function<InetAddress, InetAddressAndPort> lookupAddressAndPort;
++
          MessageDeliverySink(BiConsumer<InetAddressAndPort, IMessage> deliver, Function<InetAddress, InetAddressAndPort> lookupAddressAndPort)
          {
              this.deliver = deliver;
@@@ -244,46 -267,34 +269,35 @@@
  
          public boolean allowOutgoingMessage(MessageOut messageOut, int id, InetAddress to)
          {
-             try (DataOutputBuffer out = new DataOutputBuffer(1024))
++
+             InetAddressAndPort from = broadcastAddressAndPort();
+             assert from.equals(lookupAddressAndPort.apply(messageOut.from));
 -            InetAddressAndPort toFull = lookupAddressAndPort.apply(to);
 -            IMessage message = serializeMessage(messageOut, id, from, toFull);
+ 
+             // Tracing logic - similar to org.apache.cassandra.net.OutboundTcpConnection.writeConnected
+             byte[] sessionBytes = (byte[]) messageOut.parameters.get(Tracing.TRACE_HEADER);
+             if (sessionBytes != null)
              {
-                 InetAddressAndPort from = broadcastAddressAndPort();
-                 assert from.equals(lookupAddressAndPort.apply(messageOut.from));
-                 InetAddressAndPort toFull = lookupAddressAndPort.apply(to);
-                 int version = MessagingService.instance().getVersion(to);
- 
-                 // Tracing logic - similar to org.apache.cassandra.net.OutboundTcpConnection.writeConnected
-                 byte[] sessionBytes = (byte[]) messageOut.parameters.get(Tracing.TRACE_HEADER);
-                 if (sessionBytes != null)
+                 UUID sessionId = UUIDGen.getUUID(ByteBuffer.wrap(sessionBytes));
+                 TraceState state = Tracing.instance.get(sessionId);
 -                String traceMessage = String.format("Sending %s message to %s", messageOut.verb, toFull.address);
++                String message = String.format("Sending %s message to %s", messageOut.verb, to);
+                 // session may have already finished; see CASSANDRA-5668
+                 if (state == null)
                  {
-                     UUID sessionId = UUIDGen.getUUID(ByteBuffer.wrap(sessionBytes));
-                     TraceState state = Tracing.instance.get(sessionId);
-                     String message = String.format("Sending %s message to %s", messageOut.verb, toFull.address);
-                     // session may have already finished; see CASSANDRA-5668
-                     if (state == null)
-                     {
-                         byte[] traceTypeBytes = (byte[]) messageOut.parameters.get(Tracing.TRACE_TYPE);
-                         Tracing.TraceType traceType = traceTypeBytes == null ? Tracing.TraceType.QUERY : Tracing.TraceType.deserialize(traceTypeBytes[0]);
-                         Tracing.instance.trace(ByteBuffer.wrap(sessionBytes), message, traceType.getTTL());
-                     }
-                     else
-                     {
-                         state.trace(message);
-                         if (messageOut.verb == MessagingService.Verb.REQUEST_RESPONSE)
-                             Tracing.instance.doneWithNonLocalSession(state);
-                     }
+                     byte[] traceTypeBytes = (byte[]) messageOut.parameters.get(Tracing.TRACE_TYPE);
+                     Tracing.TraceType traceType = traceTypeBytes == null ? Tracing.TraceType.QUERY : Tracing.TraceType.deserialize(traceTypeBytes[0]);
 -                    TraceState.mutateWithTracing(ByteBuffer.wrap(sessionBytes), traceMessage, -1, traceType.getTTL());
++                    Tracing.instance.trace(ByteBuffer.wrap(sessionBytes), message, traceType.getTTL());
+                 }
+                 else
+                 {
 -                    state.trace(traceMessage);
++                    state.trace(message);
+                     if (messageOut.verb == MessagingService.Verb.REQUEST_RESPONSE)
+                         Tracing.instance.doneWithNonLocalSession(state);
                  }
- 
-                 out.writeInt(MessagingService.PROTOCOL_MAGIC);
-                 out.writeInt(id);
-                 long timestamp = System.currentTimeMillis();
-                 out.writeInt((int) timestamp);
-                 messageOut.serialize(out, version);
-                 deliver.accept(toFull, new Message(messageOut.verb.ordinal(), out.toByteArray(), id, version, from));
-             }
-             catch (IOException e)
-             {
-                 throw new RuntimeException(e);
              }
+ 
 -            deliver.accept(toFull, message);
++            InetAddressAndPort toFull = lookupAddressAndPort.apply(to);
++            deliver.accept(toFull, serializeMessage(messageOut, id, from, toFull));
++
              return false;
          }
  
@@@ -294,39 -305,47 +308,51 @@@
          }
      }
  
 -
 -    public static Pair<MessageIn<Object>, Integer> deserializeMessage(IMessage imessage)
++    public static MessageIn<Object> deserializeMessage(IMessage imessage)
+     {
+         // Based on org.apache.cassandra.net.IncomingTcpConnection.receiveMessage
+         try (DataInputBuffer input = new DataInputBuffer(imessage.bytes()))
+         {
+             int version = imessage.version();
+             if (version > MessagingService.current_version)
+             {
+                 throw new IllegalStateException(String.format("Received message version %d but current version is %d",
+                                                               version,
+                                                               MessagingService.current_version));
+             }
+ 
+             MessagingService.validateMagic(input.readInt());
+             int id;
+             if (version < MessagingService.VERSION_20)
+                 id = Integer.parseInt(input.readUTF());
+             else
+                 id = input.readInt();
 -            if (imessage.id() != id)
 -                throw new IllegalStateException(String.format("Message id mismatch: %d != %d", imessage.id(), id));
 -
 -            // make sure to readInt, even if cross_node_to is not enabled
 -            int partial = input.readInt();
 -
 -            return Pair.create(MessageIn.read(input, version, id), partial);
++            long currentTime = ApproximateTime.currentTimeMillis();
++            return MessageIn.read(input, version, id, MessageIn.readConstructionTime(imessage.from().address, input, currentTime));
+         }
 -        catch (IOException e)
++        catch (Throwable t)
+         {
 -            throw new RuntimeException();
++            throw new RuntimeException(t);
+         }
+     }
+ 
      public void receiveMessage(IMessage imessage)
      {
          sync(() -> {
 -            Pair<MessageIn<Object>, Integer> deserialized = null;
 +            // Based on org.apache.cassandra.net.IncomingTcpConnection.receiveMessage
-             try (DataInputBuffer input = new DataInputBuffer(imessage.bytes()))
+             try
              {
-                 int version = imessage.version();
-                 if (version > MessagingService.current_version)
-                 {
-                     throw new IllegalStateException(String.format("Node%d received message version %d but current version is %d",
-                                                                   this.config.num(),
-                                                                   version,
-                                                                   MessagingService.current_version));
-                 }
- 
-                 MessagingService.validateMagic(input.readInt());
-                 int id;
-                 if (version < MessagingService.VERSION_20)
-                     id = Integer.parseInt(input.readUTF());
-                 else
-                     id = input.readInt();
-                 long currentTime = ApproximateTime.currentTimeMillis();
-                 MessageIn message = MessageIn.read(input, version, id, MessageIn.readConstructionTime(imessage.from().address, input, currentTime));
 -                deserialized = deserializeMessage(imessage);
++                MessageIn message = deserializeMessage(imessage);
 +                if (message == null)
 +                {
 +                    // callback expired; nothing to do
 +                    return;
 +                }
-                 if (version <= MessagingService.current_version)
++                if (message.version <= MessagingService.current_version)
 +                {
-                     MessagingService.instance().receive(message, id);
++                    MessagingService.instance().receive(message, imessage.id());
 +                }
 +                // else ignore message
              }
              catch (Throwable t)
              {
diff --cc test/distributed/org/apache/cassandra/distributed/impl/MessageFilters.java
index c1607f8,833677b..c92553f
--- a/test/distributed/org/apache/cassandra/distributed/impl/MessageFilters.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/MessageFilters.java
@@@ -19,16 -19,14 +19,11 @@@
  package org.apache.cassandra.distributed.impl;
  
  import java.util.Arrays;
- import java.util.Set;
- import java.util.concurrent.CopyOnWriteArraySet;
- import java.util.function.BiConsumer;
+ import java.util.List;
+ import java.util.concurrent.CopyOnWriteArrayList;
 -import java.util.function.Supplier;
 -
 -import com.google.common.annotations.VisibleForTesting;
  
- import org.apache.cassandra.distributed.api.IInstance;
  import org.apache.cassandra.distributed.api.IMessage;
  import org.apache.cassandra.distributed.api.IMessageFilters;
- import org.apache.cassandra.distributed.api.ICluster;
- import org.apache.cassandra.locator.InetAddressAndPort;
- import org.apache.cassandra.net.MessagingService;
  
  public class MessageFilters implements IMessageFilters
  {
diff --cc test/distributed/org/apache/cassandra/distributed/test/MessageFiltersTest.java
index 0000000,07e7428..ca41030
mode 000000,100644..100644
--- a/test/distributed/org/apache/cassandra/distributed/test/MessageFiltersTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/MessageFiltersTest.java
@@@ -1,0 -1,210 +1,210 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.cassandra.distributed.test;
+ 
+ import java.util.Arrays;
++import java.util.HashSet;
+ import java.util.Set;
+ import java.util.concurrent.atomic.AtomicInteger;
+ 
 -import com.google.common.collect.Sets;
+ import org.junit.Assert;
+ import org.junit.Test;
+ 
+ import org.apache.cassandra.db.ConsistencyLevel;
+ import org.apache.cassandra.distributed.Cluster;
+ import org.apache.cassandra.distributed.api.IIsolatedExecutor;
+ import org.apache.cassandra.distributed.api.IMessage;
+ import org.apache.cassandra.distributed.api.IMessageFilters;
+ import org.apache.cassandra.distributed.impl.Instance;
+ import org.apache.cassandra.distributed.impl.MessageFilters;
+ import org.apache.cassandra.locator.InetAddressAndPort;
+ import org.apache.cassandra.net.MessageIn;
+ import org.apache.cassandra.net.MessagingService;
+ 
+ public class MessageFiltersTest extends DistributedTestBase
+ {
+     @Test
+     public void simpleFiltersTest() throws Throwable
+     {
+         int VERB1 = MessagingService.Verb.READ.ordinal();
+         int VERB2 = MessagingService.Verb.REQUEST_RESPONSE.ordinal();
+         int VERB3 = MessagingService.Verb.READ_REPAIR.ordinal();
+         int i1 = 1;
+         int i2 = 2;
+         int i3 = 3;
+         String MSG1 = "msg1";
+         String MSG2 = "msg2";
+ 
+         MessageFilters filters = new MessageFilters();
+         MessageFilters.Filter filter = filters.allVerbs().from(1).drop();
+ 
+         Assert.assertFalse(filters.permit(i1, i2, msg(VERB1, MSG1)));
+         Assert.assertFalse(filters.permit(i1, i2, msg(VERB2, MSG1)));
+         Assert.assertFalse(filters.permit(i1, i2, msg(VERB3, MSG1)));
+         Assert.assertTrue(filters.permit(i2, i1, msg(VERB1, MSG1)));
+         filter.off();
+         Assert.assertTrue(filters.permit(i1, i2, msg(VERB1, MSG1)));
+         filters.reset();
+ 
+         filters.verbs(VERB1).from(1).to(2).drop();
+         Assert.assertFalse(filters.permit(i1, i2, msg(VERB1, MSG1)));
+         Assert.assertTrue(filters.permit(i1, i2, msg(VERB2, MSG1)));
+         Assert.assertTrue(filters.permit(i2, i1, msg(VERB1, MSG1)));
+         Assert.assertTrue(filters.permit(i2, i3, msg(VERB2, MSG1)));
+ 
+         filters.reset();
+         AtomicInteger counter = new AtomicInteger();
+         filters.verbs(VERB1).from(1).to(2).messagesMatching((from, to, msg) -> {
+             counter.incrementAndGet();
+             return Arrays.equals(msg.bytes(), MSG1.getBytes());
+         }).drop();
+         Assert.assertFalse(filters.permit(i1, i2, msg(VERB1, MSG1)));
+         Assert.assertEquals(counter.get(), 1);
+         Assert.assertTrue(filters.permit(i1, i2, msg(VERB1, MSG2)));
+         Assert.assertEquals(counter.get(), 2);
+ 
+         // filter chain gets interrupted because a higher level filter returns no match
+         Assert.assertTrue(filters.permit(i2, i1, msg(VERB1, MSG1)));
+         Assert.assertEquals(counter.get(), 2);
+         Assert.assertTrue(filters.permit(i2, i1, msg(VERB2, MSG1)));
+         Assert.assertEquals(counter.get(), 2);
+         filters.reset();
+ 
+         filters.allVerbs().from(3, 2).to(2, 1).drop();
+         Assert.assertFalse(filters.permit(i3, i1, msg(VERB1, MSG1)));
+         Assert.assertFalse(filters.permit(i3, i2, msg(VERB1, MSG1)));
+         Assert.assertFalse(filters.permit(i2, i1, msg(VERB1, MSG1)));
+         Assert.assertTrue(filters.permit(i2, i3, msg(VERB1, MSG1)));
+         Assert.assertTrue(filters.permit(i1, i2, msg(VERB1, MSG1)));
+         Assert.assertTrue(filters.permit(i1, i3, msg(VERB1, MSG1)));
+         filters.reset();
+ 
+         counter.set(0);
+         filters.allVerbs().from(1).to(2).messagesMatching((from, to, msg) -> {
+             counter.incrementAndGet();
+             return false;
+         }).drop();
+         Assert.assertTrue(filters.permit(i1, i2, msg(VERB1, MSG1)));
+         Assert.assertTrue(filters.permit(i1, i3, msg(VERB1, MSG1)));
+         Assert.assertTrue(filters.permit(i1, i2, msg(VERB1, MSG1)));
+         Assert.assertEquals(2, counter.get());
+     }
+ 
+     IMessage msg(int verb, String msg)
+     {
+         return new IMessage()
+         {
+             public int verb() { return verb; }
+             public byte[] bytes() { return msg.getBytes(); }
+             public int id() { return 0; }
+             public int version() { return 0;  }
+             public InetAddressAndPort from() { return null; }
+         };
+     }
+ 
+     @Test
+     public void testFilters() throws Throwable
+     {
+         String read = "SELECT * FROM " + KEYSPACE + ".tbl";
+         String write = "INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)";
+ 
+         try (Cluster cluster = Cluster.create(2))
+         {
+             cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': " + cluster.size() + "};");
+             cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
+ 
+             // Reads and writes are going to time out in both directions
+             cluster.filters().allVerbs().from(1).to(2).drop();
+             for (int i : new int[]{ 1, 2 })
+                 assertTimeOut(() -> cluster.coordinator(i).execute(read, ConsistencyLevel.ALL));
+             for (int i : new int[]{ 1, 2 })
+                 assertTimeOut(() -> cluster.coordinator(i).execute(write, ConsistencyLevel.ALL));
+ 
+             cluster.filters().reset();
+             // Reads are going to timeout only when 1 serves as a coordinator
+             cluster.verbs(MessagingService.Verb.RANGE_SLICE).from(1).to(2).drop();
+             assertTimeOut(() -> cluster.coordinator(1).execute(read, ConsistencyLevel.ALL));
+             cluster.coordinator(2).execute(read, ConsistencyLevel.ALL);
+ 
+             // Writes work in both directions
+             for (int i : new int[]{ 1, 2 })
+                 cluster.coordinator(i).execute(write, ConsistencyLevel.ALL);
+         }
+     }
+ 
+     @Test
+     public void testMessageMatching() throws Throwable
+     {
+         String read = "SELECT * FROM " + KEYSPACE + ".tbl";
+         String write = "INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)";
+ 
+         try (Cluster cluster = Cluster.create(2))
+         {
+             cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': " + cluster.size() + "};");
+             cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
+ 
+             AtomicInteger counter = new AtomicInteger();
+ 
 -            Set<Integer> verbs = Sets.newHashSet(Arrays.asList(MessagingService.Verb.RANGE_SLICE.ordinal(),
 -                                                               MessagingService.Verb.MUTATION.ordinal()));
++            Set<Integer> verbs = new HashSet<>(Arrays.asList(MessagingService.Verb.RANGE_SLICE.ordinal(),
++                                                             MessagingService.Verb.MUTATION.ordinal()));
+ 
+             // Reads and writes are going to time out in both directions
+             IMessageFilters.Filter filter = cluster.filters()
+                                                    .allVerbs()
+                                                    .from(1)
+                                                    .to(2)
+                                                    .messagesMatching((from, to, msg) -> {
+                                                        // Decode and verify message on instance; return the result back here
+                                                        Integer id = cluster.get(1).callsOnInstance((IIsolatedExecutor.SerializableCallable<Integer>) () -> {
 -                                                           MessageIn decoded = Instance.deserializeMessage(msg).left;
++                                                           MessageIn decoded = Instance.deserializeMessage(msg);
+                                                            if (decoded != null)
+                                                                return (Integer) decoded.verb.ordinal();
+                                                            return -1;
+                                                        }).call();
+                                                        if (id > 0)
+                                                            Assert.assertTrue(verbs.contains(id));
+                                                        counter.incrementAndGet();
+                                                        return false;
+                                                    }).drop();
+ 
+             for (int i : new int[]{ 1, 2 })
+                 cluster.coordinator(i).execute(read, ConsistencyLevel.ALL);
+             for (int i : new int[]{ 1, 2 })
+                 cluster.coordinator(i).execute(write, ConsistencyLevel.ALL);
+ 
+             filter.off();
+             Assert.assertEquals(4, counter.get());
+         }
+     }
+ 
+     private static void assertTimeOut(Runnable r)
+     {
+         try
+         {
+             r.run();
+             Assert.fail("Should have timed out");
+         }
+         catch (Throwable t)
+         {
+             if (!t.toString().contains("TimeoutException"))
+                 throw t;
+             // ignore
+         }
+     }
+ }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org