You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by dc...@apache.org on 2021/08/26 20:41:43 UTC
[cassandra] branch trunk updated: Add client warnings and abort to
tombstone and coordinator reads which go past a low/high watermark
This is an automated email from the ASF dual-hosted git repository.
dcapwell pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 4ec4ab9 Add client warnings and abort to tombstone and coordinator reads which go past a low/high watermark
4ec4ab9 is described below
commit 4ec4ab992f8adc0a60055a60525e9d11a28bc2ae
Author: David Capwell <dc...@apache.org>
AuthorDate: Thu Aug 26 13:06:00 2021 -0700
Add client warnings and abort to tombstone and coordinator reads which go past a low/high watermark
patch by David Capwell; reviewed by Blake Eggleston, Marcus Eriksson for CASSANDRA-16850
---
CHANGES.txt | 1 +
NEWS.txt | 7 +
conf/cassandra.yaml | 14 +
src/java/org/apache/cassandra/config/Config.java | 5 +
.../cassandra/config/DatabaseDescriptor.java | 33 +++
.../org/apache/cassandra/cql3/QueryOptions.java | 106 +++++++
.../cassandra/cql3/selection/ResultSetBuilder.java | 31 +-
.../cassandra/cql3/statements/SelectStatement.java | 184 ++++++++++++
src/java/org/apache/cassandra/db/DataRange.java | 2 +-
.../org/apache/cassandra/db/MessageParams.java | 72 +++++
.../cassandra/db/PartitionRangeReadCommand.java | 56 ++--
src/java/org/apache/cassandra/db/ReadCommand.java | 42 ++-
.../cassandra/db/ReadCommandVerbHandler.java | 20 ++
src/java/org/apache/cassandra/db/ReadQuery.java | 4 +
.../RejectException.java} | 32 ++-
.../cassandra/db/SinglePartitionReadCommand.java | 45 ++-
.../cassandra/db/SinglePartitionReadQuery.java | 6 +
.../db/filter/TombstoneOverwhelmingException.java | 2 +-
...ilureException.java => ReadAbortException.java} | 15 +-
.../cassandra/exceptions/ReadFailureException.java | 6 +
...eException.java => ReadSizeAbortException.java} | 12 +-
.../exceptions/RequestFailureException.java | 21 +-
.../cassandra/exceptions/RequestFailureReason.java | 5 +-
...Exception.java => TombstoneAbortException.java} | 17 +-
.../cassandra/metrics/ClientRequestMetrics.java | 27 ++
.../apache/cassandra/metrics/KeyspaceMetrics.java | 12 +
.../org/apache/cassandra/metrics/TableMetrics.java | 11 +
src/java/org/apache/cassandra/net/Message.java | 48 ++++
src/java/org/apache/cassandra/net/MessageFlag.java | 4 +-
src/java/org/apache/cassandra/net/ParamType.java | 7 +-
.../org/apache/cassandra/service/StorageProxy.java | 41 ++-
.../apache/cassandra/service/StorageService.java | 39 +++
.../cassandra/service/StorageServiceMBean.java | 7 +
.../cassandra/service/reads/ReadCallback.java | 114 +++++++-
.../service/reads/range/RangeCommandIterator.java | 6 +
.../Int32Serializer.java} | 30 +-
test/conf/cassandra.yaml | 3 +
.../test/ClientReadSizeWarningTest.java | 266 +++++++++++++++++
.../test/ClientTombstoneWarningTest.java | 314 +++++++++++++++++++++
.../distributed/test/JavaDriverUtils.java | 50 ++++
.../cassandra/db/ReadCommandVerbHandlerTest.java | 3 +-
.../org/apache/cassandra/db/ReadResponseTest.java | 3 +-
.../cassandra/service/reads/ReadExecutorTest.java | 2 +-
.../reads/repair/RepairedDataVerifierTest.java | 3 +-
44 files changed, 1623 insertions(+), 105 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 9ce0c20..9828159 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.1
+ * Add client warnings and abort to tombstone and coordinator reads which go past a low/high watermark (CASSANDRA-16850)
* Add TTL support to nodetool snapshots (CASSANDRA-16789)
* Allow CommitLogSegmentReader to optionally skip sync marker CRC checks (CASSANDRA-16842)
* allow blocking IPs from updating metrics about traffic (CASSANDRA-16859)
diff --git a/NEWS.txt b/NEWS.txt
index 6b39b21..443a5c4 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -38,6 +38,13 @@ using the provided 'sstableupgrade' tool.
New features
------------
+ - Warn/abort thresholds added to read queries notifying clients when these thresholds trigger (by
+ emitting a client warning or aborting the query). This feature is disabled by default, scheduled
+ to be enabled in 4.2; it is controlled with the configuration client_track_warnings_enabled,
+ setting it to true will enable this feature. Each check has its own warn/abort thresholds, currently
+ tombstones (tombstone_warn_threshold, and tombstone_failure_threshold) and coordinator result set
+ materialized size (client_large_read_warn_threshold_kb, and client_large_read_abort_threshold_kb)
+ are supported; more checks will be added over time.
Upgrading
---------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index bf8c358..ff073da 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -1453,3 +1453,17 @@ enable_drop_compact_storage: false
# subnets:
# - 127.0.0.1
# - 127.0.0.0/31
+
+# Enables tracking warnings/aborts across all replicas for reporting back to client.
+# Scheduled to be enabled in 4.2
+# See: CASSANDRA-16850
+# See: tombstone_warn_threshold, tombstone_failure_threshold, client_large_read_warn_threshold_kb, and client_large_read_abort_threshold_kb
+#client_track_warnings_enabled: false
+
+# When client_track_warnings_enabled: true, this tracks the materialized size of a query on the
+# coordinator. If client_large_read_warn_threshold_kb is greater than 0, this will emit a warning
+# to clients with details on what query triggered this as well as the size of the result set; if
+# client_large_read_abort_threshold_kb is greater than 0, this will abort the query after it
+# has exceeded this threshold, returning a read error to the user.
+#client_large_read_warn_threshold_kb: 0
+#client_large_read_abort_threshold_kb: 0
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index bd2177d..731b466 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -349,11 +349,16 @@ public class Config
public volatile int tombstone_warn_threshold = 1000;
public volatile int tombstone_failure_threshold = 100000;
+ public volatile long client_large_read_warn_threshold_kb = 0;
+ public volatile long client_large_read_abort_threshold_kb = 0;
+
public final ReplicaFilteringProtectionOptions replica_filtering_protection = new ReplicaFilteringProtectionOptions();
public volatile Long index_summary_capacity_in_mb;
public volatile int index_summary_resize_interval_in_minutes = 60;
+ public volatile boolean client_track_warnings_enabled = false; // should set to true in 4.2
+
public int gc_log_threshold_in_ms = 200;
public int gc_warn_threshold_in_ms = 1000;
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 19e79d7..7e80485 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -857,6 +857,9 @@ public class DatabaseDescriptor
throw new ConfigurationException("To set concurrent_validations > concurrent_compactors, " +
"set the system property cassandra.allow_unlimited_concurrent_validations=true");
}
+
+ conf.client_large_read_warn_threshold_kb = Math.max(conf.client_large_read_warn_threshold_kb, 0);
+ conf.client_large_read_abort_threshold_kb = Math.max(conf.client_large_read_abort_threshold_kb, 0);
}
@VisibleForTesting
@@ -3443,4 +3446,34 @@ public class DatabaseDescriptor
{
return conf.internode_error_reporting_exclusions;
}
+
+ public static long getClientLargeReadWarnThresholdKB()
+ {
+ return conf.client_large_read_warn_threshold_kb;
+ }
+
+ public static void setClientLargeReadWarnThresholdKB(long threshold)
+ {
+ conf.client_large_read_warn_threshold_kb = Math.max(threshold, 0);
+ }
+
+ public static long getClientLargeReadAbortThresholdKB()
+ {
+ return conf.client_large_read_abort_threshold_kb;
+ }
+
+ public static void setClientLargeReadAbortThresholdKB(long threshold)
+ {
+ conf.client_large_read_abort_threshold_kb = Math.max(threshold, 0);
+ }
+
+ public static boolean getClientTrackWarningsEnabled()
+ {
+ return conf.client_track_warnings_enabled;
+ }
+
+ public static void setClientTrackWarningsEnabled(boolean value)
+ {
+ conf.client_track_warnings_enabled = value;
+ }
}
diff --git a/src/java/org/apache/cassandra/cql3/QueryOptions.java b/src/java/org/apache/cassandra/cql3/QueryOptions.java
index d3b1a03..e46c458 100644
--- a/src/java/org/apache/cassandra/cql3/QueryOptions.java
+++ b/src/java/org/apache/cassandra/cql3/QueryOptions.java
@@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableList;
import io.netty.buffer.ByteBuf;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.marshal.UTF8Type;
@@ -216,11 +217,103 @@ public abstract class QueryOptions
// Mainly for the sake of BatchQueryOptions
abstract SpecificOptions getSpecificOptions();
+ abstract TrackWarnings getTrackWarnings();
+
+ public boolean isClientTrackWarningsEnabled()
+ {
+ return getTrackWarnings().isEnabled();
+ }
+
+ public long getClientLargeReadWarnThresholdKb()
+ {
+ return getTrackWarnings().getClientLargeReadWarnThresholdKb();
+ }
+
+ public long getClientLargeReadAbortThresholdKB()
+ {
+ return getTrackWarnings().getClientLargeReadAbortThresholdKB();
+ }
+
public QueryOptions prepare(List<ColumnSpecification> specs)
{
return this;
}
+ interface TrackWarnings
+ {
+ boolean isEnabled();
+
+ long getClientLargeReadWarnThresholdKb();
+
+ long getClientLargeReadAbortThresholdKB();
+
+ static TrackWarnings create()
+ {
+ // if daemon initialization hasn't happened yet (very common in tests) then ignore
+ if (!DatabaseDescriptor.isDaemonInitialized())
+ return DisabledTrackWarnings.INSTANCE;
+ boolean enabled = DatabaseDescriptor.getClientTrackWarningsEnabled();
+ if (!enabled)
+ return DisabledTrackWarnings.INSTANCE;
+ long clientLargeReadWarnThresholdKb = DatabaseDescriptor.getClientLargeReadWarnThresholdKB();
+ long clientLargeReadAbortThresholdKB = DatabaseDescriptor.getClientLargeReadAbortThresholdKB();
+ return new DefaultTrackWarnings(clientLargeReadWarnThresholdKb, clientLargeReadAbortThresholdKB);
+ }
+ }
+
+ private enum DisabledTrackWarnings implements TrackWarnings
+ {
+ INSTANCE;
+
+ @Override
+ public boolean isEnabled()
+ {
+ return false;
+ }
+
+ @Override
+ public long getClientLargeReadWarnThresholdKb()
+ {
+ return 0;
+ }
+
+ @Override
+ public long getClientLargeReadAbortThresholdKB()
+ {
+ return 0;
+ }
+ }
+
+ private static class DefaultTrackWarnings implements TrackWarnings
+ {
+ private final long clientLargeReadWarnThresholdKb;
+ private final long clientLargeReadAbortThresholdKB;
+
+ public DefaultTrackWarnings(long clientLargeReadWarnThresholdKb, long clientLargeReadAbortThresholdKB)
+ {
+ this.clientLargeReadWarnThresholdKb = clientLargeReadWarnThresholdKb;
+ this.clientLargeReadAbortThresholdKB = clientLargeReadAbortThresholdKB;
+ }
+
+ @Override
+ public boolean isEnabled()
+ {
+ return true;
+ }
+
+ @Override
+ public long getClientLargeReadWarnThresholdKb()
+ {
+ return clientLargeReadWarnThresholdKb;
+ }
+
+ @Override
+ public long getClientLargeReadAbortThresholdKB()
+ {
+ return clientLargeReadAbortThresholdKB;
+ }
+ }
+
static class DefaultQueryOptions extends QueryOptions
{
private final ConsistencyLevel consistency;
@@ -230,6 +323,7 @@ public abstract class QueryOptions
private final SpecificOptions options;
private final transient ProtocolVersion protocolVersion;
+ private final transient TrackWarnings trackWarnings = TrackWarnings.create();
DefaultQueryOptions(ConsistencyLevel consistency, List<ByteBuffer> values, boolean skipMetadata, SpecificOptions options, ProtocolVersion protocolVersion)
{
@@ -264,6 +358,12 @@ public abstract class QueryOptions
{
return options;
}
+
+ @Override
+ TrackWarnings getTrackWarnings()
+ {
+ return trackWarnings;
+ }
}
static class QueryOptionsWrapper extends QueryOptions
@@ -301,6 +401,12 @@ public abstract class QueryOptions
}
@Override
+ TrackWarnings getTrackWarnings()
+ {
+ return wrapped.getTrackWarnings();
+ }
+
+ @Override
public QueryOptions prepare(List<ColumnSpecification> specs)
{
wrapped.prepare(specs);
diff --git a/src/java/org/apache/cassandra/cql3/selection/ResultSetBuilder.java b/src/java/org/apache/cassandra/cql3/selection/ResultSetBuilder.java
index 84e1e84..852872a 100644
--- a/src/java/org/apache/cassandra/cql3/selection/ResultSetBuilder.java
+++ b/src/java/org/apache/cassandra/cql3/selection/ResultSetBuilder.java
@@ -59,6 +59,9 @@ public final class ResultSetBuilder
final long[] timestamps;
final int[] ttls;
+ private long size = 0;
+ private boolean sizeWarningEmitted = false;
+
public ResultSetBuilder(ResultMetadata metadata, Selectors selectors)
{
this(metadata, selectors, null);
@@ -79,6 +82,30 @@ public final class ResultSetBuilder
Arrays.fill(ttls, -1);
}
+ private void addSize(List<ByteBuffer> row)
+ {
+ for (int i=0, isize=row.size(); i<isize; i++)
+ {
+ ByteBuffer value = row.get(i);
+ size += value != null ? value.remaining() : 0;
+ }
+ }
+
+ public boolean shouldWarn(long thresholdKB)
+ {
+ if (thresholdKB > 0 && !sizeWarningEmitted && size > thresholdKB << 10)
+ {
+ sizeWarningEmitted = true;
+ return true;
+ }
+ return false;
+ }
+
+ public boolean shouldReject(long thresholdKB)
+ {
+ return thresholdKB > 0 && size > thresholdKB << 10;
+ }
+
public void add(ByteBuffer v)
{
current.add(v);
@@ -166,6 +193,8 @@ public final class ResultSetBuilder
private List<ByteBuffer> getOutputRow()
{
- return selectors.getOutputRow();
+ List<ByteBuffer> row = selectors.getOutputRow();
+ addSize(row);
+ return row;
}
}
\ No newline at end of file
diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index 774bd68..5c7ac29 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@ -22,6 +22,8 @@ import java.util.*;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -29,6 +31,7 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.audit.AuditLogContext;
import org.apache.cassandra.audit.AuditLogEntryType;
import org.apache.cassandra.auth.Permission;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.TableMetadata;
@@ -61,12 +64,15 @@ import org.apache.cassandra.serializers.MarshalException;
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.service.ClientWarn;
import org.apache.cassandra.service.QueryState;
+import org.apache.cassandra.service.StorageProxy;
import org.apache.cassandra.service.pager.AggregationQueryPager;
import org.apache.cassandra.service.pager.PagingState;
import org.apache.cassandra.service.pager.QueryPager;
import org.apache.cassandra.transport.ProtocolVersion;
import org.apache.cassandra.transport.messages.ResultMessage;
import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
@@ -242,6 +248,9 @@ public class SelectStatement implements CQLStatement
Selectors selectors = selection.newSelectors(options);
ReadQuery query = getQuery(options, selectors.getColumnFilter(), nowInSec, userLimit, userPerPartitionLimit, pageSize);
+ if (options.isClientTrackWarningsEnabled())
+ query.trackWarnings();
+
if (aggregationSpec == null && (pageSize <= 0 || (query.limits().count() <= pageSize)))
return execute(query, options, state, selectors, nowInSec, userLimit, queryStartNanoTime);
@@ -783,6 +792,7 @@ public class SelectStatement implements CQLStatement
}
ResultSet cqlRows = result.build();
+ maybeWarn(result, options);
orderResults(cqlRows);
@@ -804,10 +814,49 @@ public class SelectStatement implements CQLStatement
}
}
+ private void maybeWarn(ResultSetBuilder result, QueryOptions options)
+ {
+ if (!options.isClientTrackWarningsEnabled())
+ return;
+ if (result.shouldWarn(options.getClientLargeReadWarnThresholdKb()))
+ {
+ String msg = String.format("Read on table %s has exceeded the size warning threshold of %,d kb", table, options.getClientLargeReadWarnThresholdKb());
+ ClientWarn.instance.warn(msg + " with " + loggableTokens(options));
+ logger.warn("{} with query {}", msg, asCQL(options));
+ cfs().metric.clientReadSizeWarnings.mark();
+ }
+ }
+
+ private void maybeFail(ResultSetBuilder result, QueryOptions options)
+ {
+ if (!options.isClientTrackWarningsEnabled())
+ return;
+ if (result.shouldReject(options.getClientLargeReadAbortThresholdKB()))
+ {
+ String msg = String.format("Read on table %s has exceeded the size failure threshold of %,d kb", table, options.getClientLargeReadAbortThresholdKB());
+ String clientMsg = msg + " with " + loggableTokens(options);
+ ClientWarn.instance.warn(clientMsg);
+ logger.warn("{} with query {}", msg, asCQL(options));
+ cfs().metric.clientReadSizeAborts.mark();
+ // read errors require blockFor and received (it's in the protocol message), but this isn't known;
+ // to work around this, treat the coordinator as the only response we care about and mark it failed
+ ReadSizeAbortException exception = new ReadSizeAbortException(clientMsg, options.getConsistency(), 0, 1, true,
+ ImmutableMap.of(FBUtilities.getBroadcastAddressAndPort(), RequestFailureReason.READ_TOO_LARGE));
+ StorageProxy.recordReadRegularAbort(options.getConsistency(), exception);
+ throw exception;
+ }
+ }
+
+ private ColumnFamilyStore cfs()
+ {
+ return Schema.instance.getColumnFamilyStoreInstance(table.id);
+ }
+
// Used by ModificationStatement for CAS operations
void processPartition(RowIterator partition, QueryOptions options, ResultSetBuilder result, int nowInSec)
throws InvalidRequestException
{
+ maybeFail(result, options);
ProtocolVersion protocolVersion = options.getProtocolVersion();
ByteBuffer[] keyComponents = getComponents(table, partition.partitionKey());
@@ -819,6 +868,7 @@ public class SelectStatement implements CQLStatement
if (!staticRow.isEmpty() && restrictions.returnStaticContentOnPartitionWithNoRows())
{
result.newRow(partition.partitionKey(), staticRow.clustering());
+ maybeFail(result, options);
for (ColumnMetadata def : selection.getColumns())
{
switch (def.kind)
@@ -841,6 +891,13 @@ public class SelectStatement implements CQLStatement
{
Row row = partition.next();
result.newRow( partition.partitionKey(), row.clustering());
+
+ // reads aren't failed as soon as the size exceeds the failure threshold, they're failed once the failure
+ // threshold has been exceeded and we start adding more data. We're slightly more permissive to avoid
+ // cases where a row can never be read. Since we only warn/fail after entire rows are read, this will
+ // still allow the entire dataset to be read with LIMIT 1 queries, even if every row is oversized
+ maybeFail(result, options);
+
// Respect selection order
for (ColumnMetadata def : selection.getColumns())
{
@@ -1358,4 +1415,131 @@ public class SelectStatement implements CQLStatement
{
return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
}
+
+ private String loggableTokens(QueryOptions options)
+ {
+ if (restrictions.isKeyRange() || restrictions.usesSecondaryIndexing())
+ {
+ AbstractBounds<PartitionPosition> bounds = restrictions.getPartitionKeyBounds(options);
+ return "token range: " + (bounds.inclusiveLeft() ? '[' : '(') +
+ bounds.left.getToken().toString() + ", " +
+ bounds.right.getToken().toString() +
+ (bounds.inclusiveRight() ? ']' : ')');
+ }
+ else
+ {
+ Collection<ByteBuffer> keys = restrictions.getPartitionKeys(options);
+ if (keys.size() == 1)
+ {
+ return "token: " + table.partitioner.getToken(Iterables.getOnlyElement(keys)).toString();
+ }
+ else
+ {
+ StringBuilder sb = new StringBuilder("tokens: [");
+ boolean isFirst = true;
+ for (ByteBuffer key : keys)
+ {
+ if (!isFirst) sb.append(", ");
+ sb.append(table.partitioner.getToken(key).toString());
+ isFirst = false;
+ }
+ return sb.append(']').toString();
+ }
+ }
+ }
+
+ private String asCQL(QueryOptions options)
+ {
+ ColumnFilter columnFilter = selection.newSelectors(options).getColumnFilter();
+ StringBuilder sb = new StringBuilder();
+
+ sb.append("SELECT ").append(queriedColumns().toCQLString());
+ sb.append(" FROM ").append(table.keyspace).append('.').append(table.name);
+ if (restrictions.isKeyRange() || restrictions.usesSecondaryIndexing())
+ {
+ // partition range
+ ClusteringIndexFilter clusteringIndexFilter = makeClusteringIndexFilter(options, columnFilter);
+ if (clusteringIndexFilter == null)
+ return "EMPTY";
+
+ RowFilter rowFilter = getRowFilter(options);
+
+ // The LIMIT provided by the user is the number of CQL rows the user wants returned.
+ // We want to have getRangeSlice to count the number of columns, not the number of keys.
+ AbstractBounds<PartitionPosition> keyBounds = restrictions.getPartitionKeyBounds(options);
+ if (keyBounds == null)
+ return "EMPTY";
+
+ DataRange dataRange = new DataRange(keyBounds, clusteringIndexFilter);
+
+ if (!dataRange.isUnrestricted(table) || !rowFilter.isEmpty())
+ {
+ sb.append(" WHERE ");
+ // We put the row filter first because the data range can end by "ORDER BY"
+ if (!rowFilter.isEmpty())
+ {
+ sb.append(rowFilter);
+ if (!dataRange.isUnrestricted(table))
+ sb.append(" AND ");
+ }
+ if (!dataRange.isUnrestricted(table))
+ sb.append(dataRange.toCQLString(table, rowFilter));
+ }
+ }
+ else
+ {
+ // single partition
+ Collection<ByteBuffer> keys = restrictions.getPartitionKeys(options);
+ if (keys.isEmpty())
+ return "EMPTY";
+ ClusteringIndexFilter filter = makeClusteringIndexFilter(options, columnFilter);
+ if (filter == null)
+ return "EMPTY";
+
+ sb.append(" WHERE ");
+
+
+ boolean compoundPk = table.partitionKeyColumns().size() > 1;
+ if (compoundPk) sb.append('(');
+ sb.append(ColumnMetadata.toCQLString(table.partitionKeyColumns()));
+ if (compoundPk) sb.append(')');
+ if (keys.size() == 1)
+ {
+ sb.append(" = ");
+ if (compoundPk) sb.append('(');
+ DataRange.appendKeyString(sb, table.partitionKeyType, Iterables.getOnlyElement(keys));
+ if (compoundPk) sb.append(')');
+ }
+ else
+ {
+ sb.append(" IN (");
+ boolean first = true;
+ for (ByteBuffer key : keys)
+ {
+ if (!first)
+ sb.append(", ");
+
+ if (compoundPk) sb.append('(');
+ DataRange.appendKeyString(sb, table.partitionKeyType, key);
+ if (compoundPk) sb.append(')');
+ first = false;
+ }
+
+ sb.append(')');
+ }
+
+ RowFilter rowFilter = getRowFilter(options);
+ if (!rowFilter.isEmpty())
+ sb.append(" AND ").append(rowFilter);
+
+ String filterString = filter.toCQLString(table, rowFilter);
+ if (!filterString.isEmpty())
+ sb.append(" AND ").append(filterString);
+ }
+
+ DataLimits limits = getDataLimits(getLimit(options), getPerPartitionLimit(options), options.getPageSize());
+ if (limits != DataLimits.NONE)
+ sb.append(' ').append(limits);
+ return sb.toString();
+ }
}
diff --git a/src/java/org/apache/cassandra/db/DataRange.java b/src/java/org/apache/cassandra/db/DataRange.java
index b322912..52162be 100644
--- a/src/java/org/apache/cassandra/db/DataRange.java
+++ b/src/java/org/apache/cassandra/db/DataRange.java
@@ -312,7 +312,7 @@ public class DataRange
: (isInclusive ? "<=" : "<");
}
- private static void appendKeyString(StringBuilder sb, AbstractType<?> type, ByteBuffer key)
+ public static void appendKeyString(StringBuilder sb, AbstractType<?> type, ByteBuffer key)
{
if (type instanceof CompositeType)
{
diff --git a/src/java/org/apache/cassandra/db/MessageParams.java b/src/java/org/apache/cassandra/db/MessageParams.java
new file mode 100644
index 0000000..137d3a6
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/MessageParams.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.util.EnumMap;
+import java.util.Map;
+
+import io.netty.util.concurrent.FastThreadLocal;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.ParamType;
+
+public class MessageParams
+{
+ private static final FastThreadLocal<Map<ParamType, Object>> local = new FastThreadLocal<>();
+
+ private MessageParams()
+ {
+ }
+
+ private static Map<ParamType, Object> get()
+ {
+ Map<ParamType, Object> instance = local.get();
+ if (instance == null)
+ {
+ instance = new EnumMap<>(ParamType.class);
+ local.set(instance);
+ }
+
+ return instance;
+ }
+
+ public static void add(ParamType key, Object value)
+ {
+ get().put(key, value);
+ }
+
+ public static <T> T get(ParamType key)
+ {
+ return (T) get().get(key);
+ }
+
+ public static void remove(ParamType key)
+ {
+ get().remove(key);
+ }
+
+ public static void reset()
+ {
+ get().clear();
+ }
+
+ public static <T> Message<T> addToMessage(Message<T> message)
+ {
+ return message.withParams(get());
+ }
+}
diff --git a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
index 45cd308..917f0f0 100644
--- a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
+++ b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
@@ -55,17 +55,18 @@ public class PartitionRangeReadCommand extends ReadCommand implements PartitionR
private final DataRange dataRange;
private PartitionRangeReadCommand(boolean isDigest,
- int digestVersion,
- boolean acceptsTransient,
- TableMetadata metadata,
- int nowInSec,
- ColumnFilter columnFilter,
- RowFilter rowFilter,
- DataLimits limits,
- DataRange dataRange,
- IndexMetadata index)
+ int digestVersion,
+ boolean acceptsTransient,
+ TableMetadata metadata,
+ int nowInSec,
+ ColumnFilter columnFilter,
+ RowFilter rowFilter,
+ DataLimits limits,
+ DataRange dataRange,
+ IndexMetadata index,
+ boolean trackWarnings)
{
- super(Kind.PARTITION_RANGE, isDigest, digestVersion, acceptsTransient, metadata, nowInSec, columnFilter, rowFilter, limits, index);
+ super(Kind.PARTITION_RANGE, isDigest, digestVersion, acceptsTransient, metadata, nowInSec, columnFilter, rowFilter, limits, index, trackWarnings);
this.dataRange = dataRange;
}
@@ -85,7 +86,8 @@ public class PartitionRangeReadCommand extends ReadCommand implements PartitionR
rowFilter,
limits,
dataRange,
- findIndex(metadata, rowFilter));
+ findIndex(metadata, rowFilter),
+ false);
}
/**
@@ -107,7 +109,8 @@ public class PartitionRangeReadCommand extends ReadCommand implements PartitionR
RowFilter.NONE,
DataLimits.NONE,
DataRange.allData(metadata.partitioner),
- null);
+ null,
+ false);
}
public DataRange dataRange()
@@ -156,7 +159,8 @@ public class PartitionRangeReadCommand extends ReadCommand implements PartitionR
rowFilter(),
isRangeContinuation ? limits() : limits().withoutState(),
dataRange().forSubRange(range),
- indexMetadata());
+ indexMetadata(),
+ isTrackingWarnings());
}
public PartitionRangeReadCommand copy()
@@ -170,7 +174,8 @@ public class PartitionRangeReadCommand extends ReadCommand implements PartitionR
rowFilter(),
limits(),
dataRange(),
- indexMetadata());
+ indexMetadata(),
+ isTrackingWarnings());
}
@Override
@@ -185,7 +190,8 @@ public class PartitionRangeReadCommand extends ReadCommand implements PartitionR
rowFilter(),
limits(),
dataRange(),
- indexMetadata());
+ indexMetadata(),
+ isTrackingWarnings());
}
@Override
@@ -200,7 +206,8 @@ public class PartitionRangeReadCommand extends ReadCommand implements PartitionR
rowFilter(),
limits(),
dataRange(),
- indexMetadata());
+ indexMetadata(),
+ isTrackingWarnings());
}
@Override
@@ -215,7 +222,8 @@ public class PartitionRangeReadCommand extends ReadCommand implements PartitionR
rowFilter(),
newLimits,
dataRange(),
- indexMetadata());
+ indexMetadata(),
+ isTrackingWarnings());
}
@Override
@@ -230,7 +238,8 @@ public class PartitionRangeReadCommand extends ReadCommand implements PartitionR
rowFilter(),
newLimits,
newDataRange,
- indexMetadata());
+ indexMetadata(),
+ isTrackingWarnings());
}
public long getTimeout(TimeUnit unit)
@@ -363,6 +372,15 @@ public class PartitionRangeReadCommand extends ReadCommand implements PartitionR
sb.append(" WHERE ").append(filterString);
}
+ @Override
+ public String loggableTokens()
+ {
+ return "token range: " + (dataRange.keyRange.inclusiveLeft() ? '[' : '(') +
+ dataRange.keyRange.left.getToken().toString() + ", " +
+ dataRange.keyRange.right.getToken().toString() +
+ (dataRange.keyRange.inclusiveRight() ? ']' : ')');
+ }
+
/**
* Allow to post-process the result of the query after it has been reconciled on the coordinator
* but before it is passed to the CQL layer to return the ResultSet.
@@ -431,7 +449,7 @@ public class PartitionRangeReadCommand extends ReadCommand implements PartitionR
throws IOException
{
DataRange range = DataRange.serializer.deserialize(in, version, metadata);
- return new PartitionRangeReadCommand(isDigest, digestVersion, acceptsTransient, metadata, nowInSec, columnFilter, rowFilter, limits, range, index);
+ return new PartitionRangeReadCommand(isDigest, digestVersion, acceptsTransient, metadata, nowInSec, columnFilter, rowFilter, limits, range, index, false);
}
}
}
diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java
index 71bce0b..42206e1 100644
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@ -38,6 +38,7 @@ import org.apache.cassandra.config.*;
import org.apache.cassandra.cql3.ColumnIdentifier;
import org.apache.cassandra.db.filter.*;
import org.apache.cassandra.net.MessageFlag;
+import org.apache.cassandra.net.ParamType;
import org.apache.cassandra.net.Verb;
import org.apache.cassandra.db.partitions.*;
import org.apache.cassandra.db.rows.*;
@@ -99,6 +100,8 @@ public abstract class ReadCommand extends AbstractReadQuery
int oldestUnrepairedTombstone = Integer.MAX_VALUE;
+ private boolean trackWarnings;
+
@Nullable
private final IndexMetadata index;
@@ -139,7 +142,8 @@ public abstract class ReadCommand extends AbstractReadQuery
ColumnFilter columnFilter,
RowFilter rowFilter,
DataLimits limits,
- IndexMetadata index)
+ IndexMetadata index,
+ boolean trackWarnings)
{
super(metadata, nowInSec, columnFilter, rowFilter, limits);
if (acceptsTransient && isDigestQuery)
@@ -150,6 +154,7 @@ public abstract class ReadCommand extends AbstractReadQuery
this.digestVersion = digestVersion;
this.acceptsTransient = acceptsTransient;
this.index = index;
+ this.trackWarnings = trackWarnings;
}
protected abstract void serializeSelection(DataOutputPlus out, int version) throws IOException;
@@ -281,6 +286,17 @@ public abstract class ReadCommand extends AbstractReadQuery
return repairedDataInfo.isConclusive();
}
+ @Override
+ public void trackWarnings()
+ {
+ trackWarnings = true;
+ }
+
+ public boolean isTrackingWarnings()
+ {
+ return trackWarnings;
+ }
+
/**
* Index (metadata) chosen for this query. Can be null.
*
@@ -588,6 +604,11 @@ public abstract class ReadCommand extends AbstractReadQuery
String query = ReadCommand.this.toCQLString();
Tracing.trace("Scanned over {} tombstones for query {}; query aborted (see tombstone_failure_threshold)", failureThreshold, query);
metric.tombstoneFailures.inc();
+ if (trackWarnings)
+ {
+ MessageParams.remove(ParamType.TOMBSTONE_WARNING);
+ MessageParams.add(ParamType.TOMBSTONE_ABORT, tombstones);
+ }
throw new TombstoneOverwhelmingException(tombstones, query, ReadCommand.this.metadata(), currentKey, clustering);
}
}
@@ -606,7 +627,10 @@ public abstract class ReadCommand extends AbstractReadQuery
String msg = String.format(
"Read %d live rows and %d tombstone cells for query %1.512s; token %s (see tombstone_warn_threshold)",
liveRows, tombstones, ReadCommand.this.toCQLString(), currentKey.getToken());
- ClientWarn.instance.warn(msg);
+ if (trackWarnings)
+ MessageParams.add(ParamType.TOMBSTONE_WARNING, tombstones);
+ else
+ ClientWarn.instance.warn(msg);
if (tombstones < failureThreshold)
{
metric.tombstoneWarnings.inc();
@@ -686,9 +710,12 @@ public abstract class ReadCommand extends AbstractReadQuery
*/
public Message<ReadCommand> createMessage(boolean trackRepairedData)
{
- return trackRepairedData
- ? Message.outWithFlags(verb(), this, MessageFlag.CALL_BACK_ON_FAILURE, MessageFlag.TRACK_REPAIRED_DATA)
- : Message.outWithFlag (verb(), this, MessageFlag.CALL_BACK_ON_FAILURE);
+ Message<ReadCommand> msg = trackRepairedData
+ ? Message.outWithFlags(verb(), this, MessageFlag.CALL_BACK_ON_FAILURE, MessageFlag.TRACK_REPAIRED_DATA)
+ : Message.outWithFlag(verb(), this, MessageFlag.CALL_BACK_ON_FAILURE);
+ if (trackWarnings)
+ msg = msg.withFlag(MessageFlag.TRACK_WARNINGS);
+ return msg;
}
public abstract Verb verb();
@@ -746,6 +773,11 @@ public abstract class ReadCommand extends AbstractReadQuery
return sb.toString();
}
+ /**
+ * Return the queried token(s) for logging
+ */
+ public abstract String loggableTokens();
+
// Monitorable interface
public String name()
{
diff --git a/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java b/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java
index 2c28ed9..2260fde 100644
--- a/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java
@@ -48,10 +48,14 @@ public class ReadCommandVerbHandler implements IVerbHandler<ReadCommand>
ReadCommand command = message.payload;
validateTransientStatus(message);
+ MessageParams.reset();
long timeout = message.expiresAtNanos() - message.createdAtNanos();
command.setMonitoringTime(message.createdAtNanos(), message.isCrossNode(), timeout, DatabaseDescriptor.getSlowQueryTimeout(NANOSECONDS));
+ if (message.trackWarnings())
+ command.trackWarnings();
+
if (message.trackRepairedData())
command.trackRepairedStatus();
@@ -61,6 +65,21 @@ public class ReadCommandVerbHandler implements IVerbHandler<ReadCommand>
{
response = command.createResponse(iterator);
}
+ catch (RejectException e)
+ {
+ if (!command.isTrackingWarnings())
+ throw e;
+
+ // make sure to log as the exception is swallowed
+ logger.error(e.getMessage());
+
+ response = command.createResponse(EmptyIterators.unfilteredPartition(command.metadata()));
+ Message<ReadResponse> reply = message.responseWith(response);
+ reply = MessageParams.addToMessage(reply);
+
+ MessagingService.instance().send(reply, message.from());
+ return;
+ }
if (!command.complete())
{
@@ -71,6 +90,7 @@ public class ReadCommandVerbHandler implements IVerbHandler<ReadCommand>
Tracing.trace("Enqueuing response to {}", message.from());
Message<ReadResponse> reply = message.responseWith(response);
+ reply = MessageParams.addToMessage(reply);
MessagingService.instance().send(reply, message.from());
}
diff --git a/src/java/org/apache/cassandra/db/ReadQuery.java b/src/java/org/apache/cassandra/db/ReadQuery.java
index bd20c26..f9695d6 100644
--- a/src/java/org/apache/cassandra/db/ReadQuery.java
+++ b/src/java/org/apache/cassandra/db/ReadQuery.java
@@ -254,4 +254,8 @@ public interface ReadQuery
default void maybeValidateIndex()
{
}
+
+ default void trackWarnings()
+ {
+ }
}
diff --git a/src/java/org/apache/cassandra/exceptions/ReadFailureException.java b/src/java/org/apache/cassandra/db/RejectException.java
similarity index 55%
copy from src/java/org/apache/cassandra/exceptions/ReadFailureException.java
copy to src/java/org/apache/cassandra/db/RejectException.java
index 744cad4..a879b76 100644
--- a/src/java/org/apache/cassandra/exceptions/ReadFailureException.java
+++ b/src/java/org/apache/cassandra/db/RejectException.java
@@ -15,22 +15,30 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.cassandra.exceptions;
+package org.apache.cassandra.db;
-import java.util.Map;
-
-import com.google.common.collect.ImmutableMap;
+/**
+ * Represents a request to reject the current operation
+ */
+public abstract class RejectException extends RuntimeException
+{
+ public RejectException(String message)
+ {
+ super(message);
+ }
-import org.apache.cassandra.db.ConsistencyLevel;
-import org.apache.cassandra.locator.InetAddressAndPort;
+ public RejectException(String message, Throwable cause)
+ {
+ super(message, cause);
+ }
-public class ReadFailureException extends RequestFailureException
-{
- public final boolean dataPresent;
+ public RejectException(Throwable cause)
+ {
+ super(cause);
+ }
- public ReadFailureException(ConsistencyLevel consistency, int received, int blockFor, boolean dataPresent, Map<InetAddressAndPort, RequestFailureReason> failureReasonByEndpoint)
+ public RejectException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace)
{
- super(ExceptionCode.READ_FAILURE, consistency, received, blockFor, ImmutableMap.copyOf(failureReasonByEndpoint));
- this.dataPresent = dataPresent;
+ super(message, cause, enableSuppression, writableStackTrace);
}
}
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
index 026a795..b7fcf31 100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
@@ -71,9 +71,10 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar
DataLimits limits,
DecoratedKey partitionKey,
ClusteringIndexFilter clusteringIndexFilter,
- IndexMetadata index)
+ IndexMetadata index,
+ boolean trackWarnings)
{
- super(Kind.SINGLE_PARTITION, isDigest, digestVersion, acceptsTransient, metadata, nowInSec, columnFilter, rowFilter, limits, index);
+ super(Kind.SINGLE_PARTITION, isDigest, digestVersion, acceptsTransient, metadata, nowInSec, columnFilter, rowFilter, limits, index, trackWarnings);
assert partitionKey.getPartitioner() == metadata.partitioner;
this.partitionKey = partitionKey;
this.clusteringIndexFilter = clusteringIndexFilter;
@@ -112,7 +113,8 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar
limits,
partitionKey,
clusteringIndexFilter,
- indexMetadata);
+ indexMetadata,
+ false);
}
/**
@@ -288,7 +290,8 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar
limits(),
partitionKey(),
clusteringIndexFilter(),
- indexMetadata());
+ indexMetadata(),
+ isTrackingWarnings());
}
@Override
@@ -304,7 +307,8 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar
limits(),
partitionKey(),
clusteringIndexFilter(),
- indexMetadata());
+ indexMetadata(),
+ isTrackingWarnings());
}
@Override
@@ -320,7 +324,8 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar
limits(),
partitionKey(),
clusteringIndexFilter(),
- indexMetadata());
+ indexMetadata(),
+ isTrackingWarnings());
}
@Override
@@ -336,7 +341,8 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar
newLimits,
partitionKey(),
clusteringIndexFilter(),
- indexMetadata());
+ indexMetadata(),
+ isTrackingWarnings());
}
@Override
@@ -371,13 +377,16 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar
{
// We shouldn't have set digest yet when reaching that point
assert !isDigestQuery();
- return create(metadata(),
- nowInSec(),
- columnFilter(),
- rowFilter(),
- limits,
- partitionKey(),
- lastReturned == null ? clusteringIndexFilter() : clusteringIndexFilter.forPaging(metadata().comparator, lastReturned, false));
+ SinglePartitionReadCommand cmd = create(metadata(),
+ nowInSec(),
+ columnFilter(),
+ rowFilter(),
+ limits,
+ partitionKey(),
+ lastReturned == null ? clusteringIndexFilter() : clusteringIndexFilter.forPaging(metadata().comparator, lastReturned, false));
+ if (isTrackingWarnings())
+ cmd.trackWarnings();
+ return cmd;
}
public PartitionIterator execute(ConsistencyLevel consistency, ClientState clientState, long queryStartNanoTime) throws RequestExecutionException
@@ -1068,6 +1077,12 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar
}
}
+ @Override
+ public String loggableTokens()
+ {
+ return "token=" + partitionKey.getToken().toString();
+ }
+
protected void serializeSelection(DataOutputPlus out, int version) throws IOException
{
metadata().partitionKeyType.writeValue(partitionKey().getKey(), out);
@@ -1151,7 +1166,7 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar
{
DecoratedKey key = metadata.partitioner.decorateKey(metadata.partitionKeyType.readBuffer(in, DatabaseDescriptor.getMaxValueSize()));
ClusteringIndexFilter filter = ClusteringIndexFilter.serializer.deserialize(in, version, metadata);
- return new SinglePartitionReadCommand(isDigest, digestVersion, acceptsTransient, metadata, nowInSec, columnFilter, rowFilter, limits, key, filter, index);
+ return new SinglePartitionReadCommand(isDigest, digestVersion, acceptsTransient, metadata, nowInSec, columnFilter, rowFilter, limits, key, filter, index, false);
}
}
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadQuery.java b/src/java/org/apache/cassandra/db/SinglePartitionReadQuery.java
index 755d552..5d344de 100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionReadQuery.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadQuery.java
@@ -282,6 +282,12 @@ public interface SinglePartitionReadQuery extends ReadQuery
}
@Override
+ public void trackWarnings()
+ {
+ queries.forEach(ReadQuery::trackWarnings);
+ }
+
+ @Override
public String toString()
{
return queries.toString();
diff --git a/src/java/org/apache/cassandra/db/filter/TombstoneOverwhelmingException.java b/src/java/org/apache/cassandra/db/filter/TombstoneOverwhelmingException.java
index 28d49ae..efca3ac 100644
--- a/src/java/org/apache/cassandra/db/filter/TombstoneOverwhelmingException.java
+++ b/src/java/org/apache/cassandra/db/filter/TombstoneOverwhelmingException.java
@@ -24,7 +24,7 @@ import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.marshal.*;
-public class TombstoneOverwhelmingException extends RuntimeException
+public class TombstoneOverwhelmingException extends RejectException
{
public TombstoneOverwhelmingException(int numTombstones, String query, TableMetadata metadata, DecoratedKey lastPartitionKey, ClusteringPrefix<?> lastClustering)
{
diff --git a/src/java/org/apache/cassandra/exceptions/ReadFailureException.java b/src/java/org/apache/cassandra/exceptions/ReadAbortException.java
similarity index 66%
copy from src/java/org/apache/cassandra/exceptions/ReadFailureException.java
copy to src/java/org/apache/cassandra/exceptions/ReadAbortException.java
index 744cad4..0075858 100644
--- a/src/java/org/apache/cassandra/exceptions/ReadFailureException.java
+++ b/src/java/org/apache/cassandra/exceptions/ReadAbortException.java
@@ -15,22 +15,21 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.cassandra.exceptions;
import java.util.Map;
-import com.google.common.collect.ImmutableMap;
-
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.locator.InetAddressAndPort;
-public class ReadFailureException extends RequestFailureException
+/**
+ * Special Read Failure which is caused by user query; implies a user request is not allowed and not that Cassandra had an issue.
+ */
+public abstract class ReadAbortException extends ReadFailureException
{
- public final boolean dataPresent;
-
- public ReadFailureException(ConsistencyLevel consistency, int received, int blockFor, boolean dataPresent, Map<InetAddressAndPort, RequestFailureReason> failureReasonByEndpoint)
+ protected ReadAbortException(String msg, ConsistencyLevel consistency, int received, int blockFor, boolean dataPresent, Map<InetAddressAndPort, RequestFailureReason> failureReasonByEndpoint)
{
- super(ExceptionCode.READ_FAILURE, consistency, received, blockFor, ImmutableMap.copyOf(failureReasonByEndpoint));
- this.dataPresent = dataPresent;
+ super(msg, consistency, received, blockFor, dataPresent, failureReasonByEndpoint);
}
}
diff --git a/src/java/org/apache/cassandra/exceptions/ReadFailureException.java b/src/java/org/apache/cassandra/exceptions/ReadFailureException.java
index 744cad4..698c8ae 100644
--- a/src/java/org/apache/cassandra/exceptions/ReadFailureException.java
+++ b/src/java/org/apache/cassandra/exceptions/ReadFailureException.java
@@ -33,4 +33,10 @@ public class ReadFailureException extends RequestFailureException
super(ExceptionCode.READ_FAILURE, consistency, received, blockFor, ImmutableMap.copyOf(failureReasonByEndpoint));
this.dataPresent = dataPresent;
}
+
+ protected ReadFailureException(String msg, ConsistencyLevel consistency, int received, int blockFor, boolean dataPresent, Map<InetAddressAndPort, RequestFailureReason> failureReasonByEndpoint)
+ {
+ super(ExceptionCode.READ_FAILURE, msg, consistency, received, blockFor, failureReasonByEndpoint);
+ this.dataPresent = dataPresent;
+ }
}
diff --git a/src/java/org/apache/cassandra/exceptions/ReadFailureException.java b/src/java/org/apache/cassandra/exceptions/ReadSizeAbortException.java
similarity index 66%
copy from src/java/org/apache/cassandra/exceptions/ReadFailureException.java
copy to src/java/org/apache/cassandra/exceptions/ReadSizeAbortException.java
index 744cad4..ed81008 100644
--- a/src/java/org/apache/cassandra/exceptions/ReadFailureException.java
+++ b/src/java/org/apache/cassandra/exceptions/ReadSizeAbortException.java
@@ -15,22 +15,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.cassandra.exceptions;
import java.util.Map;
-import com.google.common.collect.ImmutableMap;
-
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.locator.InetAddressAndPort;
-public class ReadFailureException extends RequestFailureException
+public class ReadSizeAbortException extends ReadAbortException
{
- public final boolean dataPresent;
-
- public ReadFailureException(ConsistencyLevel consistency, int received, int blockFor, boolean dataPresent, Map<InetAddressAndPort, RequestFailureReason> failureReasonByEndpoint)
+ public ReadSizeAbortException(String msg, ConsistencyLevel consistency, int received, int blockFor, boolean dataPresent, Map<InetAddressAndPort, RequestFailureReason> failureReasonByEndpoint)
{
- super(ExceptionCode.READ_FAILURE, consistency, received, blockFor, ImmutableMap.copyOf(failureReasonByEndpoint));
- this.dataPresent = dataPresent;
+ super(msg, consistency, received, blockFor, dataPresent, failureReasonByEndpoint);
}
}
diff --git a/src/java/org/apache/cassandra/exceptions/RequestFailureException.java b/src/java/org/apache/cassandra/exceptions/RequestFailureException.java
index 56cee1a..c75ff9c 100644
--- a/src/java/org/apache/cassandra/exceptions/RequestFailureException.java
+++ b/src/java/org/apache/cassandra/exceptions/RequestFailureException.java
@@ -32,7 +32,12 @@ public class RequestFailureException extends RequestExecutionException
protected RequestFailureException(ExceptionCode code, ConsistencyLevel consistency, int received, int blockFor, Map<InetAddressAndPort, RequestFailureReason> failureReasonByEndpoint)
{
- super(code, buildErrorMessage(received, failureReasonByEndpoint));
+ this(code, buildErrorMessage(received, failureReasonByEndpoint), consistency, received, blockFor, failureReasonByEndpoint);
+ }
+
+ protected RequestFailureException(ExceptionCode code, String msg, ConsistencyLevel consistency, int received, int blockFor, Map<InetAddressAndPort, RequestFailureReason> failureReasonByEndpoint)
+ {
+ super(code, buildErrorMessage(msg, failureReasonByEndpoint));
this.consistency = consistency;
this.received = received;
this.blockFor = blockFor;
@@ -41,10 +46,7 @@ public class RequestFailureException extends RequestExecutionException
private static String buildErrorMessage(int received, Map<InetAddressAndPort, RequestFailureReason> failures)
{
- return String.format("Operation failed - received %d responses and %d failures: %s",
- received,
- failures.size(),
- buildFailureString(failures));
+ return String.format("received %d responses and %d failures", received, failures.size());
}
private static String buildFailureString(Map<InetAddressAndPort, RequestFailureReason> failures)
@@ -53,4 +55,13 @@ public class RequestFailureException extends RequestExecutionException
.map(e -> String.format("%s from %s", e.getValue(), e.getKey()))
.collect(Collectors.joining(", "));
}
+
+ private static String buildErrorMessage(CharSequence msg, Map<InetAddressAndPort, RequestFailureReason> failures)
+ {
+ StringBuilder sb = new StringBuilder("Operation failed - ");
+ sb.append(msg);
+ if (failures != null && !failures.isEmpty())
+ sb.append(": ").append(buildFailureString(failures));
+ return sb.toString();
+ }
}
diff --git a/src/java/org/apache/cassandra/exceptions/RequestFailureReason.java b/src/java/org/apache/cassandra/exceptions/RequestFailureReason.java
index 1cdbdb5..3f6c2d4 100644
--- a/src/java/org/apache/cassandra/exceptions/RequestFailureReason.java
+++ b/src/java/org/apache/cassandra/exceptions/RequestFailureReason.java
@@ -35,8 +35,9 @@ public enum RequestFailureReason
UNKNOWN (0),
READ_TOO_MANY_TOMBSTONES (1),
TIMEOUT (2),
- INCOMPATIBLE_SCHEMA (3);
-
+ INCOMPATIBLE_SCHEMA (3),
+ READ_TOO_LARGE (4);
+
public static final Serializer serializer = new Serializer();
public final int code;
diff --git a/src/java/org/apache/cassandra/exceptions/ReadFailureException.java b/src/java/org/apache/cassandra/exceptions/TombstoneAbortException.java
similarity index 61%
copy from src/java/org/apache/cassandra/exceptions/ReadFailureException.java
copy to src/java/org/apache/cassandra/exceptions/TombstoneAbortException.java
index 744cad4..e86e760 100644
--- a/src/java/org/apache/cassandra/exceptions/ReadFailureException.java
+++ b/src/java/org/apache/cassandra/exceptions/TombstoneAbortException.java
@@ -15,22 +15,25 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.cassandra.exceptions;
import java.util.Map;
-import com.google.common.collect.ImmutableMap;
-
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.locator.InetAddressAndPort;
-public class ReadFailureException extends RequestFailureException
+import static org.apache.cassandra.service.reads.ReadCallback.tombstoneAbortMessage;
+
+public class TombstoneAbortException extends ReadAbortException
{
- public final boolean dataPresent;
+ public final int nodes;
+ public final int tombstones;
- public ReadFailureException(ConsistencyLevel consistency, int received, int blockFor, boolean dataPresent, Map<InetAddressAndPort, RequestFailureReason> failureReasonByEndpoint)
+ public TombstoneAbortException(int nodes, int tombstones, String cql, boolean dataPresent, ConsistencyLevel consistency, int received, int blockFor, Map<InetAddressAndPort, RequestFailureReason> failureReasonByEndpoint)
{
- super(ExceptionCode.READ_FAILURE, consistency, received, blockFor, ImmutableMap.copyOf(failureReasonByEndpoint));
- this.dataPresent = dataPresent;
+ super(tombstoneAbortMessage(nodes, tombstones, cql), consistency, received, blockFor, dataPresent, failureReasonByEndpoint);
+ this.nodes = nodes;
+ this.tombstones = tombstones;
}
}
diff --git a/src/java/org/apache/cassandra/metrics/ClientRequestMetrics.java b/src/java/org/apache/cassandra/metrics/ClientRequestMetrics.java
index e3a6970..19bc6d6 100644
--- a/src/java/org/apache/cassandra/metrics/ClientRequestMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/ClientRequestMetrics.java
@@ -22,6 +22,9 @@ package org.apache.cassandra.metrics;
import com.codahale.metrics.Meter;
+import org.apache.cassandra.exceptions.ReadAbortException;
+import org.apache.cassandra.exceptions.ReadSizeAbortException;
+import org.apache.cassandra.exceptions.TombstoneAbortException;
import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;
@@ -31,6 +34,9 @@ public class ClientRequestMetrics extends LatencyMetrics
public final Meter timeouts;
public final Meter unavailables;
public final Meter failures;
+ public final Meter aborts;
+ public final Meter tombstoneAborts;
+ public final Meter readSizeAborts;
public ClientRequestMetrics(String scope)
{
@@ -39,6 +45,24 @@ public class ClientRequestMetrics extends LatencyMetrics
timeouts = Metrics.meter(factory.createMetricName("Timeouts"));
unavailables = Metrics.meter(factory.createMetricName("Unavailables"));
failures = Metrics.meter(factory.createMetricName("Failures"));
+ aborts = Metrics.meter(factory.createMetricName("Aborts"));
+ tombstoneAborts = Metrics.meter(factory.createMetricName("TombstoneAborts"));
+ readSizeAborts = Metrics.meter(factory.createMetricName("ReadSizeAborts"));
+ }
+
+ public void markAbort(Throwable cause)
+ {
+ aborts.mark();
+ if (!(cause instanceof ReadAbortException))
+ return;
+ if (cause instanceof TombstoneAbortException)
+ {
+ tombstoneAborts.mark();
+ }
+ else if (cause instanceof ReadSizeAbortException)
+ {
+ readSizeAborts.mark();
+ }
}
public void release()
@@ -47,5 +71,8 @@ public class ClientRequestMetrics extends LatencyMetrics
Metrics.remove(factory.createMetricName("Timeouts"));
Metrics.remove(factory.createMetricName("Unavailables"));
Metrics.remove(factory.createMetricName("Failures"));
+ Metrics.remove(factory.createMetricName("Aborts"));
+ Metrics.remove(factory.createMetricName("TombstoneAborts"));
+ Metrics.remove(factory.createMetricName("ReadSizeAborts"));
}
}
diff --git a/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java b/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
index dadbe47..d0607af 100644
--- a/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
@@ -153,6 +153,12 @@ public class KeyspaceMetrics
public final Histogram repairedDataTrackingOverreadRows;
public final Timer repairedDataTrackingOverreadTime;
+ public final Meter clientTombstoneWarnings;
+ public final Meter clientTombstoneAborts;
+
+ public final Meter clientReadSizeWarnings;
+ public final Meter clientReadSizeAborts;
+
public final MetricNameFactory factory;
private Keyspace keyspace;
@@ -235,6 +241,12 @@ public class KeyspaceMetrics
repairedDataTrackingOverreadRows = createKeyspaceHistogram("RepairedDataTrackingOverreadRows", false);
repairedDataTrackingOverreadTime = createKeyspaceTimer("RepairedDataTrackingOverreadTime");
+
+ clientTombstoneWarnings = createKeyspaceMeter("ClientTombstoneWarnings");
+ clientTombstoneAborts = createKeyspaceMeter("ClientTombstoneAborts");
+
+ clientReadSizeWarnings = createKeyspaceMeter("ClientReadSizeWarnings");
+ clientReadSizeAborts = createKeyspaceMeter("ClientReadSizeAborts");
}
/**
diff --git a/src/java/org/apache/cassandra/metrics/TableMetrics.java b/src/java/org/apache/cassandra/metrics/TableMetrics.java
index 09f41a1..ced0622 100644
--- a/src/java/org/apache/cassandra/metrics/TableMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/TableMetrics.java
@@ -259,6 +259,11 @@ public class TableMetrics
/** When sampler activated, will track the slowest local reads **/
public final Sampler<String> topLocalReadQueryTime;
+ public final TableMeter clientTombstoneWarnings;
+ public final TableMeter clientTombstoneAborts;
+ public final TableMeter clientReadSizeWarnings;
+ public final TableMeter clientReadSizeAborts;
+
private static Pair<Long, Long> totalNonSystemTablesSize(Predicate<SSTableReader> predicate)
{
long total = 0;
@@ -913,6 +918,12 @@ public class TableMetrics
}
return cnt;
});
+
+ clientTombstoneWarnings = createTableMeter("ClientTombstoneWarnings", cfs.keyspace.metric.clientTombstoneWarnings);
+ clientTombstoneAborts = createTableMeter("ClientTombstoneAborts", cfs.keyspace.metric.clientTombstoneAborts);
+
+ clientReadSizeWarnings = createTableMeter("ClientReadSizeWarnings", cfs.keyspace.metric.clientReadSizeWarnings);
+ clientReadSizeAborts = createTableMeter("ClientReadSizeAborts", cfs.keyspace.metric.clientReadSizeAborts);
}
public void updateSSTableIterated(int count)
diff --git a/src/java/org/apache/cassandra/net/Message.java b/src/java/org/apache/cassandra/net/Message.java
index ca74012..19e2231 100644
--- a/src/java/org/apache/cassandra/net/Message.java
+++ b/src/java/org/apache/cassandra/net/Message.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.net;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.Collections;
import java.util.EnumMap;
import java.util.Map;
import java.util.UUID;
@@ -136,6 +137,11 @@ public class Message<T>
return header.callBackOnFailure();
}
+ public boolean trackWarnings()
+ {
+ return header.trackWarnings();
+ }
+
/** See CASSANDRA-14145 */
public boolean trackRepairedData()
{
@@ -267,6 +273,23 @@ public class Message<T>
return new Message<>(header.withParam(ParamType.FORWARD_TO, peers), payload);
}
+ public Message<T> withFlag(MessageFlag flag)
+ {
+ return new Message<>(header.withFlag(flag), payload);
+ }
+
+ public Message<T> withParam(ParamType type, Object value)
+ {
+ return new Message<>(header.withParam(type, value), payload);
+ }
+
+ public Message<T> withParams(Map<ParamType, Object> values)
+ {
+ if (values == null || values.isEmpty())
+ return this;
+ return new Message<>(header.withParams(values), payload);
+ }
+
private static final EnumMap<ParamType, Object> NO_PARAMS = new EnumMap<>(ParamType.class);
private static Map<ParamType, Object> buildParams(ParamType type, Object value)
@@ -295,6 +318,16 @@ public class Message<T>
return params;
}
+ private static Map<ParamType, Object> addParams(Map<ParamType, Object> params, Map<ParamType, Object> values)
+ {
+ if (values == null || values.isEmpty())
+ return params;
+
+ params = new EnumMap<>(params);
+ params.putAll(values);
+ return params;
+ }
+
/*
* id generation
*/
@@ -383,6 +416,11 @@ public class Message<T>
return new Header(id, verb, from, createdAtNanos, expiresAtNanos, flags, addParam(params, type, value));
}
+ Header withParams(Map<ParamType, Object> values)
+ {
+ return new Header(id, verb, from, createdAtNanos, expiresAtNanos, flags, addParams(params, values));
+ }
+
boolean callBackOnFailure()
{
return MessageFlag.CALL_BACK_ON_FAILURE.isIn(flags);
@@ -393,6 +431,11 @@ public class Message<T>
return MessageFlag.TRACK_REPAIRED_DATA.isIn(flags);
}
+ boolean trackWarnings()
+ {
+ return MessageFlag.TRACK_WARNINGS.isIn(flags);
+ }
+
@Nullable
ForwardingInfo forwardTo()
{
@@ -416,6 +459,11 @@ public class Message<T>
{
return (TraceType) params.getOrDefault(ParamType.TRACE_TYPE, TraceType.QUERY);
}
+
+ public Map<ParamType, Object> params()
+ {
+ return Collections.unmodifiableMap(params);
+ }
}
@SuppressWarnings("WeakerAccess")
diff --git a/src/java/org/apache/cassandra/net/MessageFlag.java b/src/java/org/apache/cassandra/net/MessageFlag.java
index c74784d..441b06b 100644
--- a/src/java/org/apache/cassandra/net/MessageFlag.java
+++ b/src/java/org/apache/cassandra/net/MessageFlag.java
@@ -27,7 +27,9 @@ public enum MessageFlag
/** a failure response should be sent back in case of failure */
CALL_BACK_ON_FAILURE (0),
/** track repaired data - see CASSANDRA-14145 */
- TRACK_REPAIRED_DATA (1);
+ TRACK_REPAIRED_DATA (1),
+ /** allow creating warnings or aborting queries based off query - see CASSANDRA-16850 */
+ TRACK_WARNINGS(2);
private final int id;
diff --git a/src/java/org/apache/cassandra/net/ParamType.java b/src/java/org/apache/cassandra/net/ParamType.java
index 5d4b982..d8b8c0e 100644
--- a/src/java/org/apache/cassandra/net/ParamType.java
+++ b/src/java/org/apache/cassandra/net/ParamType.java
@@ -19,12 +19,12 @@ package org.apache.cassandra.net;
import java.util.HashMap;
import java.util.Map;
-
import javax.annotation.Nullable;
import org.apache.cassandra.exceptions.RequestFailureReason;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.utils.Int32Serializer;
import org.apache.cassandra.utils.UUIDSerializer;
import static java.lang.Math.max;
@@ -54,7 +54,10 @@ public enum ParamType
TRACE_TYPE (6, "TraceType", Tracing.traceTypeSerializer),
@Deprecated
- TRACK_REPAIRED_DATA (7, "TrackRepaired", LegacyFlag.serializer);
+ TRACK_REPAIRED_DATA (7, "TrackRepaired", LegacyFlag.serializer),
+
+ TOMBSTONE_ABORT(8, "TSA", Int32Serializer.serializer),
+ TOMBSTONE_WARNING(9, "TSW", Int32Serializer.serializer);
final int id;
@Deprecated final String legacyAlias; // pre-4.0 we used to serialize entire param name string
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 2f6ad38..1387ce3 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -53,11 +53,14 @@ import org.apache.cassandra.batchlog.Batch;
import org.apache.cassandra.batchlog.BatchlogManager;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.RejectException;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.CounterMutation;
import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.EmptyIterators;
import org.apache.cassandra.db.IMutation;
import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.MessageParams;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.PartitionRangeReadCommand;
import org.apache.cassandra.db.ReadCommand;
@@ -75,6 +78,7 @@ import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
import org.apache.cassandra.db.rows.RowIterator;
import org.apache.cassandra.db.view.ViewUtils;
import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.exceptions.ReadAbortException;
import org.apache.cassandra.exceptions.CasWriteTimeoutException;
import org.apache.cassandra.exceptions.CasWriteUnknownResultException;
import org.apache.cassandra.exceptions.InvalidRequestException;
@@ -357,6 +361,12 @@ public class StorageProxy implements StorageProxyMBean
writeMetricsMap.get(consistencyForPaxos).timeouts.mark();
throw e;
}
+ catch (ReadAbortException e)
+ {
+ casWriteMetrics.markAbort(e);
+ writeMetricsMap.get(consistencyForPaxos).markAbort(e);
+ throw e;
+ }
catch (WriteFailureException | ReadFailureException e)
{
casWriteMetrics.failures.mark();
@@ -1798,6 +1808,13 @@ public class StorageProxy implements StorageProxyMBean
readMetricsMap.get(consistencyLevel).timeouts.mark();
throw e;
}
+ catch (ReadAbortException e)
+ {
+ readMetrics.markAbort(e);
+ casReadMetrics.markAbort(e);
+ readMetricsMap.get(consistencyLevel).markAbort(e);
+ throw e;
+ }
catch (ReadFailureException e)
{
readMetrics.failures.mark();
@@ -1846,6 +1863,11 @@ public class StorageProxy implements StorageProxyMBean
readMetricsMap.get(consistencyLevel).timeouts.mark();
throw e;
}
+ catch (ReadAbortException e)
+ {
+ recordReadRegularAbort(consistencyLevel, e);
+ throw e;
+ }
catch (ReadFailureException e)
{
readMetrics.failures.mark();
@@ -1863,6 +1885,12 @@ public class StorageProxy implements StorageProxyMBean
}
}
+ public static void recordReadRegularAbort(ConsistencyLevel consistencyLevel, Throwable cause)
+ {
+ readMetrics.markAbort(cause);
+ readMetricsMap.get(consistencyLevel).markAbort(cause);
+ }
+
public static PartitionIterator concatAndBlockOnRepair(List<PartitionIterator> iterators, List<ReadRepair<?, ?>> repairs)
{
PartitionIterator concatenated = PartitionIterators.concat(iterators);
@@ -1980,6 +2008,9 @@ public class StorageProxy implements StorageProxyMBean
{
try
{
+ MessageParams.reset();
+
+ boolean readRejected = false;
command.setMonitoringTime(approxCreationTimeNanos, false, verb.expiresAfterNanos(), DatabaseDescriptor.getSlowQueryTimeout(NANOSECONDS));
ReadResponse response;
@@ -1988,6 +2019,13 @@ public class StorageProxy implements StorageProxyMBean
{
response = command.createResponse(iterator);
}
+ catch (RejectException e)
+ {
+ if (!command.isTrackingWarnings())
+ throw e;
+ response = command.createResponse(EmptyIterators.unfilteredPartition(command.metadata()));
+ readRejected = true;
+ }
if (command.complete())
{
@@ -1999,7 +2037,8 @@ public class StorageProxy implements StorageProxyMBean
handler.onFailure(FBUtilities.getBroadcastAddressAndPort(), RequestFailureReason.UNKNOWN);
}
- MessagingService.instance().latencySubscribers.add(FBUtilities.getBroadcastAddressAndPort(), MonotonicClock.approxTime.now() - approxCreationTimeNanos, NANOSECONDS);
+ if (!readRejected)
+ MessagingService.instance().latencySubscribers.add(FBUtilities.getBroadcastAddressAndPort(), MonotonicClock.approxTime.now() - approxCreationTimeNanos, NANOSECONDS);
}
catch (Throwable t)
{
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index c226d37..29b52b1 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -6059,4 +6059,43 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
public void addSnapshot(TableSnapshot snapshot) {
snapshotManager.addSnapshot(snapshot);
}
+
+ @Override
+ public long getClientLargeReadWarnThresholdKB()
+ {
+ return DatabaseDescriptor.getClientLargeReadWarnThresholdKB();
+ }
+
+ @Override
+ public void setClientLargeReadWarnThresholdKB(long threshold)
+ {
+ DatabaseDescriptor.setClientLargeReadWarnThresholdKB(threshold);
+ logger.info("updated client_large_read_warn_threshold_kb to {}", threshold);
+ }
+
+ @Override
+ public long getClientLargeReadAbortThresholdKB()
+ {
+ return DatabaseDescriptor.getClientLargeReadAbortThresholdKB();
+ }
+
+ @Override
+ public void setClientLargeReadAbortThresholdKB(long threshold)
+ {
+ DatabaseDescriptor.setClientLargeReadAbortThresholdKB(threshold);
+ logger.info("updated client_large_read_abort_threshold_kb to {}", threshold);
+ }
+
+ @Override
+ public boolean getClientTrackWarningsEnabled()
+ {
+ return DatabaseDescriptor.getClientTrackWarningsEnabled();
+ }
+
+ @Override
+ public void setClientTrackWarningsEnabled(boolean value)
+ {
+ DatabaseDescriptor.setClientTrackWarningsEnabled(value);
+ logger.info("updated client_track_warnings_enabled to {}", value);
+ }
}
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index 3154c82..4a27f84 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -884,4 +884,11 @@ public interface StorageServiceMBean extends NotificationEmitter
public void setCompactionTombstoneWarningThreshold(int count);
public int getCompactionTombstoneWarningThreshold();
+
+ public long getClientLargeReadWarnThresholdKB();
+ public void setClientLargeReadWarnThresholdKB(long threshold);
+ public long getClientLargeReadAbortThresholdKB();
+ public void setClientLargeReadAbortThresholdKB(long threshold);
+ public boolean getClientTrackWarningsEnabled();
+ public void setClientTrackWarningsEnabled(boolean value);
}
diff --git a/src/java/org/apache/cassandra/service/reads/ReadCallback.java b/src/java/org/apache/cassandra/service/reads/ReadCallback.java
index 91d9370..6703147 100644
--- a/src/java/org/apache/cassandra/service/reads/ReadCallback.java
+++ b/src/java/org/apache/cassandra/service/reads/ReadCallback.java
@@ -20,9 +20,16 @@ package org.apache.cassandra.service.reads;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+
+import com.google.common.annotations.VisibleForTesting;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.MessageParams;
+import org.apache.cassandra.exceptions.TombstoneAbortException;
import org.apache.cassandra.locator.ReplicaPlan;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,9 +42,12 @@ import org.apache.cassandra.exceptions.ReadTimeoutException;
import org.apache.cassandra.exceptions.RequestFailureReason;
import org.apache.cassandra.locator.Endpoints;
import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.net.ParamType;
import org.apache.cassandra.net.RequestCallback;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.Verb;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.service.ClientWarn;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.concurrent.SimpleCondition;
@@ -46,6 +56,31 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS;
public class ReadCallback<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>> implements RequestCallback<ReadResponse>
{
protected static final Logger logger = LoggerFactory.getLogger( ReadCallback.class );
+ private class WarningCounter
+ {
+ // the highest number of tombstones reported by a node's warning
+ final AtomicInteger tombstoneWarnings = new AtomicInteger();
+ final AtomicInteger maxTombstoneWarningCount = new AtomicInteger();
+ // the highest number of tombstones reported by a node's rejection. This should be the same as
+ // our configured limit, but it is included to aid in diagnosing misconfigurations
+ final AtomicInteger tombstoneAborts = new AtomicInteger();
+ final AtomicInteger maxTombstoneAbortsCount = new AtomicInteger();
+
+ // TODO: take message as arg and return boolean for 'had warning' etc
+ void addTombstoneWarning(InetAddressAndPort from, int tombstones)
+ {
+ if (!waitingFor(from)) return;
+ tombstoneWarnings.incrementAndGet();
+ maxTombstoneWarningCount.accumulateAndGet(tombstones, Math::max);
+ }
+
+ void addTombstoneAbort(InetAddressAndPort from, int tombstones)
+ {
+ if (!waitingFor(from)) return;
+ tombstoneAborts.incrementAndGet();
+ maxTombstoneAbortsCount.accumulateAndGet(tombstones, Math::max);
+ }
+ }
public final ResponseResolver<E, P> resolver;
final SimpleCondition condition = new SimpleCondition();
@@ -59,6 +94,9 @@ public class ReadCallback<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<
= AtomicIntegerFieldUpdater.newUpdater(ReadCallback.class, "failures");
private volatile int failures = 0;
private final Map<InetAddressAndPort, RequestFailureReason> failureReasonByEndpoint;
+ private volatile WarningCounter warningCounter;
+ private static final AtomicReferenceFieldUpdater<ReadCallback, ReadCallback.WarningCounter> warningsUpdater
+ = AtomicReferenceFieldUpdater.newUpdater(ReadCallback.class, ReadCallback.WarningCounter.class, "warningCounter");
public ReadCallback(ResponseResolver<E, P> resolver, ReadCommand command, ReplicaPlan.Shared<E, P> replicaPlan, long queryStartNanoTime)
{
@@ -93,6 +131,23 @@ public class ReadCallback<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<
}
}
+ @VisibleForTesting
+ public static String tombstoneAbortMessage(int nodes, int tombstones, String cql)
+ {
+ return String.format("%s nodes scanned over %s tombstones and aborted the query %s (see tombstone_failure_threshold)", nodes, tombstones, cql);
+ }
+
+ @VisibleForTesting
+ public static String tombstoneWarnMessage(int nodes, int tombstones, String cql)
+ {
+ return String.format("%s nodes scanned up to %s tombstones and issued tombstone warnings for query %s (see tombstone_warn_threshold)", nodes, tombstones, cql);
+ }
+
+ private ColumnFamilyStore cfs()
+ {
+ return Schema.instance.getColumnFamilyStoreInstance(command.metadata().id);
+ }
+
public void awaitResults() throws ReadFailureException, ReadTimeoutException
{
boolean signaled = await(command.getTimeout(MILLISECONDS), TimeUnit.MILLISECONDS);
@@ -105,6 +160,25 @@ public class ReadCallback<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<
*/
int received = resolver.responses.size();
boolean failed = failures > 0 && (blockFor > received || !resolver.isDataPresent());
+ WarningCounter warnings = warningCounter;
+ if (warnings != null)
+ {
+ if (warnings.tombstoneAborts.get() > 0)
+ {
+ String msg = tombstoneAbortMessage(warnings.tombstoneAborts.get(), warnings.maxTombstoneAbortsCount.get(), command.toCQLString());
+ ClientWarn.instance.warn(msg + " with " + command.loggableTokens());
+ logger.warn(msg);
+ cfs().metric.clientTombstoneAborts.mark();
+ }
+
+ if (warnings.tombstoneWarnings.get() > 0)
+ {
+ String msg = tombstoneWarnMessage(warnings.tombstoneWarnings.get(), warnings.maxTombstoneWarningCount.get(), command.toCQLString());
+ ClientWarn.instance.warn(msg + " with " + command.loggableTokens());
+ logger.warn(msg);
+ cfs().metric.clientTombstoneWarnings.mark();
+ }
+ }
if (signaled && !failed)
return;
@@ -119,6 +193,10 @@ public class ReadCallback<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<
logger.debug("{}; received {} of {} responses{}", failed ? "Failed" : "Timed out", received, blockFor, gotData);
}
+ if (warnings != null && warnings.tombstoneAborts.get() > 0)
+ throw new TombstoneAbortException(warnings.tombstoneAborts.get(), warnings.maxTombstoneAbortsCount.get(), command.toCQLString(), resolver.isDataPresent(),
+ replicaPlan.get().consistencyLevel(), received, blockFor, failureReasonByEndpoint);
+
// Same as for writes, see AbstractWriteResponseHandler
throw failed
? new ReadFailureException(replicaPlan().consistencyLevel(), received, blockFor, resolver.isDataPresent(), failureReasonByEndpoint)
@@ -134,6 +212,17 @@ public class ReadCallback<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<
public void onResponse(Message<ReadResponse> message)
{
assertWaitingFor(message.from());
+ Map<ParamType, Object> params = message.header.params();
+ if (params.containsKey(ParamType.TOMBSTONE_ABORT))
+ {
+ getWarningCounter().addTombstoneAbort(message.from(), (Integer) params.get(ParamType.TOMBSTONE_ABORT));
+ onFailure(message.from(), RequestFailureReason.READ_TOO_MANY_TOMBSTONES);
+ return;
+ }
+ else if (params.containsKey(ParamType.TOMBSTONE_WARNING))
+ {
+ getWarningCounter().addTombstoneWarning(message.from(), (Integer) params.get(ParamType.TOMBSTONE_WARNING));
+ }
resolver.preprocess(message);
/*
@@ -146,10 +235,25 @@ public class ReadCallback<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<
condition.signalAll();
}
+ private WarningCounter getWarningCounter()
+ {
+ WarningCounter current;
+ do {
+
+ current = warningCounter;
+ if (current != null)
+ return current;
+
+ current = new WarningCounter();
+ } while (!warningsUpdater.compareAndSet(this, null, current));
+ return current;
+ }
+
public void response(ReadResponse result)
{
Verb kind = command.isRangeRequest() ? Verb.RANGE_RSP : Verb.READ_RSP;
Message<ReadResponse> message = Message.internalResponse(kind, result);
+ message = MessageParams.addToMessage(message);
onResponse(message);
}
@@ -182,8 +286,12 @@ public class ReadCallback<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<
*/
private void assertWaitingFor(InetAddressAndPort from)
{
- assert !replicaPlan().consistencyLevel().isDatacenterLocal()
- || DatabaseDescriptor.getLocalDataCenter().equals(DatabaseDescriptor.getEndpointSnitch().getDatacenter(from))
- : "Received read response from unexpected replica: " + from;
+ assert waitingFor(from): "Received read response from unexpected replica: " + from;
+ }
+
+ private boolean waitingFor(InetAddressAndPort from)
+ {
+ return !replicaPlan().consistencyLevel().isDatacenterLocal()
+ || DatabaseDescriptor.getLocalDataCenter().equals(DatabaseDescriptor.getEndpointSnitch().getDatacenter(from));
}
}
diff --git a/src/java/org/apache/cassandra/service/reads/range/RangeCommandIterator.java b/src/java/org/apache/cassandra/service/reads/range/RangeCommandIterator.java
index ae7ee60..d4dd3c8 100644
--- a/src/java/org/apache/cassandra/service/reads/range/RangeCommandIterator.java
+++ b/src/java/org/apache/cassandra/service/reads/range/RangeCommandIterator.java
@@ -34,6 +34,7 @@ import org.apache.cassandra.db.ReadCommand;
import org.apache.cassandra.db.filter.DataLimits;
import org.apache.cassandra.db.partitions.PartitionIterator;
import org.apache.cassandra.db.rows.RowIterator;
+import org.apache.cassandra.exceptions.ReadAbortException;
import org.apache.cassandra.exceptions.ReadFailureException;
import org.apache.cassandra.exceptions.ReadTimeoutException;
import org.apache.cassandra.exceptions.UnavailableException;
@@ -129,6 +130,11 @@ class RangeCommandIterator extends AbstractIterator<RowIterator> implements Part
rangeMetrics.timeouts.mark();
throw e;
}
+ catch (ReadAbortException e)
+ {
+ rangeMetrics.markAbort(e);
+ throw e;
+ }
catch (ReadFailureException e)
{
rangeMetrics.failures.mark();
diff --git a/src/java/org/apache/cassandra/exceptions/ReadFailureException.java b/src/java/org/apache/cassandra/utils/Int32Serializer.java
similarity index 52%
copy from src/java/org/apache/cassandra/exceptions/ReadFailureException.java
copy to src/java/org/apache/cassandra/utils/Int32Serializer.java
index 744cad4..731f5aa 100644
--- a/src/java/org/apache/cassandra/exceptions/ReadFailureException.java
+++ b/src/java/org/apache/cassandra/utils/Int32Serializer.java
@@ -15,22 +15,32 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.cassandra.exceptions;
-import java.util.Map;
+package org.apache.cassandra.utils;
-import com.google.common.collect.ImmutableMap;
+import java.io.IOException;
-import org.apache.cassandra.db.ConsistencyLevel;
-import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
-public class ReadFailureException extends RequestFailureException
+public class Int32Serializer implements IVersionedSerializer<Integer>
{
- public final boolean dataPresent;
+ public static final Int32Serializer serializer = new Int32Serializer();
- public ReadFailureException(ConsistencyLevel consistency, int received, int blockFor, boolean dataPresent, Map<InetAddressAndPort, RequestFailureReason> failureReasonByEndpoint)
+ public void serialize(Integer t, DataOutputPlus out, int version) throws IOException
{
- super(ExceptionCode.READ_FAILURE, consistency, received, blockFor, ImmutableMap.copyOf(failureReasonByEndpoint));
- this.dataPresent = dataPresent;
+ out.writeInt(t);
+ }
+
+ public Integer deserialize(DataInputPlus in, int version) throws IOException
+ {
+ return in.readInt();
+ }
+
+ public long serializedSize(Integer t, int version)
+ {
+ return TypeSizes.sizeof(t.intValue());
}
}
diff --git a/test/conf/cassandra.yaml b/test/conf/cassandra.yaml
index fcdef2d..c8ac72b 100644
--- a/test/conf/cassandra.yaml
+++ b/test/conf/cassandra.yaml
@@ -53,3 +53,6 @@ enable_materialized_views: true
enable_drop_compact_storage: true
file_cache_enabled: true
auto_hints_cleanup_enabled: true
+client_track_warnings_enabled: true
+client_large_read_warn_threshold_kb: 1024
+client_large_read_abort_threshold_kb: 4096
diff --git a/test/distributed/org/apache/cassandra/distributed/test/ClientReadSizeWarningTest.java b/test/distributed/org/apache/cassandra/distributed/test/ClientReadSizeWarningTest.java
new file mode 100644
index 0000000..2d790f4
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/ClientReadSizeWarningTest.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.function.Consumer;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import org.junit.*;
+
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.SimpleStatement;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.*;
+import org.apache.cassandra.exceptions.ReadSizeAbortException;
+import org.apache.cassandra.exceptions.RequestFailureReason;
+import org.apache.cassandra.service.ClientWarn;
+import org.apache.cassandra.service.QueryState;
+import org.assertj.core.api.Assertions;
+import org.assertj.core.api.Condition;
+
+/**
+ * ReadSize client warn/abort is coordinator only, so the fact that ClientMetrics is coordinator only does not
+ * impact the user experience
+ */
+public class ClientReadSizeWarningTest extends TestBaseImpl
+{
+ private static final Random RANDOM = new Random(0);
+ private static ICluster<IInvokableInstance> CLUSTER;
+ private static com.datastax.driver.core.Cluster JAVA_DRIVER;
+ private static com.datastax.driver.core.Session JAVA_DRIVER_SESSION;
+
+ @BeforeClass
+ public static void setupClass() throws IOException
+ {
+ Cluster.Builder builder = Cluster.build(3);
+ builder.withConfig(c -> c.with(Feature.NATIVE_PROTOCOL, Feature.GOSSIP));
+ CLUSTER = builder.start();
+ JAVA_DRIVER = JavaDriverUtils.create(CLUSTER);
+ JAVA_DRIVER_SESSION = JAVA_DRIVER.connect();
+
+ // setup threshold after init to avoid driver issues loading
+ // the test uses a rather small limit, which causes driver to fail while loading metadata
+ CLUSTER.stream().forEach(i -> i.runOnInstance(() -> {
+ DatabaseDescriptor.setClientLargeReadWarnThresholdKB(1);
+ DatabaseDescriptor.setClientLargeReadAbortThresholdKB(2);
+ }));
+ }
+
+ @Before
+ public void setup()
+ {
+ CLUSTER.schemaChange("DROP KEYSPACE IF EXISTS " + KEYSPACE);
+ init(CLUSTER);
+ CLUSTER.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v blob, PRIMARY KEY (pk, ck))");
+ }
+
+ private static void enable(boolean value)
+ {
+ CLUSTER.stream().forEach(i -> i.runOnInstance(() -> DatabaseDescriptor.setClientTrackWarningsEnabled(value)));
+ }
+
+ private static void assertPrefix(String expectedPrefix, String actual)
+ {
+ if (!actual.startsWith(expectedPrefix))
+ throw new AssertionError(String.format("expected \"%s\" to begin with \"%s\"", actual, expectedPrefix));
+ }
+
+ private static ByteBuffer bytes(int size)
+ {
+ byte[] b = new byte[size];
+ RANDOM.nextBytes(b);
+ return ByteBuffer.wrap(b);
+ }
+
+ @Test
+ public void noWarningsSinglePartition()
+ {
+ noWarnings("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk=1");
+ }
+
+ @Test
+ public void noWarningsScan()
+ {
+ noWarnings("SELECT * FROM " + KEYSPACE + ".tbl");
+ }
+
+ public void noWarnings(String cql)
+ {
+ CLUSTER.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, ?)", ConsistencyLevel.ALL, bytes(128));
+ CLUSTER.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 2, ?)", ConsistencyLevel.ALL, bytes(128));
+
+ Consumer<List<String>> test = warnings ->
+ Assert.assertEquals(Collections.emptyList(), warnings);
+
+ for (boolean b : Arrays.asList(true, false))
+ {
+ enable(b);
+ SimpleQueryResult result = CLUSTER.coordinator(1).executeWithResult(cql, ConsistencyLevel.ALL);
+ test.accept(result.warnings());
+ test.accept(driverQueryAll(cql).getExecutionInfo().getWarnings());
+ assertWarnAborts(0, 0, 0);
+ }
+ }
+
+ @Test
+ public void warnThresholdSinglePartition()
+ {
+ warnThreshold("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk=1");
+ }
+
+ @Test
+ public void warnThresholdScan()
+ {
+ warnThreshold("SELECT * FROM " + KEYSPACE + ".tbl");
+ }
+
+ public void warnThreshold(String cql)
+ {
+ CLUSTER.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, ?)", ConsistencyLevel.ALL, bytes(512));
+ CLUSTER.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 2, ?)", ConsistencyLevel.ALL, bytes(512));
+
+ Consumer<List<String>> testEnabled = warnings ->
+ assertPrefix("Read on table " + KEYSPACE + ".tbl has exceeded the size warning threshold", Iterables.getOnlyElement(warnings));
+
+ enable(true);
+ SimpleQueryResult result = CLUSTER.coordinator(1).executeWithResult(cql, ConsistencyLevel.ALL);
+ testEnabled.accept(result.warnings());
+ assertWarnAborts(1, 0, 0);
+ testEnabled.accept(driverQueryAll(cql).getExecutionInfo().getWarnings());
+ assertWarnAborts(2, 0, 0);
+
+ enable(false);
+ result = CLUSTER.coordinator(1).executeWithResult(cql, ConsistencyLevel.ALL);
+ Assertions.assertThat(result.warnings()).isEmpty();
+ Assertions.assertThat(driverQueryAll(cql).getExecutionInfo().getWarnings()).isEmpty();
+ assertWarnAborts(2, 0, 0);
+ }
+
+ @Test
+ public void failThresholdSinglePartition() throws UnknownHostException
+ {
+ failThreshold("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk=1");
+ }
+
+ @Test
+ public void failThresholdScan() throws UnknownHostException
+ {
+ failThreshold("SELECT * FROM " + KEYSPACE + ".tbl");
+ }
+
+ public void failThreshold(String cql) throws UnknownHostException
+ {
+ CLUSTER.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, ?)", ConsistencyLevel.ALL, bytes(512));
+ CLUSTER.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 2, ?)", ConsistencyLevel.ALL, bytes(512));
+ CLUSTER.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 3, ?)", ConsistencyLevel.ALL, bytes(512));
+ CLUSTER.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 4, ?)", ConsistencyLevel.ALL, bytes(512));
+ CLUSTER.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 5, ?)", ConsistencyLevel.ALL, bytes(512));
+
+ enable(true);
+ List<String> warnings = CLUSTER.get(1).callsOnInstance(() -> {
+ ClientWarn.instance.captureWarnings();
+ try
+ {
+ QueryProcessor.execute(cql, org.apache.cassandra.db.ConsistencyLevel.ALL, QueryState.forInternalCalls());
+ Assert.fail("Expected query failure");
+ }
+ catch (ReadSizeAbortException e)
+ {
+ // expected, client transport returns an error message and includes client warnings
+ }
+ return ClientWarn.instance.getWarnings();
+ }).call();
+ Assertions.assertThat(warnings).hasSize(1);
+ assertPrefix("Read on table " + KEYSPACE + ".tbl has exceeded the size failure threshold", warnings.get(0));
+ assertWarnAborts(0, 1, 1);
+
+ try
+ {
+ driverQueryAll(cql);
+ Assert.fail("Query should have thrown ReadFailureException");
+ }
+ catch (com.datastax.driver.core.exceptions.ReadFailureException e)
+ {
+ // without changing the client, we can't produce a better message...
+ // client does NOT include the message sent from the server in the exception; so the message doesn't work
+ // well in this case
+ Assertions.assertThat(e.getMessage()).endsWith("(1 responses were required but only 0 replica responded, 1 failed)");
+ ImmutableSet<InetAddress> expectedKeys = ImmutableSet.of(InetAddress.getByAddress(new byte[]{ 127, 0, 0, 1 }), InetAddress.getByAddress(new byte[]{ 127, 0, 0, 2 }), InetAddress.getByAddress(new byte[]{ 127, 0, 0, 3 }));
+ Assertions.assertThat(e.getFailuresMap())
+ .hasSize(1)
+ // coordinator changes from run to run, so can't assert map as the key is dynamic... so assert the domain of keys and the single value expect
+ .containsValue(RequestFailureReason.READ_TOO_LARGE.code)
+ .hasKeySatisfying(new Condition<InetAddress>() {
+ public boolean matches(InetAddress value)
+ {
+ return expectedKeys.contains(value);
+ }
+ });
+ }
+ assertWarnAborts(0, 2, 1);
+
+ // query should no longer fail
+ enable(false);
+ SimpleQueryResult result = CLUSTER.coordinator(1).executeWithResult(cql, ConsistencyLevel.ALL);
+ Assertions.assertThat(result.warnings()).isEmpty();
+ Assertions.assertThat(driverQueryAll(cql).getExecutionInfo().getWarnings()).isEmpty();
+ assertWarnAborts(0, 2, 0);
+ }
+
+ private static long GLOBAL_READ_ABORTS = 0;
+ private static void assertWarnAborts(int warns, int aborts, int globalAborts)
+ {
+ Assertions.assertThat(totalWarnings()).as("warnings").isEqualTo(warns);
+ Assertions.assertThat(totalAborts()).as("aborts").isEqualTo(aborts);
+ long expectedGlobalAborts = GLOBAL_READ_ABORTS + globalAborts;
+ Assertions.assertThat(totalReadAborts()).as("global aborts").isEqualTo(expectedGlobalAborts);
+ GLOBAL_READ_ABORTS = expectedGlobalAborts;
+ }
+
+ private static long totalWarnings()
+ {
+ return CLUSTER.stream().mapToLong(i -> i.metrics().getCounter("org.apache.cassandra.metrics.keyspace.ClientReadSizeWarnings." + KEYSPACE)).sum();
+ }
+
+ private static long totalAborts()
+ {
+ return CLUSTER.stream().mapToLong(i -> i.metrics().getCounter("org.apache.cassandra.metrics.keyspace.ClientReadSizeAborts." + KEYSPACE)).sum();
+ }
+
+ private static long totalReadAborts()
+ {
+ return CLUSTER.stream().mapToLong(i -> i.metrics().getCounter("org.apache.cassandra.metrics.ClientRequest.Aborts.Read-ALL")).sum();
+ }
+
+ private static ResultSet driverQueryAll(String cql)
+ {
+ return JAVA_DRIVER_SESSION.execute(new SimpleStatement(cql).setConsistencyLevel(com.datastax.driver.core.ConsistencyLevel.ALL));
+ }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/ClientTombstoneWarningTest.java b/test/distributed/org/apache/cassandra/distributed/test/ClientTombstoneWarningTest.java
new file mode 100644
index 0000000..d529515
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/ClientTombstoneWarningTest.java
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Consumer;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.SimpleStatement;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.ICluster;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.SimpleQueryResult;
+import org.apache.cassandra.exceptions.ReadFailureException;
+import org.apache.cassandra.exceptions.RequestFailureReason;
+import org.apache.cassandra.exceptions.TombstoneAbortException;
+import org.apache.cassandra.service.ClientWarn;
+import org.apache.cassandra.service.QueryState;
+import org.assertj.core.api.Assertions;
+
+public class ClientTombstoneWarningTest extends TestBaseImpl
+{
+ // Thresholds configured on the cluster: warn past TOMBSTONE_WARN scanned tombstones,
+ // abort the read past TOMBSTONE_FAIL (see setupClass).
+ private static final int TOMBSTONE_WARN = 50;
+ private static final int TOMBSTONE_FAIL = 100;
+ private static ICluster<IInvokableInstance> CLUSTER;
+ // External java driver connection, used to verify warnings surface over the native protocol.
+ private static com.datastax.driver.core.Cluster JAVA_DRIVER;
+ private static com.datastax.driver.core.Session JAVA_DRIVER_SESSION;
+
+ /**
+  * Starts a three node cluster with the tombstone thresholds lowered, and with the native
+  * protocol + gossip enabled so an external java driver can connect to it.
+  */
+ @BeforeClass
+ public static void setupClass() throws IOException
+ {
+     CLUSTER = Cluster.build(3)
+                      .withConfig(c -> c.set("tombstone_warn_threshold", TOMBSTONE_WARN)
+                                        .set("tombstone_failure_threshold", TOMBSTONE_FAIL)
+                                        .with(Feature.NATIVE_PROTOCOL, Feature.GOSSIP))
+                      .start();
+     JAVA_DRIVER = JavaDriverUtils.create(CLUSTER);
+     JAVA_DRIVER_SESSION = JAVA_DRIVER.connect();
+ }
+
+ /** Closes the java driver session then the driver cluster; either may be null if setup failed early. */
+ @AfterClass
+ public static void teardown()
+ {
+     com.datastax.driver.core.Session session = JAVA_DRIVER_SESSION;
+     if (session != null)
+         session.close();
+     com.datastax.driver.core.Cluster driver = JAVA_DRIVER;
+     if (driver != null)
+         driver.close();
+ }
+
+ // Recreates the keyspace/table before every test so tombstone counts start from zero.
+ @Before
+ public void setup()
+ {
+ CLUSTER.schemaChange("DROP KEYSPACE IF EXISTS " + KEYSPACE);
+ init(CLUSTER);
+ CLUSTER.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
+ }
+
+ // Toggles the client warning/abort tracking code path (CASSANDRA-16850) on every node.
+ private static void enable(boolean value)
+ {
+ CLUSTER.stream().forEach(i -> i.runOnInstance(() -> DatabaseDescriptor.setClientTrackWarningsEnabled(value)));
+ }
+
+ // Below the warn threshold, a single-partition read should emit no client warnings.
+ @Test
+ public void noWarningsSinglePartition()
+ {
+ noWarnings("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk=1");
+ }
+
+ // Below the warn threshold, a range scan should emit no client warnings.
+ // BUG FIX: this previously ran the same single-partition query (WHERE pk=1) as
+ // noWarningsSinglePartition, so the scan path was never exercised; a "Scan" test
+ // must use an unrestricted range read, matching warnThresholdScan/failThresholdScan.
+ @Test
+ public void noWarningsScan()
+ {
+ noWarnings("SELECT * FROM " + KEYSPACE + ".tbl");
+ }
+
+ // Inserts cell tombstones up to (but not past) the warn threshold, then verifies the query
+ // produces no client warnings and no warn/abort metric movement, with tracking both
+ // enabled and disabled, via both the dtest coordinator and the java driver.
+ // (made private for consistency with the warnThreshold/failThreshold helpers)
+ private void noWarnings(String cql)
+ {
+ Consumer<List<String>> test = warnings ->
+ Assert.assertEquals(Collections.emptyList(), warnings);
+
+ // inserting null values creates cell tombstones; stop at the warn threshold
+ for (int i = 0; i < TOMBSTONE_WARN; i++)
+ CLUSTER.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, ?, null)", ConsistencyLevel.ALL, i);
+
+ for (boolean b : Arrays.asList(true, false))
+ {
+ enable(b);
+
+ SimpleQueryResult result = CLUSTER.coordinator(1).executeWithResult(cql, ConsistencyLevel.ALL);
+ test.accept(result.warnings());
+ test.accept(driverQueryAll(cql).getExecutionInfo().getWarnings());
+
+ assertWarnAborts(0, 0, 0);
+ }
+ }
+
+ // Crossing the warn threshold on a single-partition read should produce a client warning.
+ @Test
+ public void warnThresholdSinglePartition()
+ {
+ warnThreshold("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1", false);
+ }
+
+ // Crossing the warn threshold on a range scan should produce a client warning.
+ @Test
+ public void warnThresholdScan()
+ {
+ warnThreshold("SELECT * FROM " + KEYSPACE + ".tbl", true);
+ }
+
+ // Inserts one more cell tombstone than the warn threshold, then verifies the new tracked
+ // warning is emitted when tracking is enabled and only the legacy coordinator warning (or
+ // none, for scans) when disabled. The assertWarnAborts calls are cumulative, so the
+ // statement order in this method matters.
+ private void warnThreshold(String cql, boolean isScan)
+ {
+ for (int i = 0; i < TOMBSTONE_WARN + 1; i++)
+ CLUSTER.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, ?, null)", ConsistencyLevel.ALL, i);
+
+ enable(true);
+ Consumer<List<String>> testEnabled = warnings ->
+ Assertions.assertThat(Iterables.getOnlyElement(warnings))
+ .contains("nodes scanned up to " + (TOMBSTONE_WARN + 1) + " tombstones and issued tombstone warnings for query " + cql);
+
+ SimpleQueryResult result = CLUSTER.coordinator(1).executeWithResult(cql, ConsistencyLevel.ALL);
+ testEnabled.accept(result.warnings());
+ assertWarnAborts(1, 0, 0);
+ // each query (dtest coordinator, then java driver) bumps the warning counter once
+ testEnabled.accept(driverQueryAll(cql).getExecutionInfo().getWarnings());
+ assertWarnAborts(2, 0, 0);
+
+ enable(false);
+ Consumer<List<String>> testDisabled = warnings -> {
+ // client warnings are currently coordinator only, so if present only 1 is expected
+ if (isScan)
+ {
+ // Scans perform multiple ReadCommands, which will not propagate the warnings to the top-level coordinator; so no warnings are expected
+ Assertions.assertThat(warnings).isEmpty();
+ }
+ else
+ {
+ Assertions.assertThat(Iterables.getOnlyElement(warnings))
+ .startsWith("Read " + (TOMBSTONE_WARN + 1) + " live rows and " + (TOMBSTONE_WARN + 1) + " tombstone cells for query " + cql);
+ }
+ };
+ result = CLUSTER.coordinator(1).executeWithResult(cql, ConsistencyLevel.ALL);
+ testDisabled.accept(result.warnings());
+ // counters must not move while tracking is disabled
+ assertWarnAborts(2, 0, 0);
+ testDisabled.accept(driverQueryAll(cql).getExecutionInfo().getWarnings());
+ assertWarnAborts(2, 0, 0);
+ }
+
+ // Crossing the fail threshold on a single-partition read should abort the query.
+ @Test
+ public void failThresholdSinglePartition() throws UnknownHostException
+ {
+ failThreshold("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1", false);
+ }
+
+ // Crossing the fail threshold on a range scan should abort the query.
+ @Test
+ public void failThresholdScan() throws UnknownHostException
+ {
+ failThreshold("SELECT * FROM " + KEYSPACE + ".tbl", true);
+ }
+
+ // Inserts one more cell tombstone than the fail threshold, then verifies the query aborts
+ // with TombstoneAbortException (tracking enabled) or a plain ReadFailureException
+ // (tracking disabled), via both the internal QueryProcessor and the java driver.
+ // The assertWarnAborts calls are cumulative, so the statement order in this method matters.
+ private void failThreshold(String cql, boolean isScan) throws UnknownHostException
+ {
+ for (int i = 0; i < TOMBSTONE_FAIL + 1; i++)
+ CLUSTER.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, ?, null)", ConsistencyLevel.ALL, i);
+
+ enable(true);
+ // run on the instance so the server-side TombstoneAbortException (rather than the
+ // client-side wrapper) can be caught and its fields inspected
+ List<String> warnings = CLUSTER.get(1).callsOnInstance(() -> {
+ ClientWarn.instance.captureWarnings();
+ try
+ {
+ QueryProcessor.execute(cql, org.apache.cassandra.db.ConsistencyLevel.ALL, QueryState.forInternalCalls());
+ Assert.fail("Expected query failure");
+ }
+ catch (TombstoneAbortException e)
+ {
+ Assert.assertTrue(e.nodes >= 1 && e.nodes <= 3);
+ Assert.assertEquals(TOMBSTONE_FAIL + 1, e.tombstones);
+ // expected, client transport returns an error message and includes client warnings
+ }
+ return ClientWarn.instance.getWarnings();
+ }).call();
+ Assertions.assertThat(Iterables.getOnlyElement(warnings))
+ .contains("nodes scanned over " + (TOMBSTONE_FAIL + 1) + " tombstones and aborted the query " + cql);
+
+ assertWarnAborts(0, 1, 1);
+
+ try
+ {
+ driverQueryAll(cql);
+ Assert.fail("Query should have thrown ReadFailureException");
+ }
+ catch (com.datastax.driver.core.exceptions.ReadFailureException e)
+ {
+ // without changing the client can't produce a better message...
+ // client does NOT include the message sent from the server in the exception; so the message doesn't work
+ // well in this case
+ Assertions.assertThat(e.getMessage()).contains("(3 responses were required but only 0 replica responded"); // can't include ', 3 failed)' as sometimes it's 2
+ Assertions.assertThat(e.getFailuresMap())
+ .isEqualTo(ImmutableMap.of(
+ InetAddress.getByAddress(new byte[] {127, 0, 0, 1}), RequestFailureReason.READ_TOO_MANY_TOMBSTONES.code,
+ InetAddress.getByAddress(new byte[] {127, 0, 0, 2}), RequestFailureReason.READ_TOO_MANY_TOMBSTONES.code,
+ InetAddress.getByAddress(new byte[] {127, 0, 0, 3}), RequestFailureReason.READ_TOO_MANY_TOMBSTONES.code));
+ }
+
+ assertWarnAborts(0, 2, 1);
+
+ enable(false);
+ warnings = CLUSTER.get(1).callsOnInstance(() -> {
+ ClientWarn.instance.captureWarnings();
+ try
+ {
+ QueryProcessor.execute(cql, org.apache.cassandra.db.ConsistencyLevel.ALL, QueryState.forInternalCalls());
+ Assert.fail("Expected query failure");
+ }
+ catch (ReadFailureException e)
+ {
+ // with tracking disabled the failure must come from the legacy path, not the abort path
+ Assertions.assertThat(e).isNotInstanceOf(TombstoneAbortException.class);
+ }
+ return ClientWarn.instance.getWarnings();
+ }).call();
+ // client warnings are currently coordinator only, so if present only 1 is expected
+ if (isScan)
+ {
+ // Scans perform multiple ReadCommands, which will not propagate the warnings to the top-level coordinator; so no warnings are expected
+ Assertions.assertThat(warnings).isNull();
+ }
+ else
+ {
+ Assertions.assertThat(Iterables.getOnlyElement(warnings))
+ .startsWith("Read " + TOMBSTONE_FAIL + " live rows and " + (TOMBSTONE_FAIL + 1) + " tombstone cells for query " + cql);
+ }
+
+ // counters must not move while tracking is disabled
+ assertWarnAborts(0, 2, 0);
+
+ try
+ {
+ driverQueryAll(cql);
+ Assert.fail("Query should have thrown ReadFailureException");
+ }
+ catch (com.datastax.driver.core.exceptions.ReadFailureException e)
+ {
+ // not checking the message as different cases exist for the failure, so the fact that this failed is enough
+
+ Assertions.assertThat(e.getFailuresMap())
+ .isNotEmpty();
+ Assertions.assertThat(e.getFailuresMap().values())
+ .as("Non READ_TOO_MANY_TOMBSTONES exists")
+ .allMatch(i -> i.equals(RequestFailureReason.READ_TOO_MANY_TOMBSTONES.code));
+ }
+
+ assertWarnAborts(0, 2, 0);
+ }
+
+ // Running tally of coordinator read aborts seen so far; the underlying metric is cumulative
+ // across tests, so assertions track the expected delta rather than an absolute value.
+ // NOTE(review): this static accumulator makes the assertions order-dependent across tests.
+ private static long GLOBAL_READ_ABORTS = 0;
+ // Asserts the per-keyspace warn/abort counters match exactly and that the global
+ // read-abort counter grew by exactly globalAborts since the previous call.
+ private static void assertWarnAborts(int warns, int aborts, int globalAborts)
+ {
+ Assertions.assertThat(totalWarnings()).as("warnings").isEqualTo(warns);
+ Assertions.assertThat(totalAborts()).as("aborts").isEqualTo(aborts);
+ long expectedGlobalAborts = GLOBAL_READ_ABORTS + globalAborts;
+ Assertions.assertThat(totalReadAborts()).as("global aborts").isEqualTo(expectedGlobalAborts);
+ GLOBAL_READ_ABORTS = expectedGlobalAborts;
+ }
+
+ // Sum of the per-keyspace tombstone warning counter across all nodes.
+ private static long totalWarnings()
+ {
+ return CLUSTER.stream().mapToLong(i -> i.metrics().getCounter("org.apache.cassandra.metrics.keyspace.ClientTombstoneWarnings." + KEYSPACE)).sum();
+ }
+
+ // Sum of the per-keyspace tombstone abort counter across all nodes.
+ private static long totalAborts()
+ {
+ return CLUSTER.stream().mapToLong(i -> i.metrics().getCounter("org.apache.cassandra.metrics.keyspace.ClientTombstoneAborts." + KEYSPACE)).sum();
+ }
+
+ // Sum of coordinator-level read aborts across all nodes; counts both single-partition
+ // (Read-ALL) and range-scan (RangeSlice) scopes since this test exercises both paths.
+ private static long totalReadAborts()
+ {
+ return CLUSTER.stream().mapToLong(i ->
+ i.metrics().getCounter("org.apache.cassandra.metrics.ClientRequest.Aborts.Read-ALL")
+ + i.metrics().getCounter("org.apache.cassandra.metrics.ClientRequest.Aborts.RangeSlice")
+ ).sum();
+ }
+
+ // Executes cql through the external java driver (native protocol path) at CL ALL,
+ // as opposed to the dtest in-JVM coordinator API used elsewhere.
+ private static ResultSet driverQueryAll(String cql)
+ {
+ return JAVA_DRIVER_SESSION.execute(new SimpleStatement(cql).setConsistencyLevel(com.datastax.driver.core.ConsistencyLevel.ALL));
+ }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/JavaDriverUtils.java b/test/distributed/org/apache/cassandra/distributed/test/JavaDriverUtils.java
new file mode 100644
index 0000000..bc39ba1
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/JavaDriverUtils.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.ICluster;
+import org.apache.cassandra.distributed.api.IInstance;
+
+/**
+ * Helpers for opening a DataStax java driver connection against an in-JVM dtest cluster.
+ */
+public final class JavaDriverUtils
+{
+    private JavaDriverUtils()
+    {
+    }
+
+    /**
+     * Builds a driver {@code Cluster} with one contact point per dtest instance.
+     *
+     * @throws IllegalArgumentException if the dtest cluster is empty
+     * @throws IllegalStateException if any instance lacks the features the driver needs
+     */
+    public static com.datastax.driver.core.Cluster create(ICluster<? extends IInstance> dtest)
+    {
+        if (dtest.size() == 0)
+            throw new IllegalArgumentException("Attempted to open java driver for empty cluster");
+
+        // make sure the needed Features are added
+        dtest.stream().forEach(JavaDriverUtils::requireDriverFeatures);
+
+        com.datastax.driver.core.Cluster.Builder builder = com.datastax.driver.core.Cluster.builder();
+
+        //TODO support port
+        //TODO support auth
+        dtest.stream().forEach(i -> builder.addContactPoint(i.broadcastAddress().getAddress().getHostAddress()));
+
+        return builder.build();
+    }
+
+    // gossip is needed as currently Host.getHostId is empty without it
+    private static void requireDriverFeatures(IInstance instance)
+    {
+        if (!(instance.config().has(Feature.NATIVE_PROTOCOL) && instance.config().has(Feature.GOSSIP)))
+            throw new IllegalStateException("java driver requires Feature.NATIVE_PROTOCOL and Feature.GOSSIP; but one or more is missing");
+    }
+}
diff --git a/test/unit/org/apache/cassandra/db/ReadCommandVerbHandlerTest.java b/test/unit/org/apache/cassandra/db/ReadCommandVerbHandlerTest.java
index 8682273..47b2417 100644
--- a/test/unit/org/apache/cassandra/db/ReadCommandVerbHandlerTest.java
+++ b/test/unit/org/apache/cassandra/db/ReadCommandVerbHandlerTest.java
@@ -166,7 +166,8 @@ public class ReadCommandVerbHandlerTest
DataLimits.NONE,
KEY,
new ClusteringIndexSliceFilter(Slices.ALL, false),
- null);
+ null,
+ false);
}
private static DecoratedKey key(TableMetadata metadata, int key)
diff --git a/test/unit/org/apache/cassandra/db/ReadResponseTest.java b/test/unit/org/apache/cassandra/db/ReadResponseTest.java
index 6e1a804..f625d41 100644
--- a/test/unit/org/apache/cassandra/db/ReadResponseTest.java
+++ b/test/unit/org/apache/cassandra/db/ReadResponseTest.java
@@ -236,7 +236,8 @@ public class ReadResponseTest
DataLimits.NONE,
metadata.partitioner.decorateKey(ByteBufferUtil.bytes(key)),
null,
- null);
+ null,
+ false);
this.repairedDigest = repairedDigest;
this.conclusive = conclusive;
}
diff --git a/test/unit/org/apache/cassandra/service/reads/ReadExecutorTest.java b/test/unit/org/apache/cassandra/service/reads/ReadExecutorTest.java
index 6fc8fbf..ac68205 100644
--- a/test/unit/org/apache/cassandra/service/reads/ReadExecutorTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/ReadExecutorTest.java
@@ -246,7 +246,7 @@ public class ReadExecutorTest
MockSinglePartitionReadCommand(long timeout)
{
- super(false, 0, false, cfs.metadata(), 0, null, null, null, Util.dk("ry@n_luvs_teh_y@nk33z"), null, null);
+ super(false, 0, false, cfs.metadata(), 0, null, null, null, Util.dk("ry@n_luvs_teh_y@nk33z"), null, null, false);
this.timeout = timeout;
}
diff --git a/test/unit/org/apache/cassandra/service/reads/repair/RepairedDataVerifierTest.java b/test/unit/org/apache/cassandra/service/reads/repair/RepairedDataVerifierTest.java
index 169e09d..592bff8 100644
--- a/test/unit/org/apache/cassandra/service/reads/repair/RepairedDataVerifierTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/repair/RepairedDataVerifierTest.java
@@ -287,7 +287,8 @@ public class RepairedDataVerifierTest
DataLimits.NONE,
metadata.partitioner.decorateKey(ByteBufferUtil.bytes(key)),
new ClusteringIndexSliceFilter(Slices.ALL, false),
- null);
+ null,
+ false);
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org