You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by if...@apache.org on 2018/09/12 15:02:52 UTC
cassandra git commit: Fail query on transient replica if coordinator
only expects full data
Repository: cassandra
Updated Branches:
refs/heads/trunk 8a73427c6 -> 2046c30ad
Fail query on transient replica if coordinator only expects full data
Patch by Alex Petrov; reviewed by Benedict Elliott Smith for CASSANDRA-14704
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2046c30a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2046c30a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2046c30a
Branch: refs/heads/trunk
Commit: 2046c30adec194fb07bc5dd1c31fc19a64e7895c
Parents: 8a73427
Author: Alex Petrov <ol...@gmail.com>
Authored: Fri Sep 7 11:02:55 2018 +0200
Committer: Alex Petrov <ol...@gmail.com>
Committed: Wed Sep 12 17:02:02 2018 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/db/ReadCommandVerbHandler.java | 42 +++++++++++
.../locator/AbstractReplicationStrategy.java | 7 ++
.../apache/cassandra/service/StorageProxy.java | 7 +-
.../unit/org/apache/cassandra/SchemaLoader.java | 20 +++--
.../org/apache/cassandra/cql3/CQLTester.java | 1 +
.../db/ReadCommandVerbHandlerTest.java | 77 +++++++++++++-------
7 files changed, 121 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2046c30a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 264c80f..a30bec7 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.0
+ * Fail query on transient replica if coordinator only expects full data (CASSANDRA-14704)
* Remove mentions of transient replication from repair path (CASSANDRA-14698)
* Fix handleRepairStatusChangedNotification to remove first then add (CASSANDRA-14720)
* Allow transient node to serve as a repair coordinator (CASSANDRA-14693)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2046c30a/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java b/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java
index 1b28c2c..0e97dd8 100644
--- a/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java
@@ -17,8 +17,14 @@
*/
package org.apache.cassandra.db;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessageOut;
@@ -29,6 +35,8 @@ import org.apache.cassandra.tracing.Tracing;
public class ReadCommandVerbHandler implements IVerbHandler<ReadCommand>
{
+ private static final Logger logger = LoggerFactory.getLogger(ReadCommandVerbHandler.class);
+
protected IVersionedSerializer<ReadResponse> serializer()
{
return ReadResponse.serializer;
@@ -42,6 +50,8 @@ public class ReadCommandVerbHandler implements IVerbHandler<ReadCommand>
}
ReadCommand command = message.payload;
+ validateTransientStatus(message);
+
command.setMonitoringTime(message.constructionTime, message.isCrossNode(), message.getTimeout(), message.getSlowQueryTimeout());
if (message.parameters.containsKey(ParameterType.TRACK_REPAIRED_DATA))
@@ -65,4 +75,36 @@ public class ReadCommandVerbHandler implements IVerbHandler<ReadCommand>
MessageOut<ReadResponse> reply = new MessageOut<>(MessagingService.Verb.REQUEST_RESPONSE, response, serializer());
MessagingService.instance().sendReply(reply, id, message.from);
}
+
+    /**
+     * Checks whether this node may serve the given read, based on its replication role for the
+     * token the read targets.
+     * <p>
+     * The token is taken from the partition key for single-partition reads and from the right
+     * bound of the key range for range reads. If this node is not a replica for that token, a
+     * warning is logged and the read is allowed to proceed.
+     *
+     * @param message the incoming read request
+     * @throws InvalidRequestException if the command does not accept transient data but this
+     *         node is only a transient replica for the token
+     */
+    private void validateTransientStatus(MessageIn<ReadCommand> message)
+    {
+        ReadCommand command = message.payload;
+        Token token;
+
+        if (command.isLimitedToOnePartition())
+            token = ((SinglePartitionReadCommand) command).partitionKey().getToken();
+        else
+            token = ((PartitionRangeReadCommand) command).dataRange().keyRange().right.getToken();
+
+        Replica replica = Keyspace.open(command.metadata().keyspace)
+                                  .getReplicationStrategy()
+                                  .getLocalReplicaFor(token);
+
+        // No local replica for this token: only warn, do not reject the request.
+        if (replica == null)
+        {
+            logger.warn("Received a read request from {} for a range that is not owned by the current replica {}.",
+                        message.from,
+                        command);
+            return;
+        }
+
+        if (!command.acceptsTransient() && replica.isTransient())
+        {
+            // Count the rejected request as a dropped message before failing it.
+            MessagingService.instance().incrementDroppedMessages(message, message.getLifetimeInMS());
+            // Argument order follows the message text: first the kind of data the request
+            // expects, then the kind of replica this node is for the token.
+            throw new InvalidRequestException(String.format("Attempted to serve %s data request from %s node in %s",
+                                                            command.acceptsTransient() ? "transient" : "full",
+                                                            replica.isTransient() ? "transient" : "full",
+                                                            this));
+        }
+    }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2046c30a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
index 0ddc0a4..d168052 100644
--- a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
+++ b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
@@ -124,6 +124,13 @@ public abstract class AbstractReplicationStrategy
return endpoints;
}
+    /**
+     * Looks up this node's own entry among the natural replicas of the given ring position.
+     *
+     * @param searchPosition the ring position whose natural replicas are consulted
+     * @return the replica keyed by this node's broadcast address in the by-endpoint view of
+     *         the natural replicas
+     */
+    public Replica getLocalReplicaFor(RingPosition searchPosition)
+    {
+        InetAddressAndPort localEndpoint = FBUtilities.getBroadcastAddressAndPort();
+        return getNaturalReplicas(searchPosition).byEndpoint().get(localEndpoint);
+    }
+
/**
* calculate the natural endpoints for the given token
*
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2046c30a/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 9d9c628..5eb43cf 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -1919,7 +1919,7 @@ public class StorageProxy implements StorageProxyMBean
EndpointsForRange targetReplicas = consistency.filterForQuery(keyspace, liveReplicas);
int minResponses = Math.min(targetReplicas.size(), blockFor);
- // Endpoitns for range here as well
+ // Endpoints for range here as well
return ReplicaLayout.forRangeRead(keyspace, consistency, range,
liveReplicas, targetReplicas.subList(0, minResponses));
}
@@ -2146,9 +2146,10 @@ public class StorageProxy implements StorageProxyMBean
for (Replica replica : replicaLayout.selected())
{
Tracing.trace("Enqueuing request to {}", replica);
- MessageOut<ReadCommand> message = rangeCommand.createMessage();
+ PartitionRangeReadCommand command = replica.isFull() ? rangeCommand : rangeCommand.copyAsTransientQuery();
+ MessageOut<ReadCommand> message = command.createMessage();
if (command.isTrackingRepairedStatus() && replica.isFull())
- message = message.withParameter(ParameterType.TRACK_REPAIRED_DATA, MessagingService.ONE_BYTE);
+ message = message.withParameter(ParameterType.TRACK_REPAIRED_DATA, MessagingService.ONE_BYTE);
MessagingService.instance().sendRRWithFailure(message, replica.endpoint(), handler);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2046c30a/test/unit/org/apache/cassandra/SchemaLoader.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/SchemaLoader.java b/test/unit/org/apache/cassandra/SchemaLoader.java
index 62b9670..41f8095 100644
--- a/test/unit/org/apache/cassandra/SchemaLoader.java
+++ b/test/unit/org/apache/cassandra/SchemaLoader.java
@@ -102,6 +102,8 @@ public class SchemaLoader
String ks_nocommit = testName + "NoCommitlogSpace";
String ks_prsi = testName + "PerRowSecondaryIndex";
String ks_cql = testName + "cql_keyspace";
+ String ks_cql_replicated = testName + "cql_keyspace_replicated";
+ String ks_with_transient = testName + "ks_with_transient";
AbstractType bytes = BytesType.instance;
@@ -218,16 +220,16 @@ public class SchemaLoader
schema.add(KeyspaceMetadata.create(ks_nocommit, KeyspaceParams.simpleTransient(1), Tables.of(
standardCFMD(ks_nocommit, "Standard1").build())));
+ String simpleTable = "CREATE TABLE table1 ("
+ + "k int PRIMARY KEY,"
+ + "v1 text,"
+ + "v2 int"
+ + ")";
// CQLKeyspace
schema.add(KeyspaceMetadata.create(ks_cql, KeyspaceParams.simple(1), Tables.of(
// Column Families
- CreateTableStatement.parse("CREATE TABLE table1 ("
- + "k int PRIMARY KEY,"
- + "v1 text,"
- + "v2 int"
- + ")", ks_cql)
- .build(),
+ CreateTableStatement.parse(simpleTable, ks_cql).build(),
CreateTableStatement.parse("CREATE TABLE table2 ("
+ "k text,"
@@ -237,6 +239,12 @@ public class SchemaLoader
.build()
)));
+ schema.add(KeyspaceMetadata.create(ks_cql_replicated, KeyspaceParams.simple(3),
+ Tables.of(CreateTableStatement.parse(simpleTable, ks_cql_replicated).build())));
+
+ schema.add(KeyspaceMetadata.create(ks_with_transient, KeyspaceParams.simple("3/1"),
+ Tables.of(CreateTableStatement.parse(simpleTable, ks_with_transient).build())));
+
if (DatabaseDescriptor.getPartitioner() instanceof Murmur3Partitioner)
{
schema.add(KeyspaceMetadata.create("sasi",
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2046c30a/test/unit/org/apache/cassandra/cql3/CQLTester.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/CQLTester.java b/test/unit/org/apache/cassandra/cql3/CQLTester.java
index adadb9c..e6b0e29 100644
--- a/test/unit/org/apache/cassandra/cql3/CQLTester.java
+++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java
@@ -190,6 +190,7 @@ public abstract class CQLTester
return;
DatabaseDescriptor.daemonInitialization();
+ DatabaseDescriptor.setTransientReplicationEnabledUnsafe(true);
// Cleanup first
try
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2046c30a/test/unit/org/apache/cassandra/db/ReadCommandVerbHandlerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ReadCommandVerbHandlerTest.java b/test/unit/org/apache/cassandra/db/ReadCommandVerbHandlerTest.java
index 0c43661..b7e053b 100644
--- a/test/unit/org/apache/cassandra/db/ReadCommandVerbHandlerTest.java
+++ b/test/unit/org/apache/cassandra/db/ReadCommandVerbHandlerTest.java
@@ -29,11 +29,14 @@ import org.junit.BeforeClass;
import org.junit.Test;
import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.filter.ClusteringIndexSliceFilter;
import org.apache.cassandra.db.filter.ColumnFilter;
import org.apache.cassandra.db.filter.DataLimits;
import org.apache.cassandra.db.filter.RowFilter;
+import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.TokenMetadata;
import org.apache.cassandra.net.IMessageSink;
import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessageOut;
@@ -41,27 +44,41 @@ import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.ParameterType;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
+import static org.apache.cassandra.Util.token;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class ReadCommandVerbHandlerTest
{
+ private final static Random random = new Random();
+
+ private static ReadCommandVerbHandler handler;
+ private static TableMetadata metadata;
+ private static TableMetadata metadata_with_transient;
+ private static DecoratedKey KEY;
+
private static final String TEST_NAME = "read_command_vh_test_";
- private static final String KEYSPACE = TEST_NAME + "cql_keyspace";
+ private static final String KEYSPACE = TEST_NAME + "cql_keyspace_replicated";
+ private static final String KEYSPACE_WITH_TRANSIENT = TEST_NAME + "ks_with_transient";
private static final String TABLE = "table1";
- private final Random random = new Random();
- private ReadCommandVerbHandler handler;
- private TableMetadata metadata;
-
@BeforeClass
- public static void init()
+ public static void init() throws Throwable
{
SchemaLoader.loadSchema();
SchemaLoader.schemaDefinition(TEST_NAME);
+ metadata = Schema.instance.getTableMetadata(KEYSPACE, TABLE);
+ metadata_with_transient = Schema.instance.getTableMetadata(KEYSPACE_WITH_TRANSIENT, TABLE);
+ KEY = key(metadata, 1);
+
+ TokenMetadata tmd = StorageService.instance.getTokenMetadata();
+ tmd.updateNormalToken(KEY.getToken(), InetAddressAndPort.getByName("127.0.0.2"));
+ tmd.updateNormalToken(key(metadata, 2).getToken(), InetAddressAndPort.getByName("127.0.0.3"));
+ tmd.updateNormalToken(key(metadata, 3).getToken(), FBUtilities.getBroadcastAddressAndPort());
}
@Before
@@ -81,14 +98,13 @@ public class ReadCommandVerbHandlerTest
}
});
- metadata = Schema.instance.getTableMetadata(KEYSPACE, TABLE);
handler = new ReadCommandVerbHandler();
}
@Test
public void setRepairedDataTrackingFlagIfHeaderPresent()
{
- ReadCommand command = command(key());
+ SinglePartitionReadCommand command = command(metadata);
assertFalse(command.isTrackingRepairedStatus());
Map<ParameterType, Object> params = ImmutableMap.of(ParameterType.TRACK_REPAIRED_DATA,
MessagingService.ONE_BYTE);
@@ -104,7 +120,7 @@ public class ReadCommandVerbHandlerTest
@Test
public void dontSetRepairedDataTrackingFlagUnlessHeaderPresent()
{
- ReadCommand command = command(key());
+ SinglePartitionReadCommand command = command(metadata);
assertFalse(command.isTrackingRepairedStatus());
Map<ParameterType, Object> params = ImmutableMap.of(ParameterType.TRACE_SESSION,
UUID.randomUUID());
@@ -120,7 +136,7 @@ public class ReadCommandVerbHandlerTest
@Test
public void dontSetRepairedDataTrackingFlagIfHeadersEmpty()
{
- ReadCommand command = command(key());
+ SinglePartitionReadCommand command = command(metadata);
assertFalse(command.isTrackingRepairedStatus());
handler.doVerb(MessageIn.create(peer(),
command,
@@ -131,17 +147,24 @@ public class ReadCommandVerbHandlerTest
assertFalse(command.isTrackingRepairedStatus());
}
- private int key()
+ @Test (expected = InvalidRequestException.class)
+ public void rejectsRequestWithNonMatchingTransientness()
{
- return random.nextInt();
+ SinglePartitionReadCommand command = command(metadata_with_transient);
+ handler.doVerb(MessageIn.create(peer(),
+ command,
+ ImmutableMap.of(),
+ MessagingService.Verb.READ,
+ MessagingService.current_version),
+ messageId());
}
- private int messageId()
+ private static int messageId()
{
return random.nextInt();
}
- private InetAddressAndPort peer()
+ private static InetAddressAndPort peer()
{
try
{
@@ -153,19 +176,23 @@ public class ReadCommandVerbHandlerTest
}
}
- private ReadCommand command(int key)
+ private static SinglePartitionReadCommand command(TableMetadata metadata)
{
return new SinglePartitionReadCommand(false,
- 0,
- false,
- metadata,
- FBUtilities.nowInSeconds(),
- ColumnFilter.all(metadata),
- RowFilter.NONE,
- DataLimits.NONE,
- metadata.partitioner.decorateKey(ByteBufferUtil.bytes(key)),
- new ClusteringIndexSliceFilter(Slices.ALL, false),
- null);
+ 0,
+ false,
+ metadata,
+ FBUtilities.nowInSeconds(),
+ ColumnFilter.all(metadata),
+ RowFilter.NONE,
+ DataLimits.NONE,
+ KEY,
+ new ClusteringIndexSliceFilter(Slices.ALL, false),
+ null);
}
+ /**
+ * Builds a {@link DecoratedKey} for an int test key: the int is converted to bytes and
+ * decorated with the partitioner of the given table.
+ *
+ * @param metadata table whose partitioner decorates the key
+ * @param key int value used as the raw partition key
+ * @return the decorated key
+ */
+ private static DecoratedKey key(TableMetadata metadata, int key)
+ {
+ return metadata.partitioner.decorateKey(ByteBufferUtil.bytes(key));
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org