You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by if...@apache.org on 2018/09/21 13:12:06 UTC
cassandra git commit: Add a check for receiving digest response from transient node
Repository: cassandra
Updated Branches:
refs/heads/trunk 57b87d21a -> 59de35332
Add a check for receiving digest response from transient node
Patch by Alex Petrov; reviewed by Benedict Elliot Smith for CASSANDRA-14750
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/59de3533
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/59de3533
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/59de3533
Branch: refs/heads/trunk
Commit: 59de353325768b6bb8f4dc18a1a2ace5071f8f84
Parents: 57b87d2
Author: Alex Petrov <ol...@gmail.com>
Authored: Wed Sep 12 23:20:34 2018 +0200
Committer: Alex Petrov <ol...@gmail.com>
Committed: Fri Sep 21 15:11:39 2018 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/db/PartitionRangeReadCommand.java | 7 +--
.../org/apache/cassandra/db/ReadCommand.java | 49 +++++++++++++++++++-
.../db/SinglePartitionReadCommand.java | 10 +++-
.../apache/cassandra/service/StorageProxy.java | 2 +-
.../service/reads/AbstractReadExecutor.java | 8 ++--
.../cassandra/service/reads/DigestResolver.java | 6 ---
.../service/reads/ResponseResolver.java | 4 ++
.../apache/cassandra/db/ReadCommandTest.java | 46 ++++++++++++++++++
.../apache/cassandra/locator/ReplicaUtils.java | 10 ++++
10 files changed, 125 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59de3533/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 76b7b8b..f9d2f3c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.0
+ * Add a check for receiving digest response from transient node (CASSANDRA-14750)
* Fail query on transient replica if coordinator only expects full data (CASSANDRA-14704)
* Remove mentions of transient replication from repair path (CASSANDRA-14698)
* Fix handleRepairStatusChangedNotification to remove first then add (CASSANDRA-14720)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59de3533/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
index 79db18a..7928039 100644
--- a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
+++ b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
@@ -21,7 +21,6 @@ import java.io.IOException;
import com.google.common.annotations.VisibleForTesting;
-import org.apache.cassandra.net.ParameterType;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.filter.*;
@@ -173,7 +172,8 @@ public class PartitionRangeReadCommand extends ReadCommand implements PartitionR
indexMetadata());
}
- public PartitionRangeReadCommand copyAsDigestQuery()
+ @Override
+ protected PartitionRangeReadCommand copyAsDigestQuery()
{
return new PartitionRangeReadCommand(true,
digestVersion(),
@@ -187,7 +187,8 @@ public class PartitionRangeReadCommand extends ReadCommand implements PartitionR
indexMetadata());
}
- public PartitionRangeReadCommand copyAsTransientQuery()
+ @Override
+ protected PartitionRangeReadCommand copyAsTransientQuery()
{
return new PartitionRangeReadCommand(false,
0,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59de3533/src/java/org/apache/cassandra/db/ReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java
index e146b8a..ada4ae6 100644
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@ -25,6 +25,7 @@ import java.util.function.LongPredicate;
import javax.annotation.Nullable;
+import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import com.google.common.hash.Hasher;
@@ -49,6 +50,8 @@ import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.locator.ReplicaCollection;
import org.apache.cassandra.metrics.TableMetrics;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.schema.IndexMetadata;
@@ -142,6 +145,9 @@ public abstract class ReadCommand extends AbstractReadQuery
IndexMetadata index)
{
super(metadata, nowInSec, columnFilter, rowFilter, limits);
+ if (acceptsTransient && isDigestQuery)
+ throw new IllegalArgumentException("Attempted to issue a digest response to transient replica");
+
this.kind = kind;
this.isDigestQuery = isDigestQuery;
this.digestVersion = digestVersion;
@@ -308,10 +314,49 @@ public abstract class ReadCommand extends AbstractReadQuery
public abstract ReadCommand copy();
/**
+ * Returns a copy of this command with acceptsTransient set to true.
+ */
+ public ReadCommand copyAsTransientQuery(Replica replica)
+ {
+ Preconditions.checkArgument(replica.isTransient(),
+ "Can't make a transient request on a full replica: " + replica);
+ return copyAsTransientQuery();
+ }
+
+ /**
+ * Returns a copy of this command with acceptsTransient set to true.
+ */
+ public ReadCommand copyAsTransientQuery(ReplicaCollection<?> replicas)
+ {
+ if (Iterables.any(replicas, Replica::isFull))
+ throw new IllegalArgumentException("Can't make a transient request on full replicas: " + replicas.filter(Replica::isFull));
+ return copyAsTransientQuery();
+ }
+
+ protected abstract ReadCommand copyAsTransientQuery();
+
+ /**
* Returns a copy of this command with isDigestQuery set to true.
*/
- public abstract ReadCommand copyAsDigestQuery();
- public abstract ReadCommand copyAsTransientQuery();
+ public ReadCommand copyAsDigestQuery(Replica replica)
+ {
+ Preconditions.checkArgument(replica.isFull(),
+ "Can't make a digest request on a transient replica " + replica);
+ return copyAsDigestQuery();
+ }
+
+ /**
+ * Returns a copy of this command with isDigestQuery set to true.
+ */
+ public ReadCommand copyAsDigestQuery(ReplicaCollection<?> replicas)
+ {
+ if (Iterables.any(replicas, Replica::isTransient))
+ throw new IllegalArgumentException("Can't make a digest request on a transient replica " + replicas.filter(Replica::isTransient));
+
+ return copyAsDigestQuery();
+ }
+
+ protected abstract ReadCommand copyAsDigestQuery();
protected abstract UnfilteredPartitionIterator queryStorage(ColumnFamilyStore cfs, ReadExecutionController executionController);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59de3533/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
index e99a487..b763217 100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
@@ -22,6 +22,8 @@ import java.nio.ByteBuffer;
import java.util.*;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import org.apache.cassandra.cache.IRowCacheEntry;
@@ -40,6 +42,8 @@ import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.format.SSTableReadsListener;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.locator.ReplicaCollection;
import org.apache.cassandra.metrics.TableMetrics;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
@@ -294,7 +298,8 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar
indexMetadata());
}
- public SinglePartitionReadCommand copyAsDigestQuery()
+ @Override
+ protected SinglePartitionReadCommand copyAsDigestQuery()
{
return new SinglePartitionReadCommand(true,
digestVersion(),
@@ -309,7 +314,8 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar
indexMetadata());
}
- public SinglePartitionReadCommand copyAsTransientQuery()
+ @Override
+ protected SinglePartitionReadCommand copyAsTransientQuery()
{
return new SinglePartitionReadCommand(false,
0,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59de3533/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index fc49330..0d52afa 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -2107,7 +2107,7 @@ public class StorageProxy implements StorageProxyMBean
for (Replica replica : replicaPlan.contacts())
{
Tracing.trace("Enqueuing request to {}", replica);
- PartitionRangeReadCommand command = replica.isFull() ? rangeCommand : rangeCommand.copyAsTransientQuery();
+ ReadCommand command = replica.isFull() ? rangeCommand : rangeCommand.copyAsTransientQuery(replica);
MessageOut<ReadCommand> message = command.createMessage();
if (command.isTrackingRepairedStatus() && replica.isFull())
message = message.withParameter(ParameterType.TRACK_REPAIRED_DATA, MessagingService.ONE_BYTE);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59de3533/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java b/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
index c296cba..6881a2f 100644
--- a/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
+++ b/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
@@ -119,14 +119,14 @@ public abstract class AbstractReadExecutor
protected void makeTransientDataRequests(ReplicaCollection<?> replicas)
{
- makeRequests(command.copyAsTransientQuery(), replicas);
+ makeRequests(command.copyAsTransientQuery(replicas), replicas);
}
protected void makeDigestRequests(ReplicaCollection<?> replicas)
{
assert all(replicas, Replica::isFull);
// only send digest requests to full replicas, send data requests instead to the transient replicas
- makeRequests(command.copyAsDigestQuery(), replicas);
+ makeRequests(command.copyAsDigestQuery(replicas), replicas);
}
private void makeRequests(ReadCommand readCommand, ReplicaCollection<?> replicas)
@@ -284,8 +284,8 @@ public abstract class AbstractReadExecutor
assert extraReplica != null;
retryCommand = extraReplica.isTransient()
- ? command.copyAsTransientQuery()
- : command.copyAsDigestQuery();
+ ? command.copyAsTransientQuery(extraReplica)
+ : command.copyAsDigestQuery(extraReplica);
}
else
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59de3533/src/java/org/apache/cassandra/service/reads/DigestResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/DigestResolver.java b/src/java/org/apache/cassandra/service/reads/DigestResolver.java
index 0dcae95..899baf9 100644
--- a/src/java/org/apache/cassandra/service/reads/DigestResolver.java
+++ b/src/java/org/apache/cassandra/service/reads/DigestResolver.java
@@ -57,13 +57,7 @@ public class DigestResolver<E extends Endpoints<E>, P extends ReplicaPlan.ForRea
super.preprocess(message);
Replica replica = replicaPlan().getReplicaFor(message.from);
if (dataResponse == null && !message.payload.isDigestResponse() && replica.isFull())
- {
dataResponse = message;
- }
- else if (replica.isTransient() && message.payload.isDigestResponse())
- {
- throw new IllegalStateException("digest response received from transient replica");
- }
}
@VisibleForTesting
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59de3533/src/java/org/apache/cassandra/service/reads/ResponseResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/ResponseResolver.java b/src/java/org/apache/cassandra/service/reads/ResponseResolver.java
index 0c1e1ba..aaead84 100644
--- a/src/java/org/apache/cassandra/service/reads/ResponseResolver.java
+++ b/src/java/org/apache/cassandra/service/reads/ResponseResolver.java
@@ -55,6 +55,10 @@ public abstract class ResponseResolver<E extends Endpoints<E>, P extends Replica
public void preprocess(MessageIn<ReadResponse> message)
{
+ if (replicaPlan().getReplicaFor(message.from).isTransient() &&
+ message.payload.isDigestResponse())
+ throw new IllegalArgumentException("Digest response received from transient replica");
+
try
{
responses.add(message);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59de3533/test/unit/org/apache/cassandra/db/ReadCommandTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ReadCommandTest.java b/test/unit/org/apache/cassandra/db/ReadCommandTest.java
index 8df7651..fba2bf2 100644
--- a/test/unit/org/apache/cassandra/db/ReadCommandTest.java
+++ b/test/unit/org/apache/cassandra/db/ReadCommandTest.java
@@ -52,7 +52,9 @@ import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.util.DataInputBuffer;
import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.io.util.WrappedDataOutputStreamPlus;
+import org.apache.cassandra.locator.EndpointsForToken;
import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.ReplicaUtils;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.repair.consistent.LocalSessionAccessor;
@@ -681,6 +683,50 @@ public class ReadCommandTest
assertEquals(cacheHits, cfs.metric.rowCacheHit.getCount());
}
+ @Test (expected = IllegalArgumentException.class)
+ public void copyFullAsTransientTest()
+ {
+ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF6);
+ ReadCommand readCommand = Util.cmd(cfs, Util.dk("key")).build();
+ readCommand.copyAsTransientQuery(ReplicaUtils.full(FBUtilities.getBroadcastAddressAndPort()));
+ }
+
+ @Test (expected = IllegalArgumentException.class)
+ public void copyTransientAsDigestQuery()
+ {
+ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF6);
+ ReadCommand readCommand = Util.cmd(cfs, Util.dk("key")).build();
+ readCommand.copyAsDigestQuery(ReplicaUtils.trans(FBUtilities.getBroadcastAddressAndPort()));
+ }
+
+ @Test (expected = IllegalArgumentException.class)
+ public void copyMultipleFullAsTransientTest()
+ {
+ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF6);
+ DecoratedKey key = Util.dk("key");
+ Token token = key.getToken();
+ // Address is unimportant for this test
+ InetAddressAndPort addr = FBUtilities.getBroadcastAddressAndPort();
+ ReadCommand readCommand = Util.cmd(cfs, key).build();
+ readCommand.copyAsTransientQuery(EndpointsForToken.of(token,
+ ReplicaUtils.trans(addr, token),
+ ReplicaUtils.full(addr, token)));
+ }
+
+ @Test (expected = IllegalArgumentException.class)
+ public void copyMultipleTransientAsDigestQuery()
+ {
+ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF6);
+ DecoratedKey key = Util.dk("key");
+ Token token = key.getToken();
+ // Address is unimportant for this test
+ InetAddressAndPort addr = FBUtilities.getBroadcastAddressAndPort();
+ ReadCommand readCommand = Util.cmd(cfs, key).build();
+ readCommand.copyAsDigestQuery(EndpointsForToken.of(token,
+ ReplicaUtils.trans(addr, token),
+ ReplicaUtils.full(addr, token)));
+ }
+
private void testRepairedDataTracking(ColumnFamilyStore cfs, ReadCommand readCommand) throws IOException
{
cfs.truncateBlocking();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59de3533/test/unit/org/apache/cassandra/locator/ReplicaUtils.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/locator/ReplicaUtils.java b/test/unit/org/apache/cassandra/locator/ReplicaUtils.java
index 66f538f..c5350dc 100644
--- a/test/unit/org/apache/cassandra/locator/ReplicaUtils.java
+++ b/test/unit/org/apache/cassandra/locator/ReplicaUtils.java
@@ -41,4 +41,14 @@ public class ReplicaUtils
{
return transientReplica(endpoint, FULL_RANGE);
}
+
+ public static Replica full(InetAddressAndPort endpoint, Token token)
+ {
+ return fullReplica(endpoint, new Range<>(token, token));
+ }
+
+ public static Replica trans(InetAddressAndPort endpoint, Token token)
+ {
+ return transientReplica(endpoint, new Range<>(token, token));
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org