You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by if...@apache.org on 2018/09/12 15:02:52 UTC

cassandra git commit: Fail query on transient replica if coordinator only expects full data

Repository: cassandra
Updated Branches:
  refs/heads/trunk 8a73427c6 -> 2046c30ad


Fail query on transient replica if coordinator only expects full data

Patch by Alex Petrov; reviewed by Benedict Elliott Smith for CASSANDRA-14704


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2046c30a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2046c30a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2046c30a

Branch: refs/heads/trunk
Commit: 2046c30adec194fb07bc5dd1c31fc19a64e7895c
Parents: 8a73427
Author: Alex Petrov <ol...@gmail.com>
Authored: Fri Sep 7 11:02:55 2018 +0200
Committer: Alex Petrov <ol...@gmail.com>
Committed: Wed Sep 12 17:02:02 2018 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/db/ReadCommandVerbHandler.java    | 42 +++++++++++
 .../locator/AbstractReplicationStrategy.java    |  7 ++
 .../apache/cassandra/service/StorageProxy.java  |  7 +-
 .../unit/org/apache/cassandra/SchemaLoader.java | 20 +++--
 .../org/apache/cassandra/cql3/CQLTester.java    |  1 +
 .../db/ReadCommandVerbHandlerTest.java          | 77 +++++++++++++-------
 7 files changed, 121 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/2046c30a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 264c80f..a30bec7 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0
+ * Fail query on transient replica if coordinator only expects full data (CASSANDRA-14704)
  * Remove mentions of transient replication from repair path (CASSANDRA-14698)
  * Fix handleRepairStatusChangedNotification to remove first then add (CASSANDRA-14720)
  * Allow transient node to serve as a repair coordinator (CASSANDRA-14693)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2046c30a/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java b/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java
index 1b28c2c..0e97dd8 100644
--- a/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java
@@ -17,8 +17,14 @@
  */
 package org.apache.cassandra.db;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.locator.Replica;
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessageOut;
@@ -29,6 +35,8 @@ import org.apache.cassandra.tracing.Tracing;
 
 public class ReadCommandVerbHandler implements IVerbHandler<ReadCommand>
 {
+    private static final Logger logger = LoggerFactory.getLogger(ReadCommandVerbHandler.class);
+
     protected IVersionedSerializer<ReadResponse> serializer()
     {
         return ReadResponse.serializer;
@@ -42,6 +50,8 @@ public class ReadCommandVerbHandler implements IVerbHandler<ReadCommand>
         }
 
         ReadCommand command = message.payload;
+        validateTransientStatus(message);
+
         command.setMonitoringTime(message.constructionTime, message.isCrossNode(), message.getTimeout(), message.getSlowQueryTimeout());
 
         if (message.parameters.containsKey(ParameterType.TRACK_REPAIRED_DATA))
@@ -65,4 +75,36 @@ public class ReadCommandVerbHandler implements IVerbHandler<ReadCommand>
         MessageOut<ReadResponse> reply = new MessageOut<>(MessagingService.Verb.REQUEST_RESPONSE, response, serializer());
         MessagingService.instance().sendReply(reply, id, message.from);
     }
+    /** Rejects the read if the coordinator expects full data but this node is only a transient replica for it. */
+    private void validateTransientStatus(MessageIn<ReadCommand> message)
+    {
+        ReadCommand command = message.payload;
+        Token token;
+        // Single-partition reads are checked by their key's token; range reads by the right bound of their key range.
+        if (command.isLimitedToOnePartition())
+            token = ((SinglePartitionReadCommand) command).partitionKey().getToken();
+        else
+            token = ((PartitionRangeReadCommand) command).dataRange().keyRange().right.getToken();
+
+        Replica replica = Keyspace.open(command.metadata().keyspace)
+                                  .getReplicationStrategy()
+                                  .getLocalReplicaFor(token);
+        // No local replica for the token: transientness cannot be determined, so only warn and let the read proceed.
+        if (replica == null)
+        {
+            logger.warn("Received a read request from {} for a range that is not owned by the current replica; command: {}",
+                        message.from,
+                        command);
+            return;
+        }
+        // Coordinator expects full data only, but this node holds the token transiently: count as dropped and fail.
+        if (!command.acceptsTransient() && replica.isTransient())
+        {
+            MessagingService.instance().incrementDroppedMessages(message, message.getLifetimeInMS());
+            throw new InvalidRequestException(String.format("Attempted to serve %s data request from %s node in %s",
+                                                            command.acceptsTransient() ? "transient" : "full",
+                                                            replica.isTransient() ? "transient" : "full",
+                                                            this));
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2046c30a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
index 0ddc0a4..d168052 100644
--- a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
+++ b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
@@ -124,6 +124,13 @@ public abstract class AbstractReplicationStrategy
         return endpoints;
     }
 
+    public Replica getLocalReplicaFor(RingPosition searchPosition)
+    {
+        // Look up this node's entry among the natural replicas for the position; callers null-check the result.
+        InetAddressAndPort localEndpoint = FBUtilities.getBroadcastAddressAndPort();
+        return getNaturalReplicas(searchPosition).byEndpoint().get(localEndpoint);
+    }
+
     /**
      * calculate the natural endpoints for the given token
      *

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2046c30a/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 9d9c628..5eb43cf 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -1919,7 +1919,7 @@ public class StorageProxy implements StorageProxyMBean
             EndpointsForRange targetReplicas = consistency.filterForQuery(keyspace, liveReplicas);
             int minResponses = Math.min(targetReplicas.size(), blockFor);
 
-            // Endpoitns for range here as well
+            // Endpoints for range here as well
             return ReplicaLayout.forRangeRead(keyspace, consistency, range,
                                               liveReplicas, targetReplicas.subList(0, minResponses));
         }
@@ -2146,9 +2146,10 @@ public class StorageProxy implements StorageProxyMBean
                 for (Replica replica : replicaLayout.selected())
                 {
                     Tracing.trace("Enqueuing request to {}", replica);
-                    MessageOut<ReadCommand> message = rangeCommand.createMessage();
+                    PartitionRangeReadCommand command = replica.isFull() ? rangeCommand : rangeCommand.copyAsTransientQuery();
+                    MessageOut<ReadCommand> message = command.createMessage();
                     if (command.isTrackingRepairedStatus() && replica.isFull())
-                        message =  message.withParameter(ParameterType.TRACK_REPAIRED_DATA, MessagingService.ONE_BYTE);
+                        message = message.withParameter(ParameterType.TRACK_REPAIRED_DATA, MessagingService.ONE_BYTE);
                     MessagingService.instance().sendRRWithFailure(message, replica.endpoint(), handler);
                 }
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2046c30a/test/unit/org/apache/cassandra/SchemaLoader.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/SchemaLoader.java b/test/unit/org/apache/cassandra/SchemaLoader.java
index 62b9670..41f8095 100644
--- a/test/unit/org/apache/cassandra/SchemaLoader.java
+++ b/test/unit/org/apache/cassandra/SchemaLoader.java
@@ -102,6 +102,8 @@ public class SchemaLoader
         String ks_nocommit = testName + "NoCommitlogSpace";
         String ks_prsi = testName + "PerRowSecondaryIndex";
         String ks_cql = testName + "cql_keyspace";
+        String ks_cql_replicated = testName + "cql_keyspace_replicated";
+        String ks_with_transient = testName + "ks_with_transient";
 
         AbstractType bytes = BytesType.instance;
 
@@ -218,16 +220,16 @@ public class SchemaLoader
         schema.add(KeyspaceMetadata.create(ks_nocommit, KeyspaceParams.simpleTransient(1), Tables.of(
                 standardCFMD(ks_nocommit, "Standard1").build())));
 
+        String simpleTable = "CREATE TABLE table1 ("
+                             + "k int PRIMARY KEY,"
+                             + "v1 text,"
+                             + "v2 int"
+                             + ")";
         // CQLKeyspace
         schema.add(KeyspaceMetadata.create(ks_cql, KeyspaceParams.simple(1), Tables.of(
 
         // Column Families
-        CreateTableStatement.parse("CREATE TABLE table1 ("
-                                   + "k int PRIMARY KEY,"
-                                   + "v1 text,"
-                                   + "v2 int"
-                                   + ")", ks_cql)
-                            .build(),
+        CreateTableStatement.parse(simpleTable, ks_cql).build(),
 
         CreateTableStatement.parse("CREATE TABLE table2 ("
                                    + "k text,"
@@ -237,6 +239,12 @@ public class SchemaLoader
                             .build()
         )));
 
+        schema.add(KeyspaceMetadata.create(ks_cql_replicated, KeyspaceParams.simple(3),
+                                           Tables.of(CreateTableStatement.parse(simpleTable, ks_cql_replicated).build())));
+
+        schema.add(KeyspaceMetadata.create(ks_with_transient, KeyspaceParams.simple("3/1"),
+                                           Tables.of(CreateTableStatement.parse(simpleTable, ks_with_transient).build())));
+
         if (DatabaseDescriptor.getPartitioner() instanceof Murmur3Partitioner)
         {
             schema.add(KeyspaceMetadata.create("sasi",

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2046c30a/test/unit/org/apache/cassandra/cql3/CQLTester.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/CQLTester.java b/test/unit/org/apache/cassandra/cql3/CQLTester.java
index adadb9c..e6b0e29 100644
--- a/test/unit/org/apache/cassandra/cql3/CQLTester.java
+++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java
@@ -190,6 +190,7 @@ public abstract class CQLTester
             return;
 
         DatabaseDescriptor.daemonInitialization();
+        DatabaseDescriptor.setTransientReplicationEnabledUnsafe(true);
 
         // Cleanup first
         try

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2046c30a/test/unit/org/apache/cassandra/db/ReadCommandVerbHandlerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ReadCommandVerbHandlerTest.java b/test/unit/org/apache/cassandra/db/ReadCommandVerbHandlerTest.java
index 0c43661..b7e053b 100644
--- a/test/unit/org/apache/cassandra/db/ReadCommandVerbHandlerTest.java
+++ b/test/unit/org/apache/cassandra/db/ReadCommandVerbHandlerTest.java
@@ -29,11 +29,14 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.filter.ClusteringIndexSliceFilter;
 import org.apache.cassandra.db.filter.ColumnFilter;
 import org.apache.cassandra.db.filter.DataLimits;
 import org.apache.cassandra.db.filter.RowFilter;
+import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.TokenMetadata;
 import org.apache.cassandra.net.IMessageSink;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessageOut;
@@ -41,27 +44,41 @@ import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.net.ParameterType;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 
+import static org.apache.cassandra.Util.token;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 public class ReadCommandVerbHandlerTest
 {
+    private final static Random random = new Random();
+
+    private static ReadCommandVerbHandler handler;
+    private static TableMetadata metadata;
+    private static TableMetadata metadata_with_transient;
+    private static DecoratedKey KEY;
+
     private static final String TEST_NAME = "read_command_vh_test_";
-    private static final String KEYSPACE = TEST_NAME + "cql_keyspace";
+    private static final String KEYSPACE = TEST_NAME + "cql_keyspace_replicated";
+    private static final String KEYSPACE_WITH_TRANSIENT = TEST_NAME + "ks_with_transient";
     private static final String TABLE = "table1";
 
-    private final Random random = new Random();
-    private ReadCommandVerbHandler handler;
-    private TableMetadata metadata;
-
     @BeforeClass
-    public static void init()
+    public static void init() throws Throwable
     {
         SchemaLoader.loadSchema();
         SchemaLoader.schemaDefinition(TEST_NAME);
+        metadata = Schema.instance.getTableMetadata(KEYSPACE, TABLE);
+        metadata_with_transient = Schema.instance.getTableMetadata(KEYSPACE_WITH_TRANSIENT, TABLE);
+        KEY = key(metadata, 1);
+
+        TokenMetadata tmd = StorageService.instance.getTokenMetadata();
+        tmd.updateNormalToken(KEY.getToken(), InetAddressAndPort.getByName("127.0.0.2"));
+        tmd.updateNormalToken(key(metadata, 2).getToken(), InetAddressAndPort.getByName("127.0.0.3"));
+        tmd.updateNormalToken(key(metadata, 3).getToken(), FBUtilities.getBroadcastAddressAndPort());
     }
 
     @Before
@@ -81,14 +98,13 @@ public class ReadCommandVerbHandlerTest
             }
         });
 
-        metadata = Schema.instance.getTableMetadata(KEYSPACE, TABLE);
         handler = new ReadCommandVerbHandler();
     }
 
     @Test
     public void setRepairedDataTrackingFlagIfHeaderPresent()
     {
-        ReadCommand command = command(key());
+        SinglePartitionReadCommand command = command(metadata);
         assertFalse(command.isTrackingRepairedStatus());
         Map<ParameterType, Object> params = ImmutableMap.of(ParameterType.TRACK_REPAIRED_DATA,
                                                             MessagingService.ONE_BYTE);
@@ -104,7 +120,7 @@ public class ReadCommandVerbHandlerTest
     @Test
     public void dontSetRepairedDataTrackingFlagUnlessHeaderPresent()
     {
-        ReadCommand command = command(key());
+        SinglePartitionReadCommand command = command(metadata);
         assertFalse(command.isTrackingRepairedStatus());
         Map<ParameterType, Object> params = ImmutableMap.of(ParameterType.TRACE_SESSION,
                                                             UUID.randomUUID());
@@ -120,7 +136,7 @@ public class ReadCommandVerbHandlerTest
     @Test
     public void dontSetRepairedDataTrackingFlagIfHeadersEmpty()
     {
-        ReadCommand command = command(key());
+        SinglePartitionReadCommand command = command(metadata);
         assertFalse(command.isTrackingRepairedStatus());
         handler.doVerb(MessageIn.create(peer(),
                                         command,
@@ -131,17 +147,24 @@ public class ReadCommandVerbHandlerTest
         assertFalse(command.isTrackingRepairedStatus());
     }
 
-    private int key()
+    @Test (expected = InvalidRequestException.class)
+    public void rejectsRequestWithNonMatchingTransientness()
     {
-        return random.nextInt();
+        SinglePartitionReadCommand command = command(metadata_with_transient);
+        handler.doVerb(MessageIn.create(peer(),
+                                        command,
+                                        ImmutableMap.of(),
+                                        MessagingService.Verb.READ,
+                                        MessagingService.current_version),
+                       messageId());
     }
 
-    private int messageId()
+    private static int messageId()
     {
         return random.nextInt();
     }
 
-    private InetAddressAndPort peer()
+    private static InetAddressAndPort peer()
     {
         try
         {
@@ -153,19 +176,23 @@ public class ReadCommandVerbHandlerTest
         }
     }
 
-    private ReadCommand command(int key)
+    private static SinglePartitionReadCommand command(TableMetadata metadata)
     {
         return new SinglePartitionReadCommand(false,
-              0,
-              false,
-              metadata,
-              FBUtilities.nowInSeconds(),
-              ColumnFilter.all(metadata),
-              RowFilter.NONE,
-              DataLimits.NONE,
-              metadata.partitioner.decorateKey(ByteBufferUtil.bytes(key)),
-              new ClusteringIndexSliceFilter(Slices.ALL, false),
-              null);
+                                              0,
+                                              false,
+                                              metadata,
+                                              FBUtilities.nowInSeconds(),
+                                              ColumnFilter.all(metadata),
+                                              RowFilter.NONE,
+                                              DataLimits.NONE,
+                                              KEY,
+                                              new ClusteringIndexSliceFilter(Slices.ALL, false),
+                                              null);
     }
 
+    private static DecoratedKey key(TableMetadata metadata, int key)
+    {
+        return metadata.partitioner.decorateKey(ByteBufferUtil.bytes(key)); // int key serialized to bytes, then decorated by the table's partitioner
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org