You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2021/09/28 17:52:25 UTC
[cassandra] branch trunk updated: Add soft/hard limits to local
reads to protect against reading too much data in a single query
This is an automated email from the ASF dual-hosted git repository.
maedhroz pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new c7526f9 Add soft/hard limits to local reads to protect against reading too much data in a single query
c7526f9 is described below
commit c7526f943f50e994e94c8287c772c856961833f2
Author: David Capwell <dc...@apache.org>
AuthorDate: Mon Aug 30 10:18:03 2021 -0700
Add soft/hard limits to local reads to protect against reading too much data in a single query
patch by David Capwell; reviewed by Caleb Rackliffe and Marcus Eriksson for CASSANDRA-16896
---
CHANGES.txt | 1 +
NEWS.txt | 11 +-
build.xml | 2 +
conf/cassandra.yaml | 32 +-
ide/idea/workspace.xml | 6 +-
src/java/org/apache/cassandra/config/Config.java | 7 +-
.../cassandra/config/DatabaseDescriptor.java | 74 ++++-
.../org/apache/cassandra/config/TrackWarnings.java | 108 ++++++
.../org/apache/cassandra/cql3/QueryOptions.java | 44 +--
.../cassandra/cql3/selection/ResultSetBuilder.java | 5 +
.../cassandra/cql3/statements/SelectStatement.java | 30 +-
.../org/apache/cassandra/db/ArrayClustering.java | 2 +-
src/java/org/apache/cassandra/db/Clustering.java | 3 +-
src/java/org/apache/cassandra/db/DeletionTime.java | 2 +-
src/java/org/apache/cassandra/db/ReadCommand.java | 198 ++++++++---
.../org/apache/cassandra/db/RowIndexEntry.java | 51 +++
.../filter/LocalReadSizeTooLargeException.java} | 20 +-
.../filter/RowIndexEntryTooLargeException.java} | 20 +-
.../db/rows/AbstractRangeTombstoneMarker.java | 4 +-
.../org/apache/cassandra/db/rows/ArrayCell.java | 7 +
.../org/apache/cassandra/db/rows/BTreeRow.java | 13 +
.../org/apache/cassandra/db/rows/BufferCell.java | 7 +
.../org/apache/cassandra/db/rows/CellPath.java | 18 +-
.../org/apache/cassandra/db/rows/ColumnData.java | 3 +-
.../cassandra/db/rows/ComplexColumnData.java | 10 +
.../org/apache/cassandra/db/rows/NativeCell.java | 7 +
.../db/rows/RangeTombstoneBoundMarker.java | 9 +
.../db/rows/RangeTombstoneBoundaryMarker.java | 9 +
.../cassandra/db/rows/RangeTombstoneMarker.java | 3 +-
src/java/org/apache/cassandra/db/rows/Row.java | 3 +-
.../cassandra/exceptions/RequestFailureReason.java | 2 +-
.../exceptions/TombstoneAbortException.java | 6 +-
.../org/apache/cassandra/io/sstable/IndexInfo.java | 3 +-
.../apache/cassandra/metrics/KeyspaceMetrics.java | 26 +-
.../org/apache/cassandra/metrics/TableMetrics.java | 27 +-
src/java/org/apache/cassandra/net/ParamType.java | 7 +-
.../apache/cassandra/service/StorageService.java | 94 +++++-
.../cassandra/service/StorageServiceMBean.java | 23 +-
.../cassandra/service/reads/ReadCallback.java | 130 ++------
.../reads/trackwarnings/CoordinatorWarnings.java | 198 +++++++++++
.../reads/trackwarnings/WarnAbortCounter.java | 57 ++++
.../reads/trackwarnings/WarningContext.java | 83 +++++
.../reads/trackwarnings/WarningsSnapshot.java | 355 ++++++++++++++++++++
.../org/apache/cassandra/transport/Dispatcher.java | 16 +
.../org/apache/cassandra/transport/Message.java | 11 +
.../cassandra/transport/messages/BatchMessage.java | 6 +
.../transport/messages/ExecuteMessage.java | 6 +
.../cassandra/transport/messages/QueryMessage.java | 6 +
.../Int64Serializer.java} | 36 +-
.../org/apache/cassandra/utils/ObjectSizes.java | 3 +-
test/conf/cassandra.yaml | 14 +-
.../cassandra/distributed/impl/Coordinator.java | 51 +--
.../cassandra/distributed/impl/Instance.java | 43 ++-
.../test/ClientReadSizeWarningTest.java | 266 ---------------
.../trackwarnings/AbstractClientSizeWarning.java | 367 +++++++++++++++++++++
.../CoordinatorReadSizeWarningTest.java | 86 +++++
.../trackwarnings/LocalReadSizeWarningTest.java | 80 +++++
.../trackwarnings/RowIndexSizeWarningTest.java | 123 +++++++
.../TombstoneWarningTest.java} | 12 +-
.../config/DatabaseDescriptorRefTest.java | 3 +
.../cassandra/config/DatabaseDescriptorTest.java | 151 +++++++++
.../config/YamlConfigurationLoaderTest.java | 66 +++-
.../unit/org/apache/cassandra/db/CellSpecTest.java | 43 ++-
.../cassandra/db/ClusteringHeapSizeTest.java | 6 -
.../reads/trackwarnings/WarningsSnapshotTest.java | 187 +++++++++++
.../apache/cassandra/utils/ObjectSizesTest.java | 50 +++
66 files changed, 2739 insertions(+), 613 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 2501253..388fad1 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.1
+ * Add soft/hard limits to local reads to protect against reading too much data in a single query (CASSANDRA-16896)
* Avoid token cache invalidation for removing a non-member node (CASSANDRA-15290)
* Allow configuration of consistency levels on auth operations (CASSANDRA-12988)
* Add number of sstables in a compaction to compactionstats output (CASSANDRA-16844)
diff --git a/NEWS.txt b/NEWS.txt
index 77a3e1b..bdd3dd6 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -40,11 +40,14 @@ New features
------------
- Warn/abort thresholds added to read queries notifying clients when these thresholds trigger (by
emitting a client warning or aborting the query). This feature is disabled by default, scheduled
- to be enabled in 4.2; it is controlled with the configuration client_track_warnings_enabled,
+ to be enabled in 4.2; it is controlled with the configuration track_warnings.enabled,
setting to true will enable this feature. Each check has its own warn/abort thresholds, currently
- tombstones (tombstone_warn_threshold, and tombstone_failure_threshold) and coordinator result set
- materialized size (client_large_read_warn_threshold_kb, and client_large_read_abort_threshold_kb)
- are supported; more checks will be added over time.
+ tombstones (tombstone_warn_threshold, and tombstone_failure_threshold), coordinator result set
+ materialized size (track_warnings.coordinator_large_read.warn_threshold_kb, and
+ track_warnings.coordinator_large_read.abort_threshold_kb), local read materialized heap size
+ (track_warnings.local_read_size.warn_threshold_kb and track_warnings.local_read_size.abort_threshold_kb),
+ and RowIndexEntry estimated memory size (track_warnings.row_index_size.warn_threshold_kb and
+ track_warnings.row_index_size.abort_threshold_kb) are supported; more checks will be added over time.
Upgrading
---------
diff --git a/build.xml b/build.xml
index 32eba7c..1dc1c03 100644
--- a/build.xml
+++ b/build.xml
@@ -859,6 +859,7 @@
<pathelement location="${test.conf}"/>
</classpath>
<jvmarg value="-Dstorage-config=${test.conf}"/>
+ <jvmarg value="-Dcassandra.track_warnings.coordinator.defensive_checks_enabled=true" /> <!-- enable defensive checks -->
<jvmarg value="-javaagent:${build.lib}/jamm-${jamm.version}.jar" />
<jvmarg value="-ea"/>
<jvmarg line="${java11-jvmargs}"/>
@@ -1370,6 +1371,7 @@
<jvmarg value="-Dcassandra.testtag=@{testtag}"/>
<jvmarg value="-Dcassandra.keepBriefBrief=${cassandra.keepBriefBrief}" />
<jvmarg value="-Dcassandra.strict.runtime.checks=true" />
+ <jvmarg value="-Dcassandra.track_warnings.coordinator.defensive_checks_enabled=true" /> <!-- enable defensive checks -->
<jvmarg line="${java11-jvmargs}"/>
<!-- disable shrinks in quicktheories CASSANDRA-15554 -->
<jvmarg value="-DQT_SHRINKS=0"/>
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index a868a4a..4e6db63 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -1459,15 +1459,25 @@ enable_drop_compact_storage: false
# - 127.0.0.0/31
# Enables tracking warnings/aborts across all replicas for reporting back to client.
-# Scheduled to enable in 4.2
# See: CASSANDRA-16850
-# See: tombstone_warn_threshold, tombstone_failure_threshold, client_large_read_warn_threshold_kb, and client_large_read_abort_threshold_kb
-#client_track_warnings_enabled: false
-
-# When client_track_warnings_enabled: true, this tracks the materialized size of a query on the
-# coordinator. If client_large_read_warn_threshold_kb is greater than 0, this will emit a warning
-# to clients with details on what query triggered this as well as the size of the result set; if
-# client_large_read_abort_threshold_kb is greater than 0, this will abort the query after it
-# has exceeded this threshold, returning a read error to the user.
-#client_large_read_warn_threshold_kb: 0
-#client_large_read_abort_threshold_kb: 0
+#track_warnings:
+# # Scheduled to enable in 4.2
+# enabled: false
+# # When track_warnings.enabled: true, this tracks the materialized size of a query on the
+# # coordinator. If coordinator_large_read.warn_threshold_kb is greater than 0, this will emit a warning
+# # to clients with details on what query triggered this as well as the size of the result set; if
+# # coordinator_large_read.abort_threshold_kb is greater than 0, this will abort the query after it
+# # has exceeded this threshold, returning a read error to the user.
+# coordinator_large_read:
+# warn_threshold_kb: 0
+# abort_threshold_kb: 0
+# # When track_warnings.enabled: true, this tracks the size of the local read (as defined by
+# # heap size), and will warn/abort based off these thresholds; 0 disables these checks.
+# local_read_size:
+# warn_threshold_kb: 0
+# abort_threshold_kb: 0
+# # When track_warnings.enabled: true, this tracks the expected memory size of the RowIndexEntry
+# # and will warn/abort based off these thresholds; 0 disables these checks.
+# row_index_size:
+# warn_threshold_kb: 0
+# abort_threshold_kb: 0
diff --git a/ide/idea/workspace.xml b/ide/idea/workspace.xml
index 41645f5..73af47f 100644
--- a/ide/idea/workspace.xml
+++ b/ide/idea/workspace.xml
@@ -143,7 +143,7 @@
<configuration default="true" type="Application" factoryName="Application">
<extension name="coverage" enabled="false" merge="false" sample_coverage="true" runner="idea" />
<option name="MAIN_CLASS_NAME" value="" />
- <option name="VM_PARAMETERS" value="-Dcassandra.config=file://$PROJECT_DIR$/conf/cassandra.yaml -Dcassandra.storagedir=$PROJECT_DIR$/data -Dlogback.configurationFile=file://$PROJECT_DIR$/conf/logback.xml -Dcassandra.logdir=$PROJECT_DIR$/data/logs -Djava.library.path=$PROJECT_DIR$/lib/sigar-bin -DQT_SHRINKS=0 -ea" />
+ <option name="VM_PARAMETERS" value="-Dcassandra.config=file://$PROJECT_DIR$/conf/cassandra.yaml -Dcassandra.storagedir=$PROJECT_DIR$/data -Dlogback.configurationFile=file://$PROJECT_DIR$/conf/logback.xml -Dcassandra.logdir=$PROJECT_DIR$/data/logs -Djava.library.path=$PROJECT_DIR$/lib/sigar-bin -DQT_SHRINKS=0 -ea -Dcassandra.track_warnings.coordinator.defensive_checks_enabled=true" />
<option name="PROGRAM_PARAMETERS" value="" />
<option name="WORKING_DIRECTORY" value="" />
<option name="ALTERNATIVE_JRE_PATH_ENABLED" value="false" />
@@ -167,7 +167,7 @@
<option name="MAIN_CLASS_NAME" value="" />
<option name="METHOD_NAME" value="" />
<option name="TEST_OBJECT" value="class" />
- <option name="VM_PARAMETERS" value="-Dcassandra.config=file://$PROJECT_DIR$/test/conf/cassandra.yaml -Dlogback.configurationFile=file://$PROJECT_DIR$/test/conf/logback-test.xml -Dcassandra.logdir=$PROJECT_DIR$/build/test/logs -Djava.library.path=$PROJECT_DIR$/lib/sigar-bin -Dlegacy-sstable-root=$PROJECT_DIR$/test/data/legacy-sstables -Dinvalid-legacy-sstable-root=$PROJECT_DIR$/test/data/invalid-legacy-sstables -Dcassandra.ring_delay_ms=1000 -Dcassandra.skip_sync=true -ea -XX:MaxMet [...]
+ <option name="VM_PARAMETERS" value="-Dcassandra.config=file://$PROJECT_DIR$/test/conf/cassandra.yaml -Dlogback.configurationFile=file://$PROJECT_DIR$/test/conf/logback-test.xml -Dcassandra.logdir=$PROJECT_DIR$/build/test/logs -Djava.library.path=$PROJECT_DIR$/lib/sigar-bin -Dlegacy-sstable-root=$PROJECT_DIR$/test/data/legacy-sstables -Dinvalid-legacy-sstable-root=$PROJECT_DIR$/test/data/invalid-legacy-sstables -Dcassandra.ring_delay_ms=1000 -Dcassandra.skip_sync=true -ea -XX:MaxMet [...]
<option name="PARAMETERS" value="" />
<fork_mode value="class" />
<option name="WORKING_DIRECTORY" value="" />
@@ -186,7 +186,7 @@
<configuration default="false" name="Cassandra" type="Application" factoryName="Application">
<extension name="coverage" enabled="false" merge="false" sample_coverage="true" runner="idea" />
<option name="MAIN_CLASS_NAME" value="org.apache.cassandra.service.CassandraDaemon" />
- <option name="VM_PARAMETERS" value="-Dcassandra-foreground=yes -Dcassandra.config=file://$PROJECT_DIR$/conf/cassandra.yaml -Dcassandra.storagedir=$PROJECT_DIR$/data -Dlogback.configurationFile=file://$PROJECT_DIR$/conf/logback.xml -Dcassandra.logdir=$PROJECT_DIR$/data/logs -Djava.library.path=$PROJECT_DIR$/lib/sigar-bin -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=7199 -Dcom.sun.management.jmxremote.local.only=false -Dcom.sun.management.jmxremote.authenticate= [...]
+ <option name="VM_PARAMETERS" value="-Dcassandra-foreground=yes -Dcassandra.config=file://$PROJECT_DIR$/conf/cassandra.yaml -Dcassandra.storagedir=$PROJECT_DIR$/data -Dlogback.configurationFile=file://$PROJECT_DIR$/conf/logback.xml -Dcassandra.logdir=$PROJECT_DIR$/data/logs -Djava.library.path=$PROJECT_DIR$/lib/sigar-bin -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=7199 -Dcom.sun.management.jmxremote.local.only=false -Dcom.sun.management.jmxremote.authenticate= [...]
<option name="PROGRAM_PARAMETERS" value="" />
<option name="WORKING_DIRECTORY" value="file://$PROJECT_DIR$" />
<option name="ALTERNATIVE_JRE_PATH_ENABLED" value="false" />
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 4d4cc11..caa4965 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -346,19 +346,16 @@ public class Config
public MemtableAllocationType memtable_allocation_type = MemtableAllocationType.heap_buffers;
+ public final TrackWarnings track_warnings = new TrackWarnings();
+
public volatile int tombstone_warn_threshold = 1000;
public volatile int tombstone_failure_threshold = 100000;
- public volatile long client_large_read_warn_threshold_kb = 0;
- public volatile long client_large_read_abort_threshold_kb = 0;
-
public final ReplicaFilteringProtectionOptions replica_filtering_protection = new ReplicaFilteringProtectionOptions();
public volatile Long index_summary_capacity_in_mb;
public volatile int index_summary_resize_interval_in_minutes = 60;
- public volatile boolean client_track_warnings_enabled = false; // should set to true in 4.2
-
public int gc_log_threshold_in_ms = 200;
public int gc_warn_threshold_in_ms = 1000;
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 511ef3f..32a333b 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -688,6 +688,7 @@ public class DatabaseDescriptor
applyConcurrentValidations(conf);
applyRepairCommandPoolSize(conf);
+ applyTrackWarningsValidations(conf);
if (conf.concurrent_materialized_view_builders <= 0)
throw new ConfigurationException("concurrent_materialized_view_builders should be strictly greater than 0, but was " + conf.concurrent_materialized_view_builders, false);
@@ -857,9 +858,6 @@ public class DatabaseDescriptor
throw new ConfigurationException("To set concurrent_validations > concurrent_compactors, " +
"set the system property cassandra.allow_unlimited_concurrent_validations=true");
}
-
- conf.client_large_read_warn_threshold_kb = Math.max(conf.client_large_read_warn_threshold_kb, 0);
- conf.client_large_read_abort_threshold_kb = Math.max(conf.client_large_read_abort_threshold_kb, 0);
}
@VisibleForTesting
@@ -869,6 +867,12 @@ public class DatabaseDescriptor
config.repair_command_pool_size = config.concurrent_validations;
}
+ @VisibleForTesting
+ static void applyTrackWarningsValidations(Config config)
+ {
+ config.track_warnings.validate("track_warnings");
+ }
+
private static String storagedirFor(String type)
{
return storagedir(type + "_directory") + File.separator + type;
@@ -3477,33 +3481,73 @@ public class DatabaseDescriptor
return conf.internode_error_reporting_exclusions;
}
- public static long getClientLargeReadWarnThresholdKB()
+ public static boolean getTrackWarningsEnabled()
+ {
+ return conf.track_warnings.enabled;
+ }
+
+ public static void setTrackWarningsEnabled(boolean value)
+ {
+ conf.track_warnings.enabled = value;
+ }
+
+ public static long getCoordinatorReadSizeWarnThresholdKB()
+ {
+ return conf.track_warnings.coordinator_read_size.getWarnThresholdKb();
+ }
+
+ public static void setCoordinatorReadSizeWarnThresholdKB(long threshold)
+ {
+ conf.track_warnings.coordinator_read_size.setWarnThresholdKb(threshold);
+ }
+
+ public static long getCoordinatorReadSizeAbortThresholdKB()
+ {
+ return conf.track_warnings.coordinator_read_size.getAbortThresholdKb();
+ }
+
+ public static void setCoordinatorReadSizeAbortThresholdKB(long threshold)
+ {
+ conf.track_warnings.coordinator_read_size.setAbortThresholdKb(threshold);
+ }
+
+ public static long getLocalReadSizeWarnThresholdKb()
+ {
+ return conf.track_warnings.local_read_size.getWarnThresholdKb();
+ }
+
+ public static void setLocalReadSizeWarnThresholdKb(long value)
+ {
+ conf.track_warnings.local_read_size.setWarnThresholdKb(value);
+ }
+
+ public static long getLocalReadSizeAbortThresholdKb()
{
- return conf.client_large_read_warn_threshold_kb;
+ return conf.track_warnings.local_read_size.getAbortThresholdKb();
}
- public static void setClientLargeReadWarnThresholdKB(long threshold)
+ public static void setLocalReadSizeAbortThresholdKb(long value)
{
- conf.client_large_read_warn_threshold_kb = Math.max(threshold, 0);
+ conf.track_warnings.local_read_size.setAbortThresholdKb(value);
}
- public static long getClientLargeReadAbortThresholdKB()
+ public static int getRowIndexSizeWarnThresholdKb()
{
- return conf.client_large_read_abort_threshold_kb;
+ return conf.track_warnings.row_index_size.getWarnThresholdKb();
}
- public static void setClientLargeReadAbortThresholdKB(long threshold)
+ public static void setRowIndexSizeWarnThresholdKb(int value)
{
- conf.client_large_read_abort_threshold_kb = Math.max(threshold, 0);
+ conf.track_warnings.row_index_size.setWarnThresholdKb(value);
}
- public static boolean getClientTrackWarningsEnabled()
+ public static int getRowIndexSizeAbortThresholdKb()
{
- return conf.client_track_warnings_enabled;
+ return conf.track_warnings.row_index_size.getAbortThresholdKb();
}
- public static void setClientTrackWarningsEnabled(boolean value)
+ public static void setRowIndexSizeAbortThresholdKb(int value)
{
- conf.client_track_warnings_enabled = value;
+ conf.track_warnings.row_index_size.setAbortThresholdKb(value);
}
}
diff --git a/src/java/org/apache/cassandra/config/TrackWarnings.java b/src/java/org/apache/cassandra/config/TrackWarnings.java
new file mode 100644
index 0000000..77530a8
--- /dev/null
+++ b/src/java/org/apache/cassandra/config/TrackWarnings.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.config;
+
+import org.apache.cassandra.exceptions.ConfigurationException;
+
+public class TrackWarnings
+{
+ public volatile boolean enabled = false; // should set to true in 4.2
+ public final LongByteThreshold coordinator_read_size = new LongByteThreshold();
+ public final LongByteThreshold local_read_size = new LongByteThreshold();
+ public final IntByteThreshold row_index_size = new IntByteThreshold();
+
+ public void validate(String prefix)
+ {
+ prefix += ".";
+ coordinator_read_size.validate(prefix + "coordinator_read_size");
+ local_read_size.validate(prefix + "local_read_size");
+ row_index_size.validate(prefix + "row_index_size");
+ }
+
+ public static class LongByteThreshold
+ {
+ public volatile long warn_threshold_kb = 0;
+ public volatile long abort_threshold_kb = 0;
+
+ public long getWarnThresholdKb()
+ {
+ return warn_threshold_kb;
+ }
+
+ public void setWarnThresholdKb(long value)
+ {
+ warn_threshold_kb = Math.max(value, 0);
+ }
+
+ public long getAbortThresholdKb()
+ {
+ return abort_threshold_kb;
+ }
+
+ public void setAbortThresholdKb(long value)
+ {
+ abort_threshold_kb = Math.max(value, 0);
+ }
+
+ public void validate(String prefix)
+ {
+ warn_threshold_kb = Math.max(warn_threshold_kb, 0);
+ abort_threshold_kb = Math.max(abort_threshold_kb, 0);
+
+ if (abort_threshold_kb != 0 && abort_threshold_kb < warn_threshold_kb)
+ throw new ConfigurationException(String.format("abort_threshold_kb (%d) must be greater than or equal to warn_threshold_kb (%d); see %s",
+ abort_threshold_kb, warn_threshold_kb, prefix));
+ }
+ }
+
+ public static class IntByteThreshold
+ {
+ public volatile int warn_threshold_kb = 0;
+ public volatile int abort_threshold_kb = 0;
+
+ public int getWarnThresholdKb()
+ {
+ return warn_threshold_kb;
+ }
+
+ public void setWarnThresholdKb(int value)
+ {
+ warn_threshold_kb = Math.max(value, 0);
+ }
+
+ public int getAbortThresholdKb()
+ {
+ return abort_threshold_kb;
+ }
+
+ public void setAbortThresholdKb(int value)
+ {
+ abort_threshold_kb = Math.max(value, 0);
+ }
+
+ public void validate(String prefix)
+ {
+ warn_threshold_kb = Math.max(warn_threshold_kb, 0);
+ abort_threshold_kb = Math.max(abort_threshold_kb, 0);
+
+ if (abort_threshold_kb != 0 && abort_threshold_kb < warn_threshold_kb)
+ throw new ConfigurationException(String.format("abort_threshold_kb (%d) must be greater than or equal to warn_threshold_kb (%d); see %s",
+ abort_threshold_kb, warn_threshold_kb, prefix));
+ }
+ }
+}
diff --git a/src/java/org/apache/cassandra/cql3/QueryOptions.java b/src/java/org/apache/cassandra/cql3/QueryOptions.java
index e46c458..7e3d267 100644
--- a/src/java/org/apache/cassandra/cql3/QueryOptions.java
+++ b/src/java/org/apache/cassandra/cql3/QueryOptions.java
@@ -219,19 +219,19 @@ public abstract class QueryOptions
abstract TrackWarnings getTrackWarnings();
- public boolean isClientTrackWarningsEnabled()
+ public boolean isTrackWarningsEnabled()
{
return getTrackWarnings().isEnabled();
}
- public long getClientLargeReadWarnThresholdKb()
+ public long getCoordinatorReadSizeWarnThresholdKB()
{
- return getTrackWarnings().getClientLargeReadWarnThresholdKb();
+ return getTrackWarnings().getCoordinatorReadSizeWarnThresholdKB();
}
- public long getClientLargeReadAbortThresholdKB()
+ public long getCoordinatorReadSizeAbortThresholdKB()
{
- return getTrackWarnings().getClientLargeReadAbortThresholdKB();
+ return getTrackWarnings().getCoordinatorReadSizeAbortThresholdKB();
}
public QueryOptions prepare(List<ColumnSpecification> specs)
@@ -243,21 +243,21 @@ public abstract class QueryOptions
{
boolean isEnabled();
- long getClientLargeReadWarnThresholdKb();
+ long getCoordinatorReadSizeWarnThresholdKB();
- long getClientLargeReadAbortThresholdKB();
+ long getCoordinatorReadSizeAbortThresholdKB();
static TrackWarnings create()
{
// if daemon initialization hasn't happened yet (very common in tests) then ignore
if (!DatabaseDescriptor.isDaemonInitialized())
return DisabledTrackWarnings.INSTANCE;
- boolean enabled = DatabaseDescriptor.getClientTrackWarningsEnabled();
+ boolean enabled = DatabaseDescriptor.getTrackWarningsEnabled();
if (!enabled)
return DisabledTrackWarnings.INSTANCE;
- long clientLargeReadWarnThresholdKb = DatabaseDescriptor.getClientLargeReadWarnThresholdKB();
- long clientLargeReadAbortThresholdKB = DatabaseDescriptor.getClientLargeReadAbortThresholdKB();
- return new DefaultTrackWarnings(clientLargeReadWarnThresholdKb, clientLargeReadAbortThresholdKB);
+ long warnThresholdKB = DatabaseDescriptor.getCoordinatorReadSizeWarnThresholdKB();
+ long abortThresholdKB = DatabaseDescriptor.getCoordinatorReadSizeAbortThresholdKB();
+ return new DefaultTrackWarnings(warnThresholdKB, abortThresholdKB);
}
}
@@ -272,13 +272,13 @@ public abstract class QueryOptions
}
@Override
- public long getClientLargeReadWarnThresholdKb()
+ public long getCoordinatorReadSizeWarnThresholdKB()
{
return 0;
}
@Override
- public long getClientLargeReadAbortThresholdKB()
+ public long getCoordinatorReadSizeAbortThresholdKB()
{
return 0;
}
@@ -286,13 +286,13 @@ public abstract class QueryOptions
private static class DefaultTrackWarnings implements TrackWarnings
{
- private final long clientLargeReadWarnThresholdKb;
- private final long clientLargeReadAbortThresholdKB;
+ private final long warnThresholdKB;
+ private final long abortThresholdKB;
- public DefaultTrackWarnings(long clientLargeReadWarnThresholdKb, long clientLargeReadAbortThresholdKB)
+ public DefaultTrackWarnings(long warnThresholdKB, long abortThresholdKB)
{
- this.clientLargeReadWarnThresholdKb = clientLargeReadWarnThresholdKb;
- this.clientLargeReadAbortThresholdKB = clientLargeReadAbortThresholdKB;
+ this.warnThresholdKB = warnThresholdKB;
+ this.abortThresholdKB = abortThresholdKB;
}
@Override
@@ -302,15 +302,15 @@ public abstract class QueryOptions
}
@Override
- public long getClientLargeReadWarnThresholdKb()
+ public long getCoordinatorReadSizeWarnThresholdKB()
{
- return clientLargeReadWarnThresholdKb;
+ return warnThresholdKB;
}
@Override
- public long getClientLargeReadAbortThresholdKB()
+ public long getCoordinatorReadSizeAbortThresholdKB()
{
- return clientLargeReadAbortThresholdKB;
+ return abortThresholdKB;
}
}
diff --git a/src/java/org/apache/cassandra/cql3/selection/ResultSetBuilder.java b/src/java/org/apache/cassandra/cql3/selection/ResultSetBuilder.java
index 852872a..df82d52 100644
--- a/src/java/org/apache/cassandra/cql3/selection/ResultSetBuilder.java
+++ b/src/java/org/apache/cassandra/cql3/selection/ResultSetBuilder.java
@@ -106,6 +106,11 @@ public final class ResultSetBuilder
return thresholdKB > 0 && size > thresholdKB << 10;
}
+ public long getSize()
+ {
+ return size;
+ }
+
public void add(ByteBuffer v)
{
current.add(v);
diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index 5c7ac29..25499b2 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@ -31,7 +31,6 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.audit.AuditLogContext;
import org.apache.cassandra.audit.AuditLogEntryType;
import org.apache.cassandra.auth.Permission;
-import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.TableMetadata;
@@ -248,7 +247,7 @@ public class SelectStatement implements CQLStatement
Selectors selectors = selection.newSelectors(options);
ReadQuery query = getQuery(options, selectors.getColumnFilter(), nowInSec, userLimit, userPerPartitionLimit, pageSize);
- if (options.isClientTrackWarningsEnabled())
+ if (options.isTrackWarningsEnabled())
query.trackWarnings();
if (aggregationSpec == null && (pageSize <= 0 || (query.limits().count() <= pageSize)))
@@ -816,32 +815,41 @@ public class SelectStatement implements CQLStatement
private void maybeWarn(ResultSetBuilder result, QueryOptions options)
{
- if (!options.isClientTrackWarningsEnabled())
+ if (!options.isTrackWarningsEnabled())
return;
- if (result.shouldWarn(options.getClientLargeReadWarnThresholdKb()))
+ ColumnFamilyStore store = cfs();
+ if (store != null)
+ store.metric.coordinatorReadSize.update(result.getSize());
+ if (result.shouldWarn(options.getCoordinatorReadSizeWarnThresholdKB()))
{
- String msg = String.format("Read on table %s has exceeded the size warning threshold of %,d kb", table, options.getClientLargeReadWarnThresholdKb());
+ String msg = String.format("Read on table %s has exceeded the size warning threshold of %,d kb", table, options.getCoordinatorReadSizeWarnThresholdKB());
ClientWarn.instance.warn(msg + " with " + loggableTokens(options));
logger.warn("{} with query {}", msg, asCQL(options));
- cfs().metric.clientReadSizeWarnings.mark();
+ if (store != null)
+ store.metric.coordinatorReadSizeWarnings.mark();
}
}
private void maybeFail(ResultSetBuilder result, QueryOptions options)
{
- if (!options.isClientTrackWarningsEnabled())
+ if (!options.isTrackWarningsEnabled())
return;
- if (result.shouldReject(options.getClientLargeReadAbortThresholdKB()))
+ if (result.shouldReject(options.getCoordinatorReadSizeAbortThresholdKB()))
{
- String msg = String.format("Read on table %s has exceeded the size failure threshold of %,d kb", table, options.getClientLargeReadAbortThresholdKB());
+ String msg = String.format("Read on table %s has exceeded the size failure threshold of %,d kb", table, options.getCoordinatorReadSizeAbortThresholdKB());
String clientMsg = msg + " with " + loggableTokens(options);
ClientWarn.instance.warn(clientMsg);
logger.warn("{} with query {}", msg, asCQL(options));
- cfs().metric.clientReadSizeAborts.mark();
+ ColumnFamilyStore store = cfs();
+ if (store != null)
+ {
+ store.metric.coordinatorReadSizeAborts.mark();
+ store.metric.coordinatorReadSize.update(result.getSize());
+ }
// read errors require blockFor and received (it's in the protocol message), but this isn't known;
// to work around this, treat the coordinator as the only response we care about and mark it failed
ReadSizeAbortException exception = new ReadSizeAbortException(clientMsg, options.getConsistency(), 0, 1, true,
- ImmutableMap.of(FBUtilities.getBroadcastAddressAndPort(), RequestFailureReason.READ_TOO_LARGE));
+ ImmutableMap.of(FBUtilities.getBroadcastAddressAndPort(), RequestFailureReason.READ_SIZE));
StorageProxy.recordReadRegularAbort(options.getConsistency(), exception);
throw exception;
}
diff --git a/src/java/org/apache/cassandra/db/ArrayClustering.java b/src/java/org/apache/cassandra/db/ArrayClustering.java
index a6ee991..53d45e7 100644
--- a/src/java/org/apache/cassandra/db/ArrayClustering.java
+++ b/src/java/org/apache/cassandra/db/ArrayClustering.java
@@ -22,7 +22,7 @@ import org.apache.cassandra.utils.ObjectSizes;
public class ArrayClustering extends AbstractArrayClusteringPrefix implements Clustering<byte[]>
{
- private static final long EMPTY_SIZE = ObjectSizes.measure(new ArrayClustering(EMPTY_VALUES_ARRAY));
+ public static final long EMPTY_SIZE = ObjectSizes.measure(new ArrayClustering(EMPTY_VALUES_ARRAY));
public ArrayClustering(byte[]... values)
{
diff --git a/src/java/org/apache/cassandra/db/Clustering.java b/src/java/org/apache/cassandra/db/Clustering.java
index f5184e9..7575c14 100644
--- a/src/java/org/apache/cassandra/db/Clustering.java
+++ b/src/java/org/apache/cassandra/db/Clustering.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
+import org.apache.cassandra.cache.IMeasurableMemory;
import org.apache.cassandra.db.marshal.ByteArrayAccessor;
import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.schema.TableMetadata;
@@ -34,7 +35,7 @@ import org.apache.cassandra.utils.memory.AbstractAllocator;
import static org.apache.cassandra.db.AbstractBufferClusteringPrefix.EMPTY_VALUES_ARRAY;
-public interface Clustering<V> extends ClusteringPrefix<V>
+public interface Clustering<V> extends ClusteringPrefix<V>, IMeasurableMemory
{
public static final Serializer serializer = new Serializer();
diff --git a/src/java/org/apache/cassandra/db/DeletionTime.java b/src/java/org/apache/cassandra/db/DeletionTime.java
index d8ac91d..105e10d 100644
--- a/src/java/org/apache/cassandra/db/DeletionTime.java
+++ b/src/java/org/apache/cassandra/db/DeletionTime.java
@@ -33,7 +33,7 @@ import org.apache.cassandra.utils.ObjectSizes;
*/
public class DeletionTime implements Comparable<DeletionTime>, IMeasurableMemory
{
- private static final long EMPTY_SIZE = ObjectSizes.measure(new DeletionTime(0, 0));
+ public static final long EMPTY_SIZE = ObjectSizes.measure(new DeletionTime(0, 0));
/**
* A special DeletionTime that signifies that there is no top-level (row) tombstone.
diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java
index f029bac..4ea589a 100644
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@ -33,6 +33,7 @@ import com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import io.netty.util.concurrent.FastThreadLocal;
import org.apache.cassandra.config.*;
import org.apache.cassandra.cql3.ColumnIdentifier;
import org.apache.cassandra.db.filter.*;
@@ -67,6 +68,7 @@ import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.service.ClientWarn;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.ObjectSizes;
import static com.google.common.collect.Iterables.any;
import static com.google.common.collect.Iterables.filter;
@@ -86,6 +88,10 @@ public abstract class ReadCommand extends AbstractReadQuery
protected static final Logger logger = LoggerFactory.getLogger(ReadCommand.class);
public static final IVersionedSerializer<ReadCommand> serializer = new Serializer();
+ // Expose the active command running so transitive calls can lookup this command.
+ // This is useful for a few reasons, but mainly because the CQL query is here.
+ private static final FastThreadLocal<ReadCommand> COMMAND = new FastThreadLocal<>();
+
private final Kind kind;
private final boolean isDigestQuery;
@@ -150,6 +156,11 @@ public abstract class ReadCommand extends AbstractReadQuery
this.trackWarnings = trackWarnings;
}
+ public static ReadCommand getCommand()
+ {
+ return COMMAND.get();
+ }
+
protected abstract void serializeSelection(DataOutputPlus out, int version) throws IOException;
protected abstract long selectionSerializedSize(int version);
@@ -388,66 +399,75 @@ public abstract class ReadCommand extends AbstractReadQuery
{
long startTimeNanos = System.nanoTime();
- ColumnFamilyStore cfs = Keyspace.openAndGetStore(metadata());
- Index index = getIndex(cfs);
-
- Index.Searcher searcher = null;
- if (index != null)
+ COMMAND.set(this);
+ try
{
- if (!cfs.indexManager.isIndexQueryable(index))
- throw new IndexNotAvailableException(index);
+ ColumnFamilyStore cfs = Keyspace.openAndGetStore(metadata());
+ Index index = getIndex(cfs);
- searcher = index.searcherFor(this);
- Tracing.trace("Executing read on {}.{} using index {}", cfs.metadata.keyspace, cfs.metadata.name, index.getIndexMetadata().name);
- }
+ Index.Searcher searcher = null;
+ if (index != null)
+ {
+ if (!cfs.indexManager.isIndexQueryable(index))
+ throw new IndexNotAvailableException(index);
- UnfilteredPartitionIterator iterator = (null == searcher) ? queryStorage(cfs, executionController) : searcher.search(executionController);
- iterator = RTBoundValidator.validate(iterator, Stage.MERGED, false);
+ searcher = index.searcherFor(this);
+ Tracing.trace("Executing read on {}.{} using index {}", cfs.metadata.keyspace, cfs.metadata.name, index.getIndexMetadata().name);
+ }
- try
- {
- iterator = withStateTracking(iterator);
- iterator = RTBoundValidator.validate(withoutPurgeableTombstones(iterator, cfs, executionController), Stage.PURGED, false);
- iterator = withMetricsRecording(iterator, cfs.metric, startTimeNanos);
-
- // If we've used a 2ndary index, we know the result already satisfy the primary expression used, so
- // no point in checking it again.
- RowFilter filter = (null == searcher) ? rowFilter() : index.getPostIndexQueryFilter(rowFilter());
-
- /*
- * TODO: We'll currently do filtering by the rowFilter here because it's convenient. However,
- * we'll probably want to optimize by pushing it down the layer (like for dropped columns) as it
- * would be more efficient (the sooner we discard stuff we know we don't care, the less useless
- * processing we do on it).
- */
- iterator = filter.filter(iterator, nowInSec());
+ UnfilteredPartitionIterator iterator = (null == searcher) ? queryStorage(cfs, executionController) : searcher.search(executionController);
+ iterator = RTBoundValidator.validate(iterator, Stage.MERGED, false);
- // apply the limits/row counter; this transformation is stopping and would close the iterator as soon
- // as the count is observed; if that happens in the middle of an open RT, its end bound will not be included.
- // If tracking repaired data, the counter is needed for overreading repaired data, otherwise we can
- // optimise the case where this.limit = DataLimits.NONE which skips an unnecessary transform
- if (executionController.isTrackingRepairedStatus())
+ try
{
- DataLimits.Counter limit =
+ iterator = withQuerySizeTracking(iterator);
+ iterator = withStateTracking(iterator);
+ iterator = RTBoundValidator.validate(withoutPurgeableTombstones(iterator, cfs, executionController), Stage.PURGED, false);
+ iterator = withMetricsRecording(iterator, cfs.metric, startTimeNanos);
+
+ // If we've used a 2ndary index, we know the result already satisfy the primary expression used, so
+ // no point in checking it again.
+ RowFilter filter = (null == searcher) ? rowFilter() : index.getPostIndexQueryFilter(rowFilter());
+
+ /*
+ * TODO: We'll currently do filtering by the rowFilter here because it's convenient. However,
+ * we'll probably want to optimize by pushing it down the layer (like for dropped columns) as it
+ * would be more efficient (the sooner we discard stuff we know we don't care, the less useless
+ * processing we do on it).
+ */
+ iterator = filter.filter(iterator, nowInSec());
+
+ // apply the limits/row counter; this transformation is stopping and would close the iterator as soon
+ // as the count is observed; if that happens in the middle of an open RT, its end bound will not be included.
+ // If tracking repaired data, the counter is needed for overreading repaired data, otherwise we can
+ // optimise the case where this.limit = DataLimits.NONE which skips an unnecessary transform
+ if (executionController.isTrackingRepairedStatus())
+ {
+ DataLimits.Counter limit =
limits().newCounter(nowInSec(), false, selectsFullPartition(), metadata().enforceStrictLiveness());
- iterator = limit.applyTo(iterator);
- // ensure that a consistent amount of repaired data is read on each replica. This causes silent
- // overreading from the repaired data set, up to limits(). The extra data is not visible to
- // the caller, only iterated to produce the repaired data digest.
- iterator = executionController.getRepairedDataInfo().extend(iterator, limit);
+ iterator = limit.applyTo(iterator);
+ // ensure that a consistent amount of repaired data is read on each replica. This causes silent
+ // overreading from the repaired data set, up to limits(). The extra data is not visible to
+ // the caller, only iterated to produce the repaired data digest.
+ iterator = executionController.getRepairedDataInfo().extend(iterator, limit);
+ }
+ else
+ {
+ iterator = limits().filter(iterator, nowInSec(), selectsFullPartition());
+ }
+
+ // because of the above, we need to append an aritifical end bound if the source iterator was stopped short by a counter.
+ return RTBoundCloser.close(iterator);
}
- else
+ catch (RuntimeException | Error e)
{
- iterator = limits().filter(iterator, nowInSec(), selectsFullPartition());
+ iterator.close();
+ throw e;
}
-
- // because of the above, we need to append an aritifical end bound if the source iterator was stopped short by a counter.
- return RTBoundCloser.close(iterator);
}
- catch (RuntimeException | Error e)
+ finally
{
- iterator.close();
- throw e;
+ COMMAND.set(null);
}
}
@@ -632,6 +652,88 @@ public abstract class ReadCommand extends AbstractReadQuery
}
}
+ private boolean shouldTrackSize(long warnThresholdBytes, long abortThresholdBytes)
+ {
+ return trackWarnings
+ && !SchemaConstants.isSystemKeyspace(metadata().keyspace)
+ && !(warnThresholdBytes == 0 && abortThresholdBytes == 0);
+ }
+
+ private UnfilteredPartitionIterator withQuerySizeTracking(UnfilteredPartitionIterator iterator)
+ {
+ final long warnThresholdBytes = DatabaseDescriptor.getLocalReadSizeWarnThresholdKb() * 1024;
+ final long abortThresholdBytes = DatabaseDescriptor.getLocalReadSizeAbortThresholdKb() * 1024;
+ if (!shouldTrackSize(warnThresholdBytes, abortThresholdBytes))
+ return iterator;
+ class QuerySizeTracking extends Transformation<UnfilteredRowIterator>
+ {
+ private long sizeInBytes = 0;
+
+ @Override
+ public UnfilteredRowIterator applyToPartition(UnfilteredRowIterator iter)
+ {
+ sizeInBytes += ObjectSizes.sizeOnHeapOf(iter.partitionKey().getKey());
+ return Transformation.apply(iter, this);
+ }
+
+ @Override
+ protected Row applyToStatic(Row row)
+ {
+ return applyToRow(row);
+ }
+
+ @Override
+ protected Row applyToRow(Row row)
+ {
+ addSize(row.unsharedHeapSize());
+ return row;
+ }
+
+ @Override
+ protected RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker)
+ {
+ addSize(marker.unsharedHeapSize());
+ return marker;
+ }
+
+ @Override
+ protected DeletionTime applyToDeletion(DeletionTime deletionTime)
+ {
+ addSize(deletionTime.unsharedHeapSize());
+ return deletionTime;
+ }
+
+ private void addSize(long size)
+ {
+ this.sizeInBytes += size;
+ if (abortThresholdBytes != 0 && this.sizeInBytes >= abortThresholdBytes)
+ {
+ String msg = String.format("Query %s attempted to read %d bytes but max allowed is %d; query aborted (see track_warnings.local_read_size.abort_threshold_kb)",
+ ReadCommand.this.toCQLString(), this.sizeInBytes, abortThresholdBytes);
+ Tracing.trace(msg);
+ MessageParams.remove(ParamType.LOCAL_READ_SIZE_WARN);
+ MessageParams.add(ParamType.LOCAL_READ_SIZE_ABORT, this.sizeInBytes);
+ throw new LocalReadSizeTooLargeException(msg);
+ }
+ else if (warnThresholdBytes != 0 && this.sizeInBytes >= warnThresholdBytes)
+ {
+ MessageParams.add(ParamType.LOCAL_READ_SIZE_WARN, this.sizeInBytes);
+ }
+ }
+
+ @Override
+ protected void onClose()
+ {
+ ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance(metadata().id);
+ if (cfs != null)
+ cfs.metric.localReadSize.update(sizeInBytes);
+ }
+ }
+
+ iterator = Transformation.apply(iterator, new QuerySizeTracking());
+ return iterator;
+ }
+
protected UnfilteredPartitionIterator withStateTracking(UnfilteredPartitionIterator iter)
{
return Transformation.apply(iter, new CheckForAbort());
diff --git a/src/java/org/apache/cassandra/db/RowIndexEntry.java b/src/java/org/apache/cassandra/db/RowIndexEntry.java
index 215768b..895bea9 100644
--- a/src/java/org/apache/cassandra/db/RowIndexEntry.java
+++ b/src/java/org/apache/cassandra/db/RowIndexEntry.java
@@ -24,6 +24,7 @@ import java.util.List;
import com.codahale.metrics.Histogram;
import org.apache.cassandra.cache.IMeasurableMemory;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.filter.RowIndexEntryTooLargeException;
import org.apache.cassandra.io.ISerializer;
import org.apache.cassandra.io.sstable.IndexInfo;
import org.apache.cassandra.io.sstable.format.Version;
@@ -36,6 +37,9 @@ import org.apache.cassandra.io.util.RandomAccessReader;
import org.apache.cassandra.io.util.TrackedDataInputPlus;
import org.apache.cassandra.metrics.DefaultNameFactory;
import org.apache.cassandra.metrics.MetricNameFactory;
+import org.apache.cassandra.net.ParamType;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.SchemaConstants;
import org.apache.cassandra.utils.ObjectSizes;
import org.apache.cassandra.utils.vint.VIntCoding;
import org.github.jamm.Unmetered;
@@ -324,6 +328,8 @@ public class RowIndexEntry<T> implements IMeasurableMemory
DeletionTime deletionTime = DeletionTime.serializer.deserialize(in);
int columnsIndexCount = (int) in.readUnsignedVInt();
+ checkSize(columnsIndexCount, size);
+
int indexedPartSize = size - serializedSize(deletionTime, headerLength, columnsIndexCount);
if (size <= DatabaseDescriptor.getColumnIndexCacheSize())
@@ -343,6 +349,51 @@ public class RowIndexEntry<T> implements IMeasurableMemory
}
}
+ private void checkSize(int entries, int bytes)
+ {
+ ReadCommand command = ReadCommand.getCommand();
+ if (command == null || SchemaConstants.isSystemKeyspace(command.metadata().keyspace) || !DatabaseDescriptor.getTrackWarningsEnabled())
+ return;
+
+ int warnThreshold = DatabaseDescriptor.getRowIndexSizeWarnThresholdKb() * 1024;
+ int abortThreshold = DatabaseDescriptor.getRowIndexSizeAbortThresholdKb() * 1024;
+ if (warnThreshold == 0 && abortThreshold == 0)
+ return;
+
+ long estimatedMemory = estimateMaterializedIndexSize(entries, bytes);
+ ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance(command.metadata().id);
+ if (cfs != null)
+ cfs.metric.rowIndexSize.update(estimatedMemory);
+
+ if (abortThreshold != 0 && estimatedMemory > abortThreshold)
+ {
+ String msg = String.format("Query %s attempted to access a large RowIndexEntry estimated to be %d bytes " +
+ "in-memory (total entries: %d, total bytes: %d) but the max allowed is %d;" +
+ " query aborted (see row_index_size_abort_threshold_kb)",
+ command.toCQLString(), estimatedMemory, entries, bytes, abortThreshold);
+ MessageParams.remove(ParamType.ROW_INDEX_SIZE_WARN);
+ MessageParams.add(ParamType.ROW_INDEX_SIZE_ABORT, estimatedMemory);
+
+ throw new RowIndexEntryTooLargeException(msg);
+ }
+ else if (warnThreshold != 0 && estimatedMemory > warnThreshold)
+ {
+ // use addIfLarger rather than add as a previous partition may be larger than this one
+ Long current = MessageParams.get(ParamType.ROW_INDEX_SIZE_WARN);
+ if (current == null || current.compareTo(estimatedMemory) < 0)
+ MessageParams.add(ParamType.ROW_INDEX_SIZE_WARN, estimatedMemory);
+ }
+ }
+
+ private static long estimateMaterializedIndexSize(int entries, int bytes)
+ {
+ long overhead = IndexInfo.EMPTY_SIZE
+ + ArrayClustering.EMPTY_SIZE
+ + DeletionTime.EMPTY_SIZE;
+
+ return (overhead * entries) + bytes;
+ }
+
public long deserializePositionAndSkip(DataInputPlus in) throws IOException
{
long position = in.readUnsignedVInt();
diff --git a/src/java/org/apache/cassandra/exceptions/TombstoneAbortException.java b/src/java/org/apache/cassandra/db/filter/LocalReadSizeTooLargeException.java
similarity index 50%
copy from src/java/org/apache/cassandra/exceptions/TombstoneAbortException.java
copy to src/java/org/apache/cassandra/db/filter/LocalReadSizeTooLargeException.java
index e86e760..9d872df 100644
--- a/src/java/org/apache/cassandra/exceptions/TombstoneAbortException.java
+++ b/src/java/org/apache/cassandra/db/filter/LocalReadSizeTooLargeException.java
@@ -16,24 +16,14 @@
* limitations under the License.
*/
-package org.apache.cassandra.exceptions;
+package org.apache.cassandra.db.filter;
-import java.util.Map;
+import org.apache.cassandra.db.RejectException;
-import org.apache.cassandra.db.ConsistencyLevel;
-import org.apache.cassandra.locator.InetAddressAndPort;
-
-import static org.apache.cassandra.service.reads.ReadCallback.tombstoneAbortMessage;
-
-public class TombstoneAbortException extends ReadAbortException
+public class LocalReadSizeTooLargeException extends RejectException
{
- public final int nodes;
- public final int tombstones;
-
- public TombstoneAbortException(int nodes, int tombstones, String cql, boolean dataPresent, ConsistencyLevel consistency, int received, int blockFor, Map<InetAddressAndPort, RequestFailureReason> failureReasonByEndpoint)
+ public LocalReadSizeTooLargeException(String message)
{
- super(tombstoneAbortMessage(nodes, tombstones, cql), consistency, received, blockFor, dataPresent, failureReasonByEndpoint);
- this.nodes = nodes;
- this.tombstones = tombstones;
+ super(message);
}
}
diff --git a/src/java/org/apache/cassandra/exceptions/TombstoneAbortException.java b/src/java/org/apache/cassandra/db/filter/RowIndexEntryTooLargeException.java
similarity index 50%
copy from src/java/org/apache/cassandra/exceptions/TombstoneAbortException.java
copy to src/java/org/apache/cassandra/db/filter/RowIndexEntryTooLargeException.java
index e86e760..5f7bfcd 100644
--- a/src/java/org/apache/cassandra/exceptions/TombstoneAbortException.java
+++ b/src/java/org/apache/cassandra/db/filter/RowIndexEntryTooLargeException.java
@@ -16,24 +16,14 @@
* limitations under the License.
*/
-package org.apache.cassandra.exceptions;
+package org.apache.cassandra.db.filter;
-import java.util.Map;
+import org.apache.cassandra.db.RejectException;
-import org.apache.cassandra.db.ConsistencyLevel;
-import org.apache.cassandra.locator.InetAddressAndPort;
-
-import static org.apache.cassandra.service.reads.ReadCallback.tombstoneAbortMessage;
-
-public class TombstoneAbortException extends ReadAbortException
+public class RowIndexEntryTooLargeException extends RejectException
{
- public final int nodes;
- public final int tombstones;
-
- public TombstoneAbortException(int nodes, int tombstones, String cql, boolean dataPresent, ConsistencyLevel consistency, int received, int blockFor, Map<InetAddressAndPort, RequestFailureReason> failureReasonByEndpoint)
+ public RowIndexEntryTooLargeException(String message)
{
- super(tombstoneAbortMessage(nodes, tombstones, cql), consistency, received, blockFor, dataPresent, failureReasonByEndpoint);
- this.nodes = nodes;
- this.tombstones = tombstones;
+ super(message);
}
}
diff --git a/src/java/org/apache/cassandra/db/rows/AbstractRangeTombstoneMarker.java b/src/java/org/apache/cassandra/db/rows/AbstractRangeTombstoneMarker.java
index 7dac1fa..be32847 100644
--- a/src/java/org/apache/cassandra/db/rows/AbstractRangeTombstoneMarker.java
+++ b/src/java/org/apache/cassandra/db/rows/AbstractRangeTombstoneMarker.java
@@ -17,10 +17,8 @@
*/
package org.apache.cassandra.db.rows;
-import java.nio.ByteBuffer;
-
-import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.db.ClusteringBoundOrBoundary;
+import org.apache.cassandra.schema.TableMetadata;
public abstract class AbstractRangeTombstoneMarker<B extends ClusteringBoundOrBoundary<?>> implements RangeTombstoneMarker
{
diff --git a/src/java/org/apache/cassandra/db/rows/ArrayCell.java b/src/java/org/apache/cassandra/db/rows/ArrayCell.java
index c4fdd14..eddc11c 100644
--- a/src/java/org/apache/cassandra/db/rows/ArrayCell.java
+++ b/src/java/org/apache/cassandra/db/rows/ArrayCell.java
@@ -110,6 +110,13 @@ public class ArrayCell extends AbstractCell<byte[]>
return new BufferCell(column, timestamp, ttl, localDeletionTime, allocator.clone(value), path == null ? null : path.copy(allocator));
}
+ @Override
+ public long unsharedHeapSize()
+ {
+ return EMPTY_SIZE + ObjectSizes.sizeOfArray(value) + (path == null ? 0 : path.unsharedHeapSize());
+ }
+
+ @Override
public long unsharedHeapSizeExcludingData()
{
return EMPTY_SIZE + ObjectSizes.sizeOfEmptyByteArray() + (path == null ? 0 : path.unsharedHeapSizeExcludingData());
diff --git a/src/java/org/apache/cassandra/db/rows/BTreeRow.java b/src/java/org/apache/cassandra/db/rows/BTreeRow.java
index bd44b66..2d3ee83 100644
--- a/src/java/org/apache/cassandra/db/rows/BTreeRow.java
+++ b/src/java/org/apache/cassandra/db/rows/BTreeRow.java
@@ -497,6 +497,19 @@ public class BTreeRow extends AbstractRow
return Ints.checkedCast(accumulate((cd, v) -> v + cd.dataSize(), dataSize));
}
+ @Override
+ public long unsharedHeapSize()
+ {
+ long heapSize = EMPTY_SIZE
+ + clustering.unsharedHeapSize()
+ + primaryKeyLivenessInfo.unsharedHeapSize()
+ + deletion.unsharedHeapSize()
+ + BTree.sizeOfStructureOnHeap(btree);
+
+ return accumulate((cd, v) -> v + cd.unsharedHeapSize(), heapSize);
+ }
+
+ @Override
public long unsharedHeapSizeExcludingData()
{
long heapSize = EMPTY_SIZE
diff --git a/src/java/org/apache/cassandra/db/rows/BufferCell.java b/src/java/org/apache/cassandra/db/rows/BufferCell.java
index 55fc4b4..7870bf1 100644
--- a/src/java/org/apache/cassandra/db/rows/BufferCell.java
+++ b/src/java/org/apache/cassandra/db/rows/BufferCell.java
@@ -142,6 +142,13 @@ public class BufferCell extends AbstractCell<ByteBuffer>
return new BufferCell(column, timestamp, ttl, localDeletionTime, allocator.clone(value), path == null ? null : path.copy(allocator));
}
+ @Override
+ public long unsharedHeapSize()
+ {
+ return EMPTY_SIZE + ObjectSizes.sizeOnHeapOf(value) + (path == null ? 0 : path.unsharedHeapSize());
+ }
+
+ @Override
public long unsharedHeapSizeExcludingData()
{
return EMPTY_SIZE + ObjectSizes.sizeOfEmptyHeapByteBuffer() + (path == null ? 0 : path.unsharedHeapSizeExcludingData());
diff --git a/src/java/org/apache/cassandra/db/rows/CellPath.java b/src/java/org/apache/cassandra/db/rows/CellPath.java
index 50496a1..27b6272 100644
--- a/src/java/org/apache/cassandra/db/rows/CellPath.java
+++ b/src/java/org/apache/cassandra/db/rows/CellPath.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Objects;
+import org.apache.cassandra.cache.IMeasurableMemory;
import org.apache.cassandra.db.Digest;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.io.util.DataInputPlus;
@@ -31,7 +32,7 @@ import org.apache.cassandra.utils.memory.AbstractAllocator;
/**
* A path for a cell belonging to a complex column type (non-frozen collection or UDT).
*/
-public abstract class CellPath
+public abstract class CellPath implements IMeasurableMemory
{
public static final CellPath BOTTOM = new EmptyCellPath();
public static final CellPath TOP = new EmptyCellPath();
@@ -125,6 +126,13 @@ public abstract class CellPath
return new SingleItemCellPath(allocator.clone(value));
}
+ @Override
+ public long unsharedHeapSize()
+ {
+ return EMPTY_SIZE + ObjectSizes.sizeOnHeapOf(value);
+ }
+
+ @Override
public long unsharedHeapSizeExcludingData()
{
return EMPTY_SIZE + ObjectSizes.sizeOfEmptyHeapByteBuffer();
@@ -148,6 +156,14 @@ public abstract class CellPath
return this;
}
+ @Override
+ public long unsharedHeapSize()
+ {
+ // empty only happens with a cached reference, so 0 unshared space
+ return 0;
+ }
+
+ @Override
public long unsharedHeapSizeExcludingData()
{
return 0;
diff --git a/src/java/org/apache/cassandra/db/rows/ColumnData.java b/src/java/org/apache/cassandra/db/rows/ColumnData.java
index 36aad97..4146946 100644
--- a/src/java/org/apache/cassandra/db/rows/ColumnData.java
+++ b/src/java/org/apache/cassandra/db/rows/ColumnData.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.db.rows;
import java.util.Comparator;
+import org.apache.cassandra.cache.IMeasurableMemory;
import org.apache.cassandra.db.Digest;
import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.db.DeletionPurger;
@@ -31,7 +32,7 @@ import org.apache.cassandra.serializers.MarshalException;
* In practice, there is only 2 implementations of this: either {@link Cell} for simple columns
* or {@code ComplexColumnData} for complex columns.
*/
-public abstract class ColumnData
+public abstract class ColumnData implements IMeasurableMemory
{
public static final Comparator<ColumnData> comparator = (cd1, cd2) -> cd1.column().compareTo(cd2.column());
diff --git a/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java b/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java
index 9f35437..bf7714d 100644
--- a/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java
+++ b/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java
@@ -121,6 +121,16 @@ public class ComplexColumnData extends ColumnData implements Iterable<Cell<?>>
return size;
}
+ @Override
+ public long unsharedHeapSize()
+ {
+ long heapSize = EMPTY_SIZE + ObjectSizes.sizeOfArray(cells);
+ for (Cell<?> cell : this)
+ heapSize += cell.unsharedHeapSize();
+ return heapSize;
+ }
+
+ @Override
public long unsharedHeapSizeExcludingData()
{
long heapSize = EMPTY_SIZE + ObjectSizes.sizeOfArray(cells);
diff --git a/src/java/org/apache/cassandra/db/rows/NativeCell.java b/src/java/org/apache/cassandra/db/rows/NativeCell.java
index 02e0008..03dfc70 100644
--- a/src/java/org/apache/cassandra/db/rows/NativeCell.java
+++ b/src/java/org/apache/cassandra/db/rows/NativeCell.java
@@ -166,6 +166,13 @@ public class NativeCell extends AbstractCell<ByteBuffer>
return new BufferCell(column, timestamp(), ttl(), localDeletionTime(), ByteBufferUtil.EMPTY_BYTE_BUFFER, path());
}
+ @Override
+ public long unsharedHeapSize()
+ {
+ return EMPTY_SIZE;
+ }
+
+ @Override
public long unsharedHeapSizeExcludingData()
{
return EMPTY_SIZE;
diff --git a/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundMarker.java b/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundMarker.java
index 20bc484..c38a6cd 100644
--- a/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundMarker.java
+++ b/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundMarker.java
@@ -22,6 +22,7 @@ import java.util.Objects;
import org.apache.cassandra.db.marshal.ValueAccessor;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.db.*;
+import org.apache.cassandra.utils.ObjectSizes;
import org.apache.cassandra.utils.memory.AbstractAllocator;
/**
@@ -29,6 +30,8 @@ import org.apache.cassandra.utils.memory.AbstractAllocator;
*/
public class RangeTombstoneBoundMarker extends AbstractRangeTombstoneMarker<ClusteringBound<?>>
{
+ private static final long EMPTY_SIZE = ObjectSizes.measure(new RangeTombstoneBoundMarker(new ArrayClusteringBound(ClusteringPrefix.Kind.INCL_START_BOUND, AbstractArrayClusteringPrefix.EMPTY_VALUES_ARRAY), null));
+
private final DeletionTime deletion;
public RangeTombstoneBoundMarker(ClusteringBound<?> bound, DeletionTime deletion)
@@ -156,6 +159,12 @@ public class RangeTombstoneBoundMarker extends AbstractRangeTombstoneMarker<Clus
deletion.digest(digest);
}
+ @Override
+ public long unsharedHeapSize()
+ {
+ return EMPTY_SIZE + deletion.unsharedHeapSize();
+ }
+
public String toString(TableMetadata metadata)
{
return String.format("Marker %s@%d/%d", bound.toString(metadata), deletion.markedForDeleteAt(), deletion.localDeletionTime());
diff --git a/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java b/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java
index 7079981..c74f4a0 100644
--- a/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java
+++ b/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java
@@ -23,6 +23,7 @@ import java.util.Objects;
import org.apache.cassandra.db.marshal.ValueAccessor;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.db.*;
+import org.apache.cassandra.utils.ObjectSizes;
import org.apache.cassandra.utils.memory.AbstractAllocator;
/**
@@ -30,6 +31,8 @@ import org.apache.cassandra.utils.memory.AbstractAllocator;
*/
public class RangeTombstoneBoundaryMarker extends AbstractRangeTombstoneMarker<ClusteringBoundary<?>>
{
+ private static final long EMPTY_SIZE = ObjectSizes.measure(new RangeTombstoneBoundaryMarker(new ArrayClusteringBoundary(ClusteringPrefix.Kind.INCL_END_EXCL_START_BOUNDARY, new byte[][] { new byte[0]}), null, null));
+
private final DeletionTime endDeletion;
private final DeletionTime startDeletion;
@@ -187,6 +190,12 @@ public class RangeTombstoneBoundaryMarker extends AbstractRangeTombstoneMarker<C
startDeletion.digest(digest);
}
+ @Override
+ public long unsharedHeapSize()
+ {
+ return EMPTY_SIZE + startDeletion.unsharedHeapSize() + endDeletion.unsharedHeapSize();
+ }
+
public String toString(TableMetadata metadata)
{
return String.format("Marker %s@%d/%d-%d/%d",
diff --git a/src/java/org/apache/cassandra/db/rows/RangeTombstoneMarker.java b/src/java/org/apache/cassandra/db/rows/RangeTombstoneMarker.java
index d70ac78..4ccf92d 100644
--- a/src/java/org/apache/cassandra/db/rows/RangeTombstoneMarker.java
+++ b/src/java/org/apache/cassandra/db/rows/RangeTombstoneMarker.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.db.rows;
import java.util.*;
+import org.apache.cassandra.cache.IMeasurableMemory;
import org.apache.cassandra.db.*;
import org.apache.cassandra.utils.memory.AbstractAllocator;
@@ -27,7 +28,7 @@ import org.apache.cassandra.utils.memory.AbstractAllocator;
* <p>
* There is 2 types of markers: bounds (see {@link RangeTombstoneBoundMarker}) and boundaries (see {@link RangeTombstoneBoundaryMarker}).
*/
-public interface RangeTombstoneMarker extends Unfiltered
+public interface RangeTombstoneMarker extends Unfiltered, IMeasurableMemory
{
@Override
public ClusteringBoundOrBoundary<?> clustering();
diff --git a/src/java/org/apache/cassandra/db/rows/Row.java b/src/java/org/apache/cassandra/db/rows/Row.java
index 5c28cd1..85d27a4 100644
--- a/src/java/org/apache/cassandra/db/rows/Row.java
+++ b/src/java/org/apache/cassandra/db/rows/Row.java
@@ -21,6 +21,7 @@ import java.util.*;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
+import org.apache.cassandra.cache.IMeasurableMemory;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.filter.ColumnFilter;
import org.apache.cassandra.schema.ColumnMetadata;
@@ -48,7 +49,7 @@ import org.apache.cassandra.utils.btree.UpdateFunction;
* it's own data. For instance, a {@code Row} cannot contains a cell that is deleted by its own
* row deletion.
*/
-public interface Row extends Unfiltered, Iterable<ColumnData>
+public interface Row extends Unfiltered, Iterable<ColumnData>, IMeasurableMemory
{
/**
* The clustering values for this row.
diff --git a/src/java/org/apache/cassandra/exceptions/RequestFailureReason.java b/src/java/org/apache/cassandra/exceptions/RequestFailureReason.java
index 3f6c2d4..f205900 100644
--- a/src/java/org/apache/cassandra/exceptions/RequestFailureReason.java
+++ b/src/java/org/apache/cassandra/exceptions/RequestFailureReason.java
@@ -36,7 +36,7 @@ public enum RequestFailureReason
READ_TOO_MANY_TOMBSTONES (1),
TIMEOUT (2),
INCOMPATIBLE_SCHEMA (3),
- READ_TOO_LARGE (4);
+ READ_SIZE (4);
public static final Serializer serializer = new Serializer();
diff --git a/src/java/org/apache/cassandra/exceptions/TombstoneAbortException.java b/src/java/org/apache/cassandra/exceptions/TombstoneAbortException.java
index e86e760..ef30ff3 100644
--- a/src/java/org/apache/cassandra/exceptions/TombstoneAbortException.java
+++ b/src/java/org/apache/cassandra/exceptions/TombstoneAbortException.java
@@ -23,14 +23,14 @@ import java.util.Map;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.locator.InetAddressAndPort;
-import static org.apache.cassandra.service.reads.ReadCallback.tombstoneAbortMessage;
+import static org.apache.cassandra.service.reads.trackwarnings.WarningsSnapshot.tombstoneAbortMessage;
public class TombstoneAbortException extends ReadAbortException
{
public final int nodes;
- public final int tombstones;
+ public final long tombstones;
- public TombstoneAbortException(int nodes, int tombstones, String cql, boolean dataPresent, ConsistencyLevel consistency, int received, int blockFor, Map<InetAddressAndPort, RequestFailureReason> failureReasonByEndpoint)
+ public TombstoneAbortException(int nodes, long tombstones, String cql, boolean dataPresent, ConsistencyLevel consistency, int received, int blockFor, Map<InetAddressAndPort, RequestFailureReason> failureReasonByEndpoint)
{
super(tombstoneAbortMessage(nodes, tombstones, cql), consistency, received, blockFor, dataPresent, failureReasonByEndpoint);
this.nodes = nodes;
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexInfo.java b/src/java/org/apache/cassandra/io/sstable/IndexInfo.java
index e24436d..e744150 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexInfo.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexInfo.java
@@ -27,7 +27,6 @@ import org.apache.cassandra.db.RowIndexEntry;
import org.apache.cassandra.db.SerializationHeader;
import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.db.marshal.ByteArrayAccessor;
import org.apache.cassandra.io.ISerializer;
import org.apache.cassandra.io.sstable.format.Version;
import org.apache.cassandra.io.util.DataInputPlus;
@@ -59,7 +58,7 @@ import org.apache.cassandra.utils.ObjectSizes;
*/
public class IndexInfo
{
- private static final long EMPTY_SIZE = ObjectSizes.measure(new IndexInfo(null, null, 0, 0, null));
+ public static final long EMPTY_SIZE = ObjectSizes.measure(new IndexInfo(null, null, 0, 0, null));
public final long offset;
public final long width;
diff --git a/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java b/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
index d0607af..776027e 100644
--- a/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
@@ -156,8 +156,17 @@ public class KeyspaceMetrics
public final Meter clientTombstoneWarnings;
public final Meter clientTombstoneAborts;
- public final Meter clientReadSizeWarnings;
- public final Meter clientReadSizeAborts;
+ public final Meter coordinatorReadSizeWarnings;
+ public final Meter coordinatorReadSizeAborts;
+ public final Histogram coordinatorReadSize;
+
+ public final Meter localReadSizeWarnings;
+ public final Meter localReadSizeAborts;
+ public final Histogram localReadSize;
+
+ public final Meter rowIndexSizeWarnings;
+ public final Meter rowIndexSizeAborts;
+ public final Histogram rowIndexSize;
public final MetricNameFactory factory;
private Keyspace keyspace;
@@ -245,8 +254,17 @@ public class KeyspaceMetrics
clientTombstoneWarnings = createKeyspaceMeter("ClientTombstoneWarnings");
clientTombstoneAborts = createKeyspaceMeter("ClientTombstoneAborts");
- clientReadSizeWarnings = createKeyspaceMeter("ClientReadSizeWarnings");
- clientReadSizeAborts = createKeyspaceMeter("ClientReadSizeAborts");
+ coordinatorReadSizeWarnings = createKeyspaceMeter("CoordinatorReadSizeWarnings");
+ coordinatorReadSizeAborts = createKeyspaceMeter("CoordinatorReadSizeAborts");
+ coordinatorReadSize = createKeyspaceHistogram("CoordinatorReadSize", false);
+
+ localReadSizeWarnings = createKeyspaceMeter("LocalReadSizeWarnings");
+ localReadSizeAborts = createKeyspaceMeter("LocalReadSizeAborts");
+ localReadSize = createKeyspaceHistogram("LocalReadSize", false);
+
+ rowIndexSizeWarnings = createKeyspaceMeter("RowIndexSizeWarnings");
+ rowIndexSizeAborts = createKeyspaceMeter("RowIndexSizeAborts");
+ rowIndexSize = createKeyspaceHistogram("RowIndexSize", false);
}
/**
diff --git a/src/java/org/apache/cassandra/metrics/TableMetrics.java b/src/java/org/apache/cassandra/metrics/TableMetrics.java
index ced0622..6b7193c 100644
--- a/src/java/org/apache/cassandra/metrics/TableMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/TableMetrics.java
@@ -261,8 +261,18 @@ public class TableMetrics
public final TableMeter clientTombstoneWarnings;
public final TableMeter clientTombstoneAborts;
- public final TableMeter clientReadSizeWarnings;
- public final TableMeter clientReadSizeAborts;
+
+ public final TableMeter coordinatorReadSizeWarnings;
+ public final TableMeter coordinatorReadSizeAborts;
+ public final TableHistogram coordinatorReadSize;
+
+ public final TableMeter localReadSizeWarnings;
+ public final TableMeter localReadSizeAborts;
+ public final TableHistogram localReadSize;
+
+ public final TableMeter rowIndexSizeWarnings;
+ public final TableMeter rowIndexSizeAborts;
+ public final TableHistogram rowIndexSize;
private static Pair<Long, Long> totalNonSystemTablesSize(Predicate<SSTableReader> predicate)
{
@@ -922,8 +932,17 @@ public class TableMetrics
clientTombstoneWarnings = createTableMeter("ClientTombstoneWarnings", cfs.keyspace.metric.clientTombstoneWarnings);
clientTombstoneAborts = createTableMeter("ClientTombstoneAborts", cfs.keyspace.metric.clientTombstoneAborts);
- clientReadSizeWarnings = createTableMeter("ClientReadSizeWarnings", cfs.keyspace.metric.clientReadSizeWarnings);
- clientReadSizeAborts = createTableMeter("ClientReadSizeAborts", cfs.keyspace.metric.clientReadSizeAborts);
+ coordinatorReadSizeWarnings = createTableMeter("CoordinatorReadSizeWarnings", cfs.keyspace.metric.coordinatorReadSizeWarnings);
+ coordinatorReadSizeAborts = createTableMeter("CoordinatorReadSizeAborts", cfs.keyspace.metric.coordinatorReadSizeAborts);
+ coordinatorReadSize = createTableHistogram("CoordinatorReadSize", cfs.keyspace.metric.coordinatorReadSize, false);
+
+ localReadSizeWarnings = createTableMeter("LocalReadSizeWarnings", cfs.keyspace.metric.localReadSizeWarnings);
+ localReadSizeAborts = createTableMeter("LocalReadSizeAborts", cfs.keyspace.metric.localReadSizeAborts);
+ localReadSize = createTableHistogram("LocalReadSize", cfs.keyspace.metric.localReadSize, false);
+
+ rowIndexSizeWarnings = createTableMeter("RowIndexSizeWarnings", cfs.keyspace.metric.rowIndexSizeWarnings);
+ rowIndexSizeAborts = createTableMeter("RowIndexSizeAborts", cfs.keyspace.metric.rowIndexSizeAborts);
+ rowIndexSize = createTableHistogram("RowIndexSize", cfs.keyspace.metric.rowIndexSize, false);
}
public void updateSSTableIterated(int count)
diff --git a/src/java/org/apache/cassandra/net/ParamType.java b/src/java/org/apache/cassandra/net/ParamType.java
index d8b8c0e..038530b 100644
--- a/src/java/org/apache/cassandra/net/ParamType.java
+++ b/src/java/org/apache/cassandra/net/ParamType.java
@@ -25,6 +25,7 @@ import org.apache.cassandra.exceptions.RequestFailureReason;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.Int32Serializer;
+import org.apache.cassandra.utils.Int64Serializer;
import org.apache.cassandra.utils.UUIDSerializer;
import static java.lang.Math.max;
@@ -57,7 +58,11 @@ public enum ParamType
TRACK_REPAIRED_DATA (7, "TrackRepaired", LegacyFlag.serializer),
TOMBSTONE_ABORT(8, "TSA", Int32Serializer.serializer),
- TOMBSTONE_WARNING(9, "TSW", Int32Serializer.serializer);
+ TOMBSTONE_WARNING(9, "TSW", Int32Serializer.serializer),
+ LOCAL_READ_SIZE_ABORT(10, "LRSA", Int64Serializer.serializer),
+ LOCAL_READ_SIZE_WARN(11, "LRSW", Int64Serializer.serializer),
+ ROW_INDEX_SIZE_ABORT(12, "RISA", Int64Serializer.serializer),
+ ROW_INDEX_SIZE_WARN(13, "RISW", Int64Serializer.serializer);
final int id;
@Deprecated final String legacyAlias; // pre-4.0 we used to serialize entire param name string
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index dcd2598..0e663c0 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -6128,41 +6128,105 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
}
@Override
- public long getClientLargeReadWarnThresholdKB()
+ public boolean getTrackWarningsEnabled()
{
- return DatabaseDescriptor.getClientLargeReadWarnThresholdKB();
+ return DatabaseDescriptor.getTrackWarningsEnabled();
}
@Override
- public void setClientLargeReadWarnThresholdKB(long threshold)
+ public void setTrackWarningsEnabled(boolean value)
{
- DatabaseDescriptor.setClientLargeReadWarnThresholdKB(threshold);
- logger.info("updated client_large_read_warn_threshold_kb to {}", threshold);
+ DatabaseDescriptor.setTrackWarningsEnabled(value);
+ logger.info("updated track_warnings.enabled to {}", value);
}
@Override
- public long getClientLargeReadAbortThresholdKB()
+ public long getCoordinatorLargeReadWarnThresholdKB()
{
- return DatabaseDescriptor.getClientLargeReadAbortThresholdKB();
+ return DatabaseDescriptor.getCoordinatorReadSizeWarnThresholdKB();
}
@Override
- public void setClientLargeReadAbortThresholdKB(long threshold)
+ public void setCoordinatorLargeReadWarnThresholdKB(long threshold)
{
- DatabaseDescriptor.setClientLargeReadAbortThresholdKB(threshold);
- logger.info("updated client_large_read_abort_threshold_kb to {}", threshold);
+ if (threshold < 0)
+ throw new IllegalArgumentException("threshold " + threshold + " is less than 0; must be positive or zero");
+ DatabaseDescriptor.setCoordinatorReadSizeWarnThresholdKB(threshold);
+ logger.info("updated track_warnings.coordinator_large_read.warn_threshold_kb to {}", threshold);
}
@Override
- public boolean getClientTrackWarningsEnabled()
+ public long getCoordinatorLargeReadAbortThresholdKB()
{
- return DatabaseDescriptor.getClientTrackWarningsEnabled();
+ return DatabaseDescriptor.getCoordinatorReadSizeAbortThresholdKB();
}
@Override
- public void setClientTrackWarningsEnabled(boolean value)
+ public void setCoordinatorLargeReadAbortThresholdKB(long threshold)
{
- DatabaseDescriptor.setClientTrackWarningsEnabled(value);
- logger.info("updated client_track_warnings_enabled to {}", value);
+ if (threshold < 0)
+ throw new IllegalArgumentException("threshold " + threshold + " is less than 0; must be positive or zero");
+ DatabaseDescriptor.setCoordinatorReadSizeAbortThresholdKB(threshold);
+ logger.info("updated track_warnings.coordinator_large_read.abort_threshold_kb to {}", threshold);
+ }
+
+ @Override
+ public long getLocalReadTooLargeWarnThresholdKb()
+ {
+ return DatabaseDescriptor.getLocalReadSizeWarnThresholdKb();
+ }
+
+ @Override
+ public void setLocalReadTooLargeWarnThresholdKb(long value)
+ {
+ if (value < 0)
+ throw new IllegalArgumentException("value " + value + " is less than 0; must be positive or zero");
+ DatabaseDescriptor.setLocalReadSizeWarnThresholdKb(value);
+ logger.info("updated track_warnings.local_read_size.warn_threshold_kb to {}", value);
+ }
+
+ @Override
+ public long getLocalReadTooLargeAbortThresholdKb()
+ {
+ return DatabaseDescriptor.getLocalReadSizeAbortThresholdKb();
+ }
+
+ @Override
+ public void setLocalReadTooLargeAbortThresholdKb(long value)
+ {
+ if (value < 0)
+ throw new IllegalArgumentException("value " + value + " is less than 0; must be positive or zero");
+ DatabaseDescriptor.setLocalReadSizeAbortThresholdKb(value);
+ logger.info("updated track_warnings.local_read_size.abort_threshold_kb to {}", value);
+ }
+
+ @Override
+ public int getRowIndexSizeWarnThresholdKb()
+ {
+ return DatabaseDescriptor.getRowIndexSizeWarnThresholdKb();
+ }
+
+ @Override
+ public void setRowIndexSizeWarnThresholdKb(int value)
+ {
+ if (value < 0)
+ throw new IllegalArgumentException("value " + value + " is less than 0; must be positive or zero");
+ DatabaseDescriptor.setRowIndexSizeWarnThresholdKb(value);
+ logger.info("updated track_warnings.row_index_size.warn_threshold_kb to {}", value);
+ }
+
+ @Override
+ public int getRowIndexSizeAbortThresholdKb()
+ {
+ return DatabaseDescriptor.getRowIndexSizeAbortThresholdKb();
+ }
+
+ @Override
+ public void setRowIndexSizeAbortThresholdKb(int value)
+ {
+ if (value < 0)
+ throw new IllegalArgumentException("value " + value + " is less than 0; must be positive or zero");
+ DatabaseDescriptor.setRowIndexSizeAbortThresholdKb(value);
+ logger.info("updated track_warnings.row_index_size.abort_threshold_kb to {}", value);
}
}
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index f29dd89..6a294c1 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -892,10 +892,21 @@ public interface StorageServiceMBean extends NotificationEmitter
public void setCompactionTombstoneWarningThreshold(int count);
public int getCompactionTombstoneWarningThreshold();
- public long getClientLargeReadWarnThresholdKB();
- public void setClientLargeReadWarnThresholdKB(long threshold);
- public long getClientLargeReadAbortThresholdKB();
- public void setClientLargeReadAbortThresholdKB(long threshold);
- public boolean getClientTrackWarningsEnabled();
- public void setClientTrackWarningsEnabled(boolean value);
+ public boolean getTrackWarningsEnabled();
+ public void setTrackWarningsEnabled(boolean value);
+
+ public long getCoordinatorLargeReadWarnThresholdKB();
+ public void setCoordinatorLargeReadWarnThresholdKB(long threshold);
+ public long getCoordinatorLargeReadAbortThresholdKB();
+ public void setCoordinatorLargeReadAbortThresholdKB(long threshold);
+
+ public long getLocalReadTooLargeWarnThresholdKb();
+ public void setLocalReadTooLargeWarnThresholdKb(long value);
+ public long getLocalReadTooLargeAbortThresholdKb();
+ public void setLocalReadTooLargeAbortThresholdKb(long value);
+
+ public int getRowIndexSizeWarnThresholdKb();
+ public void setRowIndexSizeWarnThresholdKb(int value);
+ public int getRowIndexSizeAbortThresholdKb();
+ public void setRowIndexSizeAbortThresholdKb(int value);
}
diff --git a/src/java/org/apache/cassandra/service/reads/ReadCallback.java b/src/java/org/apache/cassandra/service/reads/ReadCallback.java
index 6703147..15f1559 100644
--- a/src/java/org/apache/cassandra/service/reads/ReadCallback.java
+++ b/src/java/org/apache/cassandra/service/reads/ReadCallback.java
@@ -20,20 +20,14 @@ package org.apache.cassandra.service.reads;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
-import com.google.common.annotations.VisibleForTesting;
-
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.MessageParams;
-import org.apache.cassandra.exceptions.TombstoneAbortException;
-import org.apache.cassandra.locator.ReplicaPlan;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.MessageParams;
import org.apache.cassandra.db.PartitionRangeReadCommand;
import org.apache.cassandra.db.ReadCommand;
import org.apache.cassandra.db.ReadResponse;
@@ -42,12 +36,14 @@ import org.apache.cassandra.exceptions.ReadTimeoutException;
import org.apache.cassandra.exceptions.RequestFailureReason;
import org.apache.cassandra.locator.Endpoints;
import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.ReplicaPlan;
+import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.ParamType;
import org.apache.cassandra.net.RequestCallback;
-import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.Verb;
-import org.apache.cassandra.schema.Schema;
-import org.apache.cassandra.service.ClientWarn;
+import org.apache.cassandra.service.reads.trackwarnings.CoordinatorWarnings;
+import org.apache.cassandra.service.reads.trackwarnings.WarningContext;
+import org.apache.cassandra.service.reads.trackwarnings.WarningsSnapshot;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.concurrent.SimpleCondition;
@@ -56,31 +52,6 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS;
public class ReadCallback<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>> implements RequestCallback<ReadResponse>
{
protected static final Logger logger = LoggerFactory.getLogger( ReadCallback.class );
- private class WarningCounter
- {
- // the highest number of tombstones reported by a node's warning
- final AtomicInteger tombstoneWarnings = new AtomicInteger();
- final AtomicInteger maxTombstoneWarningCount = new AtomicInteger();
- // the highest number of tombstones reported by a node's rejection. This should be the same as
- // our configured limit, but including to aid in diagnosing misconfigurations
- final AtomicInteger tombstoneAborts = new AtomicInteger();
- final AtomicInteger maxTombstoneAbortsCount = new AtomicInteger();
-
- // TODO: take message as arg and return boolean for 'had warning' etc
- void addTombstoneWarning(InetAddressAndPort from, int tombstones)
- {
- if (!waitingFor(from)) return;
- tombstoneWarnings.incrementAndGet();
- maxTombstoneWarningCount.accumulateAndGet(tombstones, Math::max);
- }
-
- void addTombstoneAbort(InetAddressAndPort from, int tombstones)
- {
- if (!waitingFor(from)) return;
- tombstoneAborts.incrementAndGet();
- maxTombstoneAbortsCount.accumulateAndGet(tombstones, Math::max);
- }
- }
public final ResponseResolver<E, P> resolver;
final SimpleCondition condition = new SimpleCondition();
@@ -94,9 +65,9 @@ public class ReadCallback<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<
= AtomicIntegerFieldUpdater.newUpdater(ReadCallback.class, "failures");
private volatile int failures = 0;
private final Map<InetAddressAndPort, RequestFailureReason> failureReasonByEndpoint;
- private volatile WarningCounter warningCounter;
- private static final AtomicReferenceFieldUpdater<ReadCallback, ReadCallback.WarningCounter> warningsUpdater
- = AtomicReferenceFieldUpdater.newUpdater(ReadCallback.class, ReadCallback.WarningCounter.class, "warningCounter");
+ private volatile WarningContext warningContext;
+ private static final AtomicReferenceFieldUpdater<ReadCallback, WarningContext> warningsUpdater
+ = AtomicReferenceFieldUpdater.newUpdater(ReadCallback.class, WarningContext.class, "warningContext");
public ReadCallback(ResponseResolver<E, P> resolver, ReadCommand command, ReplicaPlan.Shared<E, P> replicaPlan, long queryStartNanoTime)
{
@@ -131,23 +102,6 @@ public class ReadCallback<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<
}
}
- @VisibleForTesting
- public static String tombstoneAbortMessage(int nodes, int tombstones, String cql)
- {
- return String.format("%s nodes scanned over %s tombstones and aborted the query %s (see tombstone_failure_threshold)", nodes, tombstones, cql);
- }
-
- @VisibleForTesting
- public static String tombstoneWarnMessage(int nodes, int tombstones, String cql)
- {
- return String.format("%s nodes scanned up to %s tombstones and issued tombstone warnings for query %s (see tombstone_warn_threshold)", nodes, tombstones, cql);
- }
-
- private ColumnFamilyStore cfs()
- {
- return Schema.instance.getColumnFamilyStoreInstance(command.metadata().id);
- }
-
public void awaitResults() throws ReadFailureException, ReadTimeoutException
{
boolean signaled = await(command.getTimeout(MILLISECONDS), TimeUnit.MILLISECONDS);
@@ -160,24 +114,17 @@ public class ReadCallback<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<
*/
int received = resolver.responses.size();
boolean failed = failures > 0 && (blockFor > received || !resolver.isDataPresent());
- WarningCounter warnings = warningCounter;
+ WarningContext warnings = warningContext;
+ // save the snapshot so abort state is not changed between now and when mayAbort gets called
+ WarningsSnapshot snapshot = null;
if (warnings != null)
{
- if (warnings.tombstoneAborts.get() > 0)
- {
- String msg = tombstoneAbortMessage(warnings.tombstoneAborts.get(), warnings.maxTombstoneAbortsCount.get(), command.toCQLString());
- ClientWarn.instance.warn(msg + " with " + command.loggableTokens());
- logger.warn(msg);
- cfs().metric.clientTombstoneAborts.mark();
- }
-
- if (warnings.tombstoneWarnings.get() > 0)
- {
- String msg = tombstoneWarnMessage(warnings.tombstoneWarnings.get(), warnings.maxTombstoneWarningCount.get(), command.toCQLString());
- ClientWarn.instance.warn(msg + " with " + command.loggableTokens());
- logger.warn(msg);
- cfs().metric.clientTombstoneWarnings.mark();
- }
+ snapshot = warnings.snapshot();
+ // this is possible due to a race condition between waiting and responding
+ // network thread creates the WarningContext to update metrics, but we are actively reading and see it is empty
+ // this is likely to happen when a timeout happens or from a speculative response
+ if (!snapshot.isEmpty())
+ CoordinatorWarnings.update(command, snapshot);
}
if (signaled && !failed)
return;
@@ -193,9 +140,8 @@ public class ReadCallback<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<
logger.debug("{}; received {} of {} responses{}", failed ? "Failed" : "Timed out", received, blockFor, gotData);
}
- if (warnings != null && warnings.tombstoneAborts.get() > 0)
- throw new TombstoneAbortException(warnings.tombstoneAborts.get(), warnings.maxTombstoneAbortsCount.get(), command.toCQLString(), resolver.isDataPresent(),
- replicaPlan.get().consistencyLevel(), received, blockFor, failureReasonByEndpoint);
+ if (snapshot != null)
+ snapshot.maybeAbort(command, replicaPlan().consistencyLevel(), received, blockFor, resolver.isDataPresent(), failureReasonByEndpoint);
// Same as for writes, see AbstractWriteResponseHandler
throw failed
@@ -213,15 +159,15 @@ public class ReadCallback<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<
{
assertWaitingFor(message.from());
Map<ParamType, Object> params = message.header.params();
- if (params.containsKey(ParamType.TOMBSTONE_ABORT))
+ InetAddressAndPort from = message.from();
+ if (WarningContext.isSupported(params.keySet()))
{
- getWarningCounter().addTombstoneAbort(message.from(), (Integer) params.get(ParamType.TOMBSTONE_ABORT));
- onFailure(message.from(), RequestFailureReason.READ_TOO_MANY_TOMBSTONES);
- return;
- }
- else if (params.containsKey(ParamType.TOMBSTONE_WARNING))
- {
- getWarningCounter().addTombstoneWarning(message.from(), (Integer) params.get(ParamType.TOMBSTONE_WARNING));
+ RequestFailureReason reason = getWarningContext().updateCounters(params, from);
+ if (reason != null)
+ {
+ onFailure(message.from(), reason);
+ return;
+ }
}
resolver.preprocess(message);
@@ -235,16 +181,16 @@ public class ReadCallback<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<
condition.signalAll();
}
- private WarningCounter getWarningCounter()
+ private WarningContext getWarningContext()
{
- WarningCounter current;
+ WarningContext current;
do {
- current = warningCounter;
+ current = warningContext;
if (current != null)
return current;
- current = new WarningCounter();
+ current = new WarningContext();
} while (!warningsUpdater.compareAndSet(this, null, current));
return current;
}
@@ -286,12 +232,8 @@ public class ReadCallback<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<
*/
private void assertWaitingFor(InetAddressAndPort from)
{
- assert waitingFor(from): "Received read response from unexpected replica: " + from;
- }
-
- private boolean waitingFor(InetAddressAndPort from)
- {
- return !replicaPlan().consistencyLevel().isDatacenterLocal()
- || DatabaseDescriptor.getLocalDataCenter().equals(DatabaseDescriptor.getEndpointSnitch().getDatacenter(from));
+ assert !replicaPlan().consistencyLevel().isDatacenterLocal()
+ || DatabaseDescriptor.getLocalDataCenter().equals(DatabaseDescriptor.getEndpointSnitch().getDatacenter(from))
+ : "Received read response from unexpected replica: " + from;
}
}
diff --git a/src/java/org/apache/cassandra/service/reads/trackwarnings/CoordinatorWarnings.java b/src/java/org/apache/cassandra/service/reads/trackwarnings/CoordinatorWarnings.java
new file mode 100644
index 0000000..076c816
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/reads/trackwarnings/CoordinatorWarnings.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service.reads.trackwarnings;
+
+import java.util.AbstractMap;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.netty.util.concurrent.FastThreadLocal;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.ReadCommand;
+import org.apache.cassandra.metrics.TableMetrics;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.service.ClientWarn;
+import org.apache.cassandra.service.reads.ReadCallback;
+
+public class CoordinatorWarnings
+{
+ private static final Logger logger = LoggerFactory.getLogger(CoordinatorWarnings.class);
+ private static final boolean ENABLE_DEFENSIVE_CHECKS = Boolean.getBoolean("cassandra.track_warnings.coordinator.defensive_checks_enabled");
+
+ // when .init() is called set the STATE to be INIT; this is to lazily allocate the map only when warnings are generated
+ private static final Map<ReadCommand, WarningsSnapshot> INIT = Collections.emptyMap();
+ private static final FastThreadLocal<Map<ReadCommand, WarningsSnapshot>> STATE = new FastThreadLocal<>();
+
+ private CoordinatorWarnings() {}
+
+ public static void init()
+ {
+ logger.trace("CoordinatorTrackWarnings.init()");
+ if (STATE.get() != null)
+ {
+ if (ENABLE_DEFENSIVE_CHECKS)
+ throw new AssertionError("CoordinatorTrackWarnings.init called while state is not null: " + STATE.get());
+ return;
+ }
+ STATE.set(INIT);
+ }
+
+ public static void reset()
+ {
+ logger.trace("CoordinatorTrackWarnings.reset()");
+ STATE.remove();
+ }
+
+ public static void update(ReadCommand cmd, WarningsSnapshot snapshot)
+ {
+ logger.trace("CoordinatorTrackWarnings.update({}, {})", cmd.metadata(), snapshot);
+ Map<ReadCommand, WarningsSnapshot> map = mutable();
+ WarningsSnapshot previous = map.get(cmd);
+ WarningsSnapshot update = WarningsSnapshot.merge(previous, snapshot);
+ if (update == null) // null happens when the merge had null input or EMPTY input... remove the command from the map
+ map.remove(cmd);
+ else
+ map.put(cmd, update);
+ }
+
+ public static void done()
+ {
+ Map<ReadCommand, WarningsSnapshot> map = readonly();
+ logger.trace("CoordinatorTrackWarnings.done() with state {}", map);
+ map.forEach((command, merged) -> {
+ ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance(command.metadata().id);
+ // race condition when dropping tables, also happens in unit tests as Schema may be bypassed
+ if (cfs == null)
+ return;
+
+ String cql = command.toCQLString();
+ String loggableTokens = command.loggableTokens();
+ recordAborts(merged.tombstones, cql, loggableTokens, cfs.metric.clientTombstoneAborts, WarningsSnapshot::tombstoneAbortMessage);
+ recordWarnings(merged.tombstones, cql, loggableTokens, cfs.metric.clientTombstoneWarnings, WarningsSnapshot::tombstoneWarnMessage);
+
+ recordAborts(merged.localReadSize, cql, loggableTokens, cfs.metric.localReadSizeAborts, WarningsSnapshot::localReadSizeAbortMessage);
+ recordWarnings(merged.localReadSize, cql, loggableTokens, cfs.metric.localReadSizeWarnings, WarningsSnapshot::localReadSizeWarnMessage);
+
+ recordAborts(merged.rowIndexTooSize, cql, loggableTokens, cfs.metric.rowIndexSizeAborts, WarningsSnapshot::rowIndexSizeAbortMessage);
+ recordWarnings(merged.rowIndexTooSize, cql, loggableTokens, cfs.metric.rowIndexSizeWarnings, WarningsSnapshot::rowIndexSizeWarnMessage);
+ });
+
+ // reset the state to prevent double publishing
+ clearState();
+ }
+
+ private static Map<ReadCommand, WarningsSnapshot> mutable()
+ {
+ Map<ReadCommand, WarningsSnapshot> map = STATE.get();
+ if (map == null)
+ {
+ if (ENABLE_DEFENSIVE_CHECKS)
+ throw new AssertionError("CoordinatorTrackWarnings.mutable calling without calling .init() first");
+ // set map to an "ignore" map; dropping all mutations
+ // since init was not called, it isn't clear that the state will be cleaned up, so avoid populating
+ map = IgnoreMap.get();
+ }
+ else if (map == INIT)
+ {
+ map = new HashMap<>();
+ STATE.set(map);
+ }
+ return map;
+ }
+
+ private static Map<ReadCommand, WarningsSnapshot> readonly()
+ {
+ Map<ReadCommand, WarningsSnapshot> map = STATE.get();
+ if (map == null)
+ {
+ if (ENABLE_DEFENSIVE_CHECKS)
+ throw new AssertionError("CoordinatorTrackWarnings.readonly calling without calling .init() first");
+ // since init was not called, it isn't clear that the state will be cleaned up, so avoid populating
+ map = Collections.emptyMap();
+ }
+ return map;
+ }
+
+ private static void clearState()
+ {
+ Map<ReadCommand, WarningsSnapshot> map = STATE.get();
+ if (map == null || map == INIT)
+ return;
+ // map is mutable, so set to INIT
+ STATE.set(INIT);
+ }
+
+ // utility interface to let callers use static functions
+ @FunctionalInterface
+ private interface ToString
+ {
+ String apply(int count, long value, String cql);
+ }
+
+ private static void recordAborts(WarningsSnapshot.Warnings counter, String cql, String loggableTokens, TableMetrics.TableMeter metric, ToString toString)
+ {
+ if (!counter.aborts.instances.isEmpty())
+ {
+ String msg = toString.apply(counter.aborts.instances.size(), counter.aborts.maxValue, cql);
+ ClientWarn.instance.warn(msg + " with " + loggableTokens);
+ logger.warn(msg);
+ metric.mark();
+ }
+ }
+
+ private static void recordWarnings(WarningsSnapshot.Warnings counter, String cql, String loggableTokens, TableMetrics.TableMeter metric, ToString toString)
+ {
+ if (!counter.warnings.instances.isEmpty())
+ {
+ String msg = toString.apply(counter.warnings.instances.size(), counter.warnings.maxValue, cql);
+ ClientWarn.instance.warn(msg + " with " + loggableTokens);
+ logger.warn(msg);
+ metric.mark();
+ }
+ }
+
+ /**
+ * Utility class to create an immutable map which does not fail on mutation but instead ignores it.
+ */
+ private static final class IgnoreMap extends AbstractMap<Object, Object>
+ {
+ private static final IgnoreMap INSTANCE = new IgnoreMap();
+
+ private static <K, V> Map<K, V> get()
+ {
+ return (Map<K, V>) INSTANCE;
+ }
+
+ @Override
+ public Object put(Object key, Object value)
+ {
+ return null;
+ }
+
+ @Override
+ public Set<Entry<Object, Object>> entrySet()
+ {
+ return Collections.emptySet();
+ }
+ }
+}
diff --git a/src/java/org/apache/cassandra/service/reads/trackwarnings/WarnAbortCounter.java b/src/java/org/apache/cassandra/service/reads/trackwarnings/WarnAbortCounter.java
new file mode 100644
index 0000000..55955c3
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/reads/trackwarnings/WarnAbortCounter.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service.reads.trackwarnings;
+
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.cassandra.locator.InetAddressAndPort;
+
+public class WarnAbortCounter
+{
+ final Set<InetAddressAndPort> warnings = Collections.newSetFromMap(new ConcurrentHashMap<>());
+ // the highest number reported by a node's warning
+ final AtomicLong maxWarningValue = new AtomicLong();
+
+ final Set<InetAddressAndPort> aborts = Collections.newSetFromMap(new ConcurrentHashMap<>());
+ // the highest number reported by a node's rejection.
+ final AtomicLong maxAbortsValue = new AtomicLong();
+
+ void addWarning(InetAddressAndPort from, long value)
+ {
+ maxWarningValue.accumulateAndGet(value, Math::max);
+ // update the max first and add to the set last: a concurrent reader may see an empty set
+ // with value > 0, but never a non-empty set (size == 1) whose value is still 0
+ warnings.add(from);
+ }
+
+ void addAbort(InetAddressAndPort from, long value)
+ {
+ maxAbortsValue.accumulateAndGet(value, Math::max);
+ // update the max first and add to the set last: a concurrent reader may see an empty set
+ // with value > 0, but never a non-empty set (size == 1) whose value is still 0
+ aborts.add(from);
+ }
+
+ public WarningsSnapshot.Warnings snapshot()
+ {
+ return WarningsSnapshot.Warnings.create(WarningsSnapshot.Counter.create(warnings, maxWarningValue), WarningsSnapshot.Counter.create(aborts, maxAbortsValue));
+ }
+}
diff --git a/src/java/org/apache/cassandra/service/reads/trackwarnings/WarningContext.java b/src/java/org/apache/cassandra/service/reads/trackwarnings/WarningContext.java
new file mode 100644
index 0000000..bbd52cf
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/reads/trackwarnings/WarningContext.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service.reads.trackwarnings;
+
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.cassandra.exceptions.RequestFailureReason;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.net.ParamType;
+
+/**
+ * Per-query holder for the three tracked warning categories (tombstones, local read size,
+ * row index size). {@link #updateCounters(Map, InetAddressAndPort)} is fed from message
+ * response params; {@link #snapshot()} produces an immutable view for the coordinator.
+ */
+public class WarningContext
+{
+ // constant lookup set; final so the reference cannot be reassigned (EnumSet itself is mutable,
+ // but this one is never modified after construction)
+ private static final EnumSet<ParamType> SUPPORTED = EnumSet.of(ParamType.TOMBSTONE_WARNING, ParamType.TOMBSTONE_ABORT,
+ ParamType.LOCAL_READ_SIZE_WARN, ParamType.LOCAL_READ_SIZE_ABORT,
+ ParamType.ROW_INDEX_SIZE_WARN, ParamType.ROW_INDEX_SIZE_ABORT);
+
+ final WarnAbortCounter tombstones = new WarnAbortCounter();
+ final WarnAbortCounter localReadSize = new WarnAbortCounter();
+ final WarnAbortCounter rowIndexTooLarge = new WarnAbortCounter();
+
+ /** @return true if any of the given param keys belongs to a tracked warning category */
+ public static boolean isSupported(Set<ParamType> keys)
+ {
+ return !Collections.disjoint(keys, SUPPORTED);
+ }
+
+ /**
+ * Applies each supported param to its counter. ABORT cases deliberately fall through to the
+ * matching WARN case so the same counter is selected; an abort also short-circuits, returning
+ * its failure reason without processing any remaining entries.
+ *
+ * @return the failure reason of the first abort encountered, or null if only warnings were seen
+ */
+ public RequestFailureReason updateCounters(Map<ParamType, Object> params, InetAddressAndPort from)
+ {
+ for (Map.Entry<ParamType, Object> entry : params.entrySet())
+ {
+ WarnAbortCounter counter = null;
+ RequestFailureReason reason = null;
+ switch (entry.getKey())
+ {
+ case ROW_INDEX_SIZE_ABORT:
+ reason = RequestFailureReason.READ_SIZE;
+ // $FALL-THROUGH$ - abort shares the warn case's counter
+ case ROW_INDEX_SIZE_WARN:
+ counter = rowIndexTooLarge;
+ break;
+ case LOCAL_READ_SIZE_ABORT:
+ reason = RequestFailureReason.READ_SIZE;
+ // $FALL-THROUGH$ - abort shares the warn case's counter
+ case LOCAL_READ_SIZE_WARN:
+ counter = localReadSize;
+ break;
+ case TOMBSTONE_ABORT:
+ reason = RequestFailureReason.READ_TOO_MANY_TOMBSTONES;
+ // $FALL-THROUGH$ - abort shares the warn case's counter
+ case TOMBSTONE_WARNING:
+ counter = tombstones;
+ break;
+ }
+ if (reason != null)
+ {
+ counter.addAbort(from, ((Number) entry.getValue()).longValue());
+ return reason;
+ }
+ if (counter != null)
+ counter.addWarning(from, ((Number) entry.getValue()).longValue());
+ }
+ return null;
+ }
+
+ /** Immutable point-in-time view of all three counters. */
+ public WarningsSnapshot snapshot()
+ {
+ return WarningsSnapshot.create(tombstones.snapshot(), localReadSize.snapshot(), rowIndexTooLarge.snapshot());
+ }
+}
diff --git a/src/java/org/apache/cassandra/service/reads/trackwarnings/WarningsSnapshot.java b/src/java/org/apache/cassandra/service/reads/trackwarnings/WarningsSnapshot.java
new file mode 100644
index 0000000..2995e94
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/reads/trackwarnings/WarningsSnapshot.java
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service.reads.trackwarnings;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableSet;
+
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.ReadCommand;
+import org.apache.cassandra.exceptions.ReadSizeAbortException;
+import org.apache.cassandra.exceptions.RequestFailureReason;
+import org.apache.cassandra.exceptions.TombstoneAbortException;
+import org.apache.cassandra.locator.InetAddressAndPort;
+
+/**
+ * Immutable snapshot of warning/abort state for a query, one {@link Warnings} per tracked
+ * category. Empty instances are canonicalized to the single {@link #EMPTY} object so
+ * emptiness checks can use reference equality.
+ */
+public class WarningsSnapshot
+{
+ private static final WarningsSnapshot EMPTY = new WarningsSnapshot(Warnings.EMPTY, Warnings.EMPTY, Warnings.EMPTY);
+
+ public final Warnings tombstones, localReadSize, rowIndexTooSize;
+
+ private WarningsSnapshot(Warnings tombstones, Warnings localReadSize, Warnings rowIndexTooSize)
+ {
+ this.tombstones = tombstones;
+ this.localReadSize = localReadSize;
+ this.rowIndexTooSize = rowIndexTooSize;
+ }
+
+ public static WarningsSnapshot empty()
+ {
+ return EMPTY;
+ }
+
+ // returns the canonical EMPTY when all three inputs are the empty Warnings, preserving the
+ // reference-equality contract used by isEmpty()/isDefined()
+ public static WarningsSnapshot create(Warnings tombstones, Warnings localReadSize, Warnings rowIndexTooLarge)
+ {
+ if (tombstones == localReadSize && tombstones == rowIndexTooLarge && tombstones == Warnings.EMPTY)
+ return EMPTY;
+ return new WarningsSnapshot(tombstones, localReadSize, rowIndexTooLarge);
+ }
+
+ /** Merges all inputs; returns null (not EMPTY) when there is nothing to report or no inputs. */
+ public static WarningsSnapshot merge(WarningsSnapshot... values)
+ {
+ if (values == null || values.length == 0)
+ return null;
+
+ WarningsSnapshot accum = EMPTY;
+ for (WarningsSnapshot a : values)
+ accum = accum.merge(a);
+ return accum == EMPTY ? null : accum;
+ }
+
+ public boolean isEmpty()
+ {
+ return this == EMPTY;
+ }
+
+ public boolean isDefined()
+ {
+ return this != EMPTY;
+ }
+
+ @VisibleForTesting
+ WarningsSnapshot merge(WarningsSnapshot other)
+ {
+ if (other == null || other == EMPTY)
+ return this;
+ return WarningsSnapshot.create(tombstones.merge(other.tombstones), localReadSize.merge(other.localReadSize), rowIndexTooSize.merge(other.rowIndexTooSize));
+ }
+
+ /**
+ * Throws the appropriate abort exception if any category recorded an abort; checked in
+ * priority order: tombstones, then local read size, then row index size. No-op otherwise.
+ */
+ public void maybeAbort(ReadCommand command, ConsistencyLevel cl, int received, int blockFor, boolean isDataPresent, Map<InetAddressAndPort, RequestFailureReason> failureReasonByEndpoint)
+ {
+ if (!tombstones.aborts.instances.isEmpty())
+ throw new TombstoneAbortException(tombstones.aborts.instances.size(), tombstones.aborts.maxValue, command.toCQLString(), isDataPresent,
+ cl, received, blockFor, failureReasonByEndpoint);
+
+ if (!localReadSize.aborts.instances.isEmpty())
+ throw new ReadSizeAbortException(localReadSizeAbortMessage(localReadSize.aborts.instances.size(), localReadSize.aborts.maxValue, command.toCQLString()),
+ cl, received, blockFor, isDataPresent, failureReasonByEndpoint);
+
+ if (!rowIndexTooSize.aborts.instances.isEmpty())
+ throw new ReadSizeAbortException(rowIndexSizeAbortMessage(rowIndexTooSize.aborts.instances.size(), rowIndexTooSize.aborts.maxValue, command.toCQLString()),
+ cl, received, blockFor, isDataPresent, failureReasonByEndpoint);
+ }
+
+ // The message builders below are public/@VisibleForTesting so tests can assert exact text.
+ @VisibleForTesting
+ public static String tombstoneAbortMessage(int nodes, long tombstones, String cql)
+ {
+ return String.format("%s nodes scanned over %s tombstones and aborted the query %s (see tombstone_failure_threshold)", nodes, tombstones, cql);
+ }
+
+ @VisibleForTesting
+ public static String tombstoneWarnMessage(int nodes, long tombstones, String cql)
+ {
+ return String.format("%s nodes scanned up to %s tombstones and issued tombstone warnings for query %s (see tombstone_warn_threshold)", nodes, tombstones, cql);
+ }
+
+ @VisibleForTesting
+ public static String localReadSizeAbortMessage(long nodes, long bytes, String cql)
+ {
+ return String.format("%s nodes loaded over %s bytes and aborted the query %s (see track_warnings.local_read_size.abort_threshold_kb)", nodes, bytes, cql);
+ }
+
+ @VisibleForTesting
+ public static String localReadSizeWarnMessage(int nodes, long bytes, String cql)
+ {
+ return String.format("%s nodes loaded over %s bytes and issued local read size warnings for query %s (see track_warnings.local_read_size.warn_threshold_kb)", nodes, bytes, cql);
+ }
+
+ @VisibleForTesting
+ public static String rowIndexSizeAbortMessage(long nodes, long bytes, String cql)
+ {
+ return String.format("%s nodes loaded over %s bytes in RowIndexEntry and aborted the query %s (see track_warnings.row_index_size.abort_threshold_kb)", nodes, bytes, cql);
+ }
+
+ @VisibleForTesting
+ public static String rowIndexSizeWarnMessage(int nodes, long bytes, String cql)
+ {
+ return String.format("%s nodes loaded over %s bytes in RowIndexEntry and issued warnings for query %s (see track_warnings.row_index_size.warn_threshold_kb)", nodes, bytes, cql);
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ WarningsSnapshot that = (WarningsSnapshot) o;
+ return Objects.equals(tombstones, that.tombstones) && Objects.equals(localReadSize, that.localReadSize) && Objects.equals(rowIndexTooSize, that.rowIndexTooSize);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(tombstones, localReadSize, rowIndexTooSize);
+ }
+
+ @Override
+ public String toString()
+ {
+ // NOTE(review): label "rowIndexTooLarge" differs from the field name rowIndexTooSize - confirm intended
+ return "(tombstones=" + tombstones + ", localReadSize=" + localReadSize + ", rowIndexTooLarge=" + rowIndexTooSize + ')';
+ }
+
+ /** Immutable pair of warning and abort {@link Counter}s for one category. */
+ public static final class Warnings
+ {
+ private static final Warnings EMPTY = new Warnings(Counter.EMPTY, Counter.EMPTY);
+
+ public final Counter warnings;
+ public final Counter aborts;
+
+ private Warnings(Counter warnings, Counter aborts)
+ {
+ this.warnings = warnings;
+ this.aborts = aborts;
+ }
+
+ // canonicalizes to EMPTY so merge() can short-circuit on reference equality
+ public static Warnings create(Counter warnings, Counter aborts)
+ {
+ if (warnings == Counter.EMPTY && aborts == Counter.EMPTY)
+ return EMPTY;
+ return new Warnings(warnings, aborts);
+ }
+
+ public Warnings merge(Warnings other)
+ {
+ if (other == EMPTY)
+ return this;
+ return Warnings.create(warnings.merge(other.warnings), aborts.merge(other.aborts));
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ Warnings warnings1 = (Warnings) o;
+ return Objects.equals(warnings, warnings1.warnings) && Objects.equals(aborts, warnings1.aborts);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(warnings, aborts);
+ }
+
+ @Override
+ public String toString()
+ {
+ return "(warnings=" + warnings + ", aborts=" + aborts + ')';
+ }
+ }
+
+ /** Immutable set of reporting nodes plus the max value any of them reported. */
+ public static final class Counter
+ {
+ private static final Counter EMPTY = new Counter(ImmutableSet.of(), 0);
+
+ public final ImmutableSet<InetAddressAndPort> instances;
+ public final long maxValue;
+
+ @VisibleForTesting
+ Counter(ImmutableSet<InetAddressAndPort> instances, long maxValue)
+ {
+ this.instances = instances;
+ this.maxValue = maxValue;
+ }
+
+ @VisibleForTesting
+ static Counter empty()
+ {
+ return EMPTY;
+ }
+
+ public static Counter create(Set<InetAddressAndPort> instances, AtomicLong maxValue)
+ {
+ ImmutableSet<InetAddressAndPort> copy = ImmutableSet.copyOf(instances);
+ // if instances is empty ignore value
+ // writes and reads are concurrent (write = networking callback, read = coordinator thread), so there is
+ // an edge case where instances is empty and maxValue > 0; this is caused by the fact we update value first before count
+ // we write: value then instance
+ // we read: instance then value
+ if (copy.isEmpty())
+ return EMPTY;
+ return new Counter(copy, maxValue.get());
+ }
+
+ // union of node sets, max of reported values
+ public Counter merge(Counter other)
+ {
+ if (other == EMPTY)
+ return this;
+ ImmutableSet<InetAddressAndPort> copy = ImmutableSet.<InetAddressAndPort>builder()
+ .addAll(instances)
+ .addAll(other.instances)
+ .build();
+ // since other is NOT empty, then output can not be empty; so skip create method
+ return new Counter(copy, Math.max(maxValue, other.maxValue));
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ Counter counter = (Counter) o;
+ return maxValue == counter.maxValue && Objects.equals(instances, counter.instances);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(instances, maxValue);
+ }
+
+ @Override
+ public String toString()
+ {
+ return "(" + instances + ", " + maxValue + ')';
+ }
+ }
+
+ @VisibleForTesting
+ static Builder builder()
+ {
+ return new Builder();
+ }
+
+ /** Test-only fluent builder; each call merges one counter into the accumulated snapshot. */
+ @VisibleForTesting
+ public static final class Builder
+ {
+ private WarningsSnapshot snapshot = empty();
+
+ public Builder tombstonesWarning(ImmutableSet<InetAddressAndPort> instances, long maxValue)
+ {
+ return tombstonesWarning(new Counter(Objects.requireNonNull(instances), maxValue));
+ }
+
+ public Builder tombstonesWarning(Counter counter)
+ {
+ Objects.requireNonNull(counter);
+ snapshot = snapshot.merge(new WarningsSnapshot(new Warnings(counter, Counter.EMPTY), Warnings.EMPTY, Warnings.EMPTY));
+ return this;
+ }
+
+ public Builder tombstonesAbort(ImmutableSet<InetAddressAndPort> instances, long maxValue)
+ {
+ return tombstonesAbort(new Counter(Objects.requireNonNull(instances), maxValue));
+ }
+
+ public Builder tombstonesAbort(Counter counter)
+ {
+ Objects.requireNonNull(counter);
+ snapshot = snapshot.merge(new WarningsSnapshot(new Warnings(Counter.EMPTY, counter), Warnings.EMPTY, Warnings.EMPTY));
+ return this;
+ }
+
+ public Builder localReadSizeWarning(ImmutableSet<InetAddressAndPort> instances, long maxValue)
+ {
+ return localReadSizeWarning(new Counter(Objects.requireNonNull(instances), maxValue));
+ }
+
+ public Builder localReadSizeWarning(Counter counter)
+ {
+ Objects.requireNonNull(counter);
+ snapshot = snapshot.merge(new WarningsSnapshot(Warnings.EMPTY, new Warnings(counter, Counter.EMPTY), Warnings.EMPTY));
+ return this;
+ }
+
+ public Builder localReadSizeAbort(ImmutableSet<InetAddressAndPort> instances, long maxValue)
+ {
+ return localReadSizeAbort(new Counter(Objects.requireNonNull(instances), maxValue));
+ }
+
+ public Builder localReadSizeAbort(Counter counter)
+ {
+ Objects.requireNonNull(counter);
+ snapshot = snapshot.merge(new WarningsSnapshot(Warnings.EMPTY, new Warnings(Counter.EMPTY, counter), Warnings.EMPTY));
+ return this;
+ }
+
+ public Builder rowIndexSizeWarning(Counter counter)
+ {
+ Objects.requireNonNull(counter);
+ snapshot = snapshot.merge(new WarningsSnapshot(Warnings.EMPTY, Warnings.EMPTY, new Warnings(counter, Counter.EMPTY)));
+ return this;
+ }
+
+ public Builder rowIndexSizeAbort(Counter counter)
+ {
+ Objects.requireNonNull(counter);
+ snapshot = snapshot.merge(new WarningsSnapshot(Warnings.EMPTY, Warnings.EMPTY, new Warnings(Counter.EMPTY, counter)));
+ return this;
+ }
+
+ public WarningsSnapshot build()
+ {
+ return snapshot;
+ }
+ }
+}
diff --git a/src/java/org/apache/cassandra/transport/Dispatcher.java b/src/java/org/apache/cassandra/transport/Dispatcher.java
index 9b0a8f2..29d4748 100644
--- a/src/java/org/apache/cassandra/transport/Dispatcher.java
+++ b/src/java/org/apache/cassandra/transport/Dispatcher.java
@@ -35,6 +35,7 @@ import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.net.FrameEncoder;
import org.apache.cassandra.service.ClientWarn;
import org.apache.cassandra.service.QueryState;
+import org.apache.cassandra.service.reads.trackwarnings.CoordinatorWarnings;
import org.apache.cassandra.transport.ClientResourceLimits.Overload;
import org.apache.cassandra.transport.Flusher.FlushItem;
import org.apache.cassandra.transport.messages.ErrorMessage;
@@ -88,6 +89,11 @@ public class Dispatcher
if (connection.getVersion().isGreaterOrEqualTo(ProtocolVersion.V4))
ClientWarn.instance.captureWarnings();
+ // even if ClientWarn is disabled, still setup CoordinatorTrackWarnings, as this will populate metrics and
+ // emit logs on the server; the warnings will just be ignored and not sent to the client
+ if (request.isTrackable())
+ CoordinatorWarnings.init();
+
if (backpressure == Overload.REQUESTS)
{
String message = String.format("Request breached global limit of %d requests/second and triggered backpressure.",
@@ -110,6 +116,10 @@ public class Dispatcher
Message.logger.trace("Received: {}, v={}", request, connection.getVersion());
connection.requests.inc();
Message.Response response = request.execute(qstate, queryStartNanoTime);
+
+ if (request.isTrackable())
+ CoordinatorWarnings.done();
+
response.setStreamId(request.getStreamId());
response.setWarnings(ClientWarn.instance.getWarnings());
response.attach(connection);
@@ -136,13 +146,19 @@ public class Dispatcher
catch (Throwable t)
{
JVMStabilityInspector.inspectThrowable(t);
+
+ if (request.isTrackable())
+ CoordinatorWarnings.done();
+
Predicate<Throwable> handler = ExceptionHandlers.getUnexpectedExceptionHandler(channel, true);
ErrorMessage error = ErrorMessage.fromException(t, handler);
error.setStreamId(request.getStreamId());
+ error.setWarnings(ClientWarn.instance.getWarnings());
toFlush = forFlusher.toFlushItem(channel, request, error);
}
finally
{
+ CoordinatorWarnings.reset();
ClientWarn.instance.resetWarnings();
}
flush(toFlush);
diff --git a/src/java/org/apache/cassandra/transport/Message.java b/src/java/org/apache/cassandra/transport/Message.java
index 0284489..0f8002f 100644
--- a/src/java/org/apache/cassandra/transport/Message.java
+++ b/src/java/org/apache/cassandra/transport/Message.java
@@ -209,11 +209,22 @@ public abstract class Message
throw new IllegalArgumentException();
}
+ /**
+ * @return true if the execution of this {@link Request} should be recorded in a tracing session
+ */
protected boolean isTraceable()
{
return false;
}
+ /**
+ * @return true if warnings should be tracked and aborts enforced for resource limits on this {@link Request}
+ */
+ protected boolean isTrackable()
+ {
+ return false;
+ }
+
protected abstract Response execute(QueryState queryState, long queryStartNanoTime, boolean traceRequest);
public final Response execute(QueryState queryState, long queryStartNanoTime)
diff --git a/src/java/org/apache/cassandra/transport/messages/BatchMessage.java b/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
index e760960..59b5b53 100644
--- a/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
@@ -161,6 +161,12 @@ public class BatchMessage extends Message.Request
}
@Override
+ protected boolean isTrackable()
+ {
+ return true;
+ }
+
+ @Override
protected Message.Response execute(QueryState state, long queryStartNanoTime, boolean traceRequest)
{
List<QueryHandler.Prepared> prepared = null;
diff --git a/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java b/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
index 3b98996..05186fb 100644
--- a/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
@@ -110,6 +110,12 @@ public class ExecuteMessage extends Message.Request
}
@Override
+ protected boolean isTrackable()
+ {
+ return true;
+ }
+
+ @Override
protected Message.Response execute(QueryState state, long queryStartNanoTime, boolean traceRequest)
{
QueryHandler.Prepared prepared = null;
diff --git a/src/java/org/apache/cassandra/transport/messages/QueryMessage.java b/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
index 71d7c73..4d5d1e1 100644
--- a/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
@@ -90,6 +90,12 @@ public class QueryMessage extends Message.Request
}
@Override
+ protected boolean isTrackable()
+ {
+ return true;
+ }
+
+ @Override
protected Message.Response execute(QueryState state, long queryStartNanoTime, boolean traceRequest)
{
CQLStatement statement = null;
diff --git a/src/java/org/apache/cassandra/exceptions/TombstoneAbortException.java b/src/java/org/apache/cassandra/utils/Int64Serializer.java
similarity index 50%
copy from src/java/org/apache/cassandra/exceptions/TombstoneAbortException.java
copy to src/java/org/apache/cassandra/utils/Int64Serializer.java
index e86e760..3f65d60 100644
--- a/src/java/org/apache/cassandra/exceptions/TombstoneAbortException.java
+++ b/src/java/org/apache/cassandra/utils/Int64Serializer.java
@@ -16,24 +16,34 @@
* limitations under the License.
*/
-package org.apache.cassandra.exceptions;
+package org.apache.cassandra.utils;
-import java.util.Map;
+import java.io.IOException;
-import org.apache.cassandra.db.ConsistencyLevel;
-import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
-import static org.apache.cassandra.service.reads.ReadCallback.tombstoneAbortMessage;
-
-public class TombstoneAbortException extends ReadAbortException
+public class Int64Serializer implements IVersionedSerializer<Long>
{
- public final int nodes;
- public final int tombstones;
+ public static final Int64Serializer serializer = new Int64Serializer();
+
+ @Override
+ public void serialize(Long t, DataOutputPlus out, int version) throws IOException
+ {
+ out.writeLong(t);
+ }
+
+ @Override
+ public Long deserialize(DataInputPlus in, int version) throws IOException
+ {
+ return in.readLong();
+ }
- public TombstoneAbortException(int nodes, int tombstones, String cql, boolean dataPresent, ConsistencyLevel consistency, int received, int blockFor, Map<InetAddressAndPort, RequestFailureReason> failureReasonByEndpoint)
+ @Override
+ public long serializedSize(Long t, int version)
{
- super(tombstoneAbortMessage(nodes, tombstones, cql), consistency, received, blockFor, dataPresent, failureReasonByEndpoint);
- this.nodes = nodes;
- this.tombstones = tombstones;
+ return TypeSizes.sizeof(t);
}
}
diff --git a/src/java/org/apache/cassandra/utils/ObjectSizes.java b/src/java/org/apache/cassandra/utils/ObjectSizes.java
index 04e5c65..a8e8b6c 100644
--- a/src/java/org/apache/cassandra/utils/ObjectSizes.java
+++ b/src/java/org/apache/cassandra/utils/ObjectSizes.java
@@ -32,7 +32,6 @@ import org.github.jamm.MemoryMeter;
public class ObjectSizes
{
private static final MemoryMeter meter = new MemoryMeter()
- .omitSharedBufferOverhead()
.withGuessing(MemoryMeter.Guess.FALLBACK_UNSAFE)
.ignoreKnownSingletons();
@@ -125,7 +124,7 @@ public class ObjectSizes
// if we're only referencing a sub-portion of the ByteBuffer, don't count the array overhead (assume it's slab
// allocated, so amortized over all the allocations the overhead is negligible and better to undercount than over)
if (buffer.capacity() > buffer.remaining())
- return buffer.remaining();
+ return BUFFER_EMPTY_SIZE + buffer.remaining();
return BUFFER_EMPTY_SIZE + sizeOfArray(buffer.capacity(), 1);
}
diff --git a/test/conf/cassandra.yaml b/test/conf/cassandra.yaml
index c8ac72b..d556852 100644
--- a/test/conf/cassandra.yaml
+++ b/test/conf/cassandra.yaml
@@ -53,6 +53,14 @@ enable_materialized_views: true
enable_drop_compact_storage: true
file_cache_enabled: true
auto_hints_cleanup_enabled: true
-client_track_warnings_enabled: true
-client_large_read_warn_threshold_kb: 1024
-client_large_read_abort_threshold_kb: 4096
+track_warnings:
+ enabled: true
+ coordinator_read_size:
+ warn_threshold_kb: 1024
+ abort_threshold_kb: 4096
+ local_read_size:
+ warn_threshold_kb: 4096
+ abort_threshold_kb: 8192
+ row_index_size:
+ warn_threshold_kb: 4096
+ abort_threshold_kb: 8192
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Coordinator.java b/test/distributed/org/apache/cassandra/distributed/impl/Coordinator.java
index e31ce2c..02f01c4 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/Coordinator.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Coordinator.java
@@ -32,7 +32,6 @@ import com.google.common.collect.Iterators;
import org.apache.cassandra.cql3.CQLStatement;
import org.apache.cassandra.cql3.QueryOptions;
import org.apache.cassandra.cql3.QueryProcessor;
-import org.apache.cassandra.cql3.UntypedResultSet;
import org.apache.cassandra.cql3.statements.SelectStatement;
import org.apache.cassandra.distributed.api.ConsistencyLevel;
import org.apache.cassandra.distributed.api.ICoordinator;
@@ -43,8 +42,7 @@ import org.apache.cassandra.distributed.api.SimpleQueryResult;
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.service.ClientWarn;
import org.apache.cassandra.service.QueryState;
-import org.apache.cassandra.service.pager.QueryPager;
-import org.apache.cassandra.transport.ClientStat;
+import org.apache.cassandra.service.reads.trackwarnings.CoordinatorWarnings;
import org.apache.cassandra.transport.ProtocolVersion;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.transport.messages.ResultMessage;
@@ -99,23 +97,36 @@ public class Coordinator implements ICoordinator
// Start capturing warnings on this thread. Note that this will implicitly clear out any previous
// warnings as it sets a new State instance on the ThreadLocal.
ClientWarn.instance.captureWarnings();
-
- ResultMessage res = prepared.execute(QueryState.forInternalCalls(),
- QueryOptions.create(toCassandraCL(consistencyLevel),
- boundBBValues,
- false,
- Integer.MAX_VALUE,
- null,
- null,
- ProtocolVersion.CURRENT,
- null),
- System.nanoTime());
-
- // Collect warnings reported during the query.
- if (res != null)
- res.setWarnings(ClientWarn.instance.getWarnings());
-
- return RowUtil.toQueryResult(res);
+ CoordinatorWarnings.init();
+ try
+ {
+ ResultMessage res = prepared.execute(QueryState.forInternalCalls(),
+ QueryOptions.create(toCassandraCL(consistencyLevel),
+ boundBBValues,
+ false,
+ Integer.MAX_VALUE,
+ null,
+ null,
+ ProtocolVersion.CURRENT,
+ null),
+ System.nanoTime());
+ // Collect warnings reported during the query.
+ CoordinatorWarnings.done();
+ if (res != null)
+ res.setWarnings(ClientWarn.instance.getWarnings());
+
+ return RowUtil.toQueryResult(res);
+ }
+ catch (Exception | Error e)
+ {
+ CoordinatorWarnings.done();
+ throw e;
+ }
+ finally
+ {
+ CoordinatorWarnings.reset();
+ ClientWarn.instance.resetWarnings();
+ }
}
public Object[][] executeWithTracing(UUID sessionId, String query, ConsistencyLevel consistencyLevelOrigin, Object... boundValues)
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index 4079707..2a6b638 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@ -23,6 +23,7 @@ import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
+import java.net.BindException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.security.Permission;
@@ -108,11 +109,13 @@ import org.apache.cassandra.schema.SchemaConstants;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.service.CassandraDaemon;
import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.ClientWarn;
import org.apache.cassandra.service.DefaultFSErrorHandler;
import org.apache.cassandra.service.PendingRangeCalculatorService;
import org.apache.cassandra.service.QueryState;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.service.StorageServiceMBean;
+import org.apache.cassandra.service.reads.trackwarnings.CoordinatorWarnings;
import org.apache.cassandra.service.snapshot.SnapshotManager;
import org.apache.cassandra.streaming.StreamReceiveTask;
import org.apache.cassandra.streaming.StreamTransferTask;
@@ -219,10 +222,29 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
public SimpleQueryResult executeInternalWithResult(String query, Object... args)
{
return sync(() -> {
- QueryHandler.Prepared prepared = QueryProcessor.prepareInternal(query);
- ResultMessage result = prepared.statement.executeLocally(QueryProcessor.internalQueryState(),
- QueryProcessor.makeInternalOptions(prepared.statement, args));
- return RowUtil.toQueryResult(result);
+ ClientWarn.instance.captureWarnings();
+ CoordinatorWarnings.init();
+ try
+ {
+ QueryHandler.Prepared prepared = QueryProcessor.prepareInternal(query);
+ ResultMessage result = prepared.statement.executeLocally(QueryProcessor.internalQueryState(),
+ QueryProcessor.makeInternalOptions(prepared.statement, args));
+ CoordinatorWarnings.done();
+
+ if (result != null)
+ result.setWarnings(ClientWarn.instance.getWarnings());
+ return RowUtil.toQueryResult(result);
+ }
+ catch (Exception | Error e)
+ {
+ CoordinatorWarnings.done();
+ throw e;
+ }
+ finally
+ {
+ CoordinatorWarnings.reset();
+ ClientWarn.instance.resetWarnings();
+ }
}).call();
}
@@ -545,7 +567,18 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
if (config.has(GOSSIP))
{
MigrationManager.setUptimeFn(() -> TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startedAt));
- StorageService.instance.initServer();
+ try
+ {
+ StorageService.instance.initServer();
+ }
+ catch (Exception e)
+ {
+ // I am tired of looking up my notes for how to fix this... so why not tell the user?
+ Throwable cause = com.google.common.base.Throwables.getRootCause(e);
+ if (cause instanceof BindException && "Can't assign requested address".equals(cause.getMessage()))
+ throw new RuntimeException("Unable to bind, run the following in a terminal and try again:\nfor subnet in $(seq 0 5); do for id in $(seq 0 5); do sudo ifconfig lo0 alias \"127.0.$subnet.$id\"; done; done;", e);
+ throw e;
+ }
StorageService.instance.removeShutdownHook();
Gossiper.waitToSettle();
}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/ClientReadSizeWarningTest.java b/test/distributed/org/apache/cassandra/distributed/test/ClientReadSizeWarningTest.java
deleted file mode 100644
index 2d790f4..0000000
--- a/test/distributed/org/apache/cassandra/distributed/test/ClientReadSizeWarningTest.java
+++ /dev/null
@@ -1,266 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.distributed.test;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Random;
-import java.util.function.Consumer;
-
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import org.junit.*;
-
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.SimpleStatement;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.cql3.QueryProcessor;
-import org.apache.cassandra.distributed.Cluster;
-import org.apache.cassandra.distributed.api.*;
-import org.apache.cassandra.exceptions.ReadSizeAbortException;
-import org.apache.cassandra.exceptions.RequestFailureReason;
-import org.apache.cassandra.service.ClientWarn;
-import org.apache.cassandra.service.QueryState;
-import org.assertj.core.api.Assertions;
-import org.assertj.core.api.Condition;
-
-/**
- * ReadSize client warn/abort is coordinator only, so the fact ClientMetrics is coordinator only does not
- * impact the user experience
- */
-public class ClientReadSizeWarningTest extends TestBaseImpl
-{
- private static final Random RANDOM = new Random(0);
- private static ICluster<IInvokableInstance> CLUSTER;
- private static com.datastax.driver.core.Cluster JAVA_DRIVER;
- private static com.datastax.driver.core.Session JAVA_DRIVER_SESSION;
-
- @BeforeClass
- public static void setupClass() throws IOException
- {
- Cluster.Builder builder = Cluster.build(3);
- builder.withConfig(c -> c.with(Feature.NATIVE_PROTOCOL, Feature.GOSSIP));
- CLUSTER = builder.start();
- JAVA_DRIVER = JavaDriverUtils.create(CLUSTER);
- JAVA_DRIVER_SESSION = JAVA_DRIVER.connect();
-
- // setup threshold after init to avoid driver issues loading
- // the test uses a rather small limit, which causes driver to fail while loading metadata
- CLUSTER.stream().forEach(i -> i.runOnInstance(() -> {
- DatabaseDescriptor.setClientLargeReadWarnThresholdKB(1);
- DatabaseDescriptor.setClientLargeReadAbortThresholdKB(2);
- }));
- }
-
- @Before
- public void setup()
- {
- CLUSTER.schemaChange("DROP KEYSPACE IF EXISTS " + KEYSPACE);
- init(CLUSTER);
- CLUSTER.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v blob, PRIMARY KEY (pk, ck))");
- }
-
- private static void enable(boolean value)
- {
- CLUSTER.stream().forEach(i -> i.runOnInstance(() -> DatabaseDescriptor.setClientTrackWarningsEnabled(value)));
- }
-
- private static void assertPrefix(String expectedPrefix, String actual)
- {
- if (!actual.startsWith(expectedPrefix))
- throw new AssertionError(String.format("expected \"%s\" to begin with \"%s\"", actual, expectedPrefix));
- }
-
- private static ByteBuffer bytes(int size)
- {
- byte[] b = new byte[size];
- RANDOM.nextBytes(b);
- return ByteBuffer.wrap(b);
- }
-
- @Test
- public void noWarningsSinglePartition()
- {
- noWarnings("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk=1");
- }
-
- @Test
- public void noWarningsScan()
- {
- noWarnings("SELECT * FROM " + KEYSPACE + ".tbl");
- }
-
- public void noWarnings(String cql)
- {
- CLUSTER.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, ?)", ConsistencyLevel.ALL, bytes(128));
- CLUSTER.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 2, ?)", ConsistencyLevel.ALL, bytes(128));
-
- Consumer<List<String>> test = warnings ->
- Assert.assertEquals(Collections.emptyList(), warnings);
-
- for (boolean b : Arrays.asList(true, false))
- {
- enable(b);
- SimpleQueryResult result = CLUSTER.coordinator(1).executeWithResult(cql, ConsistencyLevel.ALL);
- test.accept(result.warnings());
- test.accept(driverQueryAll(cql).getExecutionInfo().getWarnings());
- assertWarnAborts(0, 0, 0);
- }
- }
-
- @Test
- public void warnThresholdSinglePartition()
- {
- warnThreshold("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk=1");
- }
-
- @Test
- public void warnThresholdScan()
- {
- warnThreshold("SELECT * FROM " + KEYSPACE + ".tbl");
- }
-
- public void warnThreshold(String cql)
- {
- CLUSTER.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, ?)", ConsistencyLevel.ALL, bytes(512));
- CLUSTER.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 2, ?)", ConsistencyLevel.ALL, bytes(512));
-
- Consumer<List<String>> testEnabled = warnings ->
- assertPrefix("Read on table " + KEYSPACE + ".tbl has exceeded the size warning threshold", Iterables.getOnlyElement(warnings));
-
- enable(true);
- SimpleQueryResult result = CLUSTER.coordinator(1).executeWithResult(cql, ConsistencyLevel.ALL);
- testEnabled.accept(result.warnings());
- assertWarnAborts(1, 0, 0);
- testEnabled.accept(driverQueryAll(cql).getExecutionInfo().getWarnings());
- assertWarnAborts(2, 0, 0);
-
- enable(false);
- result = CLUSTER.coordinator(1).executeWithResult(cql, ConsistencyLevel.ALL);
- Assertions.assertThat(result.warnings()).isEmpty();
- Assertions.assertThat(driverQueryAll(cql).getExecutionInfo().getWarnings()).isEmpty();
- assertWarnAborts(2, 0, 0);
- }
-
- @Test
- public void failThresholdSinglePartition() throws UnknownHostException
- {
- failThreshold("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk=1");
- }
-
- @Test
- public void failThresholdScan() throws UnknownHostException
- {
- failThreshold("SELECT * FROM " + KEYSPACE + ".tbl");
- }
-
- public void failThreshold(String cql) throws UnknownHostException
- {
- CLUSTER.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, ?)", ConsistencyLevel.ALL, bytes(512));
- CLUSTER.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 2, ?)", ConsistencyLevel.ALL, bytes(512));
- CLUSTER.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 3, ?)", ConsistencyLevel.ALL, bytes(512));
- CLUSTER.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 4, ?)", ConsistencyLevel.ALL, bytes(512));
- CLUSTER.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 5, ?)", ConsistencyLevel.ALL, bytes(512));
-
- enable(true);
- List<String> warnings = CLUSTER.get(1).callsOnInstance(() -> {
- ClientWarn.instance.captureWarnings();
- try
- {
- QueryProcessor.execute(cql, org.apache.cassandra.db.ConsistencyLevel.ALL, QueryState.forInternalCalls());
- Assert.fail("Expected query failure");
- }
- catch (ReadSizeAbortException e)
- {
- // expected, client transport returns an error message and includes client warnings
- }
- return ClientWarn.instance.getWarnings();
- }).call();
- Assertions.assertThat(warnings).hasSize(1);
- assertPrefix("Read on table " + KEYSPACE + ".tbl has exceeded the size failure threshold", warnings.get(0));
- assertWarnAborts(0, 1, 1);
-
- try
- {
- driverQueryAll(cql);
- Assert.fail("Query should have thrown ReadFailureException");
- }
- catch (com.datastax.driver.core.exceptions.ReadFailureException e)
- {
- // without changing the client can't produce a better message...
- // client does NOT include the message sent from the server in the exception; so the message doesn't work
- // well in this case
- Assertions.assertThat(e.getMessage()).endsWith("(1 responses were required but only 0 replica responded, 1 failed)");
- ImmutableSet<InetAddress> expectedKeys = ImmutableSet.of(InetAddress.getByAddress(new byte[]{ 127, 0, 0, 1 }), InetAddress.getByAddress(new byte[]{ 127, 0, 0, 2 }), InetAddress.getByAddress(new byte[]{ 127, 0, 0, 3 }));
- Assertions.assertThat(e.getFailuresMap())
- .hasSize(1)
- // coordinator changes from run to run, so can't assert map as the key is dynamic... so assert the domain of keys and the single value expect
- .containsValue(RequestFailureReason.READ_TOO_LARGE.code)
- .hasKeySatisfying(new Condition<InetAddress>() {
- public boolean matches(InetAddress value)
- {
- return expectedKeys.contains(value);
- }
- });
- }
- assertWarnAborts(0, 2, 1);
-
- // query should no longer fail
- enable(false);
- SimpleQueryResult result = CLUSTER.coordinator(1).executeWithResult(cql, ConsistencyLevel.ALL);
- Assertions.assertThat(result.warnings()).isEmpty();
- Assertions.assertThat(driverQueryAll(cql).getExecutionInfo().getWarnings()).isEmpty();
- assertWarnAborts(0, 2, 0);
- }
-
- private static long GLOBAL_READ_ABORTS = 0;
- private static void assertWarnAborts(int warns, int aborts, int globalAborts)
- {
- Assertions.assertThat(totalWarnings()).as("warnings").isEqualTo(warns);
- Assertions.assertThat(totalAborts()).as("aborts").isEqualTo(aborts);
- long expectedGlobalAborts = GLOBAL_READ_ABORTS + globalAborts;
- Assertions.assertThat(totalReadAborts()).as("global aborts").isEqualTo(expectedGlobalAborts);
- GLOBAL_READ_ABORTS = expectedGlobalAborts;
- }
-
- private static long totalWarnings()
- {
- return CLUSTER.stream().mapToLong(i -> i.metrics().getCounter("org.apache.cassandra.metrics.keyspace.ClientReadSizeWarnings." + KEYSPACE)).sum();
- }
-
- private static long totalAborts()
- {
- return CLUSTER.stream().mapToLong(i -> i.metrics().getCounter("org.apache.cassandra.metrics.keyspace.ClientReadSizeAborts." + KEYSPACE)).sum();
- }
-
- private static long totalReadAborts()
- {
- return CLUSTER.stream().mapToLong(i -> i.metrics().getCounter("org.apache.cassandra.metrics.ClientRequest.Aborts.Read-ALL")).sum();
- }
-
- private static ResultSet driverQueryAll(String cql)
- {
- return JAVA_DRIVER_SESSION.execute(new SimpleStatement(cql).setConsistencyLevel(com.datastax.driver.core.ConsistencyLevel.ALL));
- }
-}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/trackwarnings/AbstractClientSizeWarning.java b/test/distributed/org/apache/cassandra/distributed/test/trackwarnings/AbstractClientSizeWarning.java
new file mode 100644
index 0000000..42670c7
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/trackwarnings/AbstractClientSizeWarning.java
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.trackwarnings;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.function.Consumer;
+
+import com.google.common.collect.ImmutableSet;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.SimpleStatement;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.ICluster;
+import org.apache.cassandra.distributed.api.ICoordinator;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.SimpleQueryResult;
+import org.apache.cassandra.distributed.test.JavaDriverUtils;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.exceptions.ReadSizeAbortException;
+import org.apache.cassandra.exceptions.RequestFailureReason;
+import org.apache.cassandra.service.ClientWarn;
+import org.apache.cassandra.service.QueryState;
+import org.apache.cassandra.service.reads.trackwarnings.CoordinatorWarnings;
+import org.assertj.core.api.Condition;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public abstract class AbstractClientSizeWarning extends TestBaseImpl
+{
+ private static final String CQL_PK_READ = "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk=1";
+ private static final String CQL_TABLE_SCAN = "SELECT * FROM " + KEYSPACE + ".tbl";
+
+ private static final Random RANDOM = new Random(0);
+ protected static ICluster<IInvokableInstance> CLUSTER;
+ protected static com.datastax.driver.core.Cluster JAVA_DRIVER;
+ protected static com.datastax.driver.core.Session JAVA_DRIVER_SESSION;
+
+ @BeforeClass
+ public static void setupClass() throws IOException
+ {
+ Cluster.Builder builder = Cluster.build(3);
+ builder.withConfig(c -> c.with(Feature.NATIVE_PROTOCOL, Feature.GOSSIP));
+ CLUSTER = builder.start();
+ JAVA_DRIVER = JavaDriverUtils.create(CLUSTER);
+ JAVA_DRIVER_SESSION = JAVA_DRIVER.connect();
+ }
+
+ protected abstract long totalWarnings();
+ protected abstract long totalAborts();
+ protected abstract void assertWarnings(List<String> warnings);
+ protected abstract void assertAbortWarnings(List<String> warnings);
+ protected boolean shouldFlush()
+ {
+ return false;
+ }
+
+ @Before
+ public void setup()
+ {
+ CLUSTER.schemaChange("DROP KEYSPACE IF EXISTS " + KEYSPACE);
+ init(CLUSTER);
+ // disable key cache so RowIndexEntry is read each time
+ CLUSTER.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v blob, PRIMARY KEY (pk, ck)) WITH caching = { 'keys' : 'NONE'}");
+ }
+
+ @Test
+ public void noWarningsSinglePartition()
+ {
+ noWarnings(CQL_PK_READ);
+ }
+
+ @Test
+ public void noWarningsScan()
+ {
+ noWarnings(CQL_TABLE_SCAN);
+ }
+
+ public void noWarnings(String cql)
+ {
+ CLUSTER.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, ?)", ConsistencyLevel.ALL, bytes(128));
+ CLUSTER.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 2, ?)", ConsistencyLevel.ALL, bytes(128));
+ if (shouldFlush())
+ CLUSTER.stream().forEach(i -> i.flush(KEYSPACE));
+
+ Consumer<List<String>> test = warnings ->
+ Assert.assertEquals(Collections.emptyList(), warnings);
+
+ for (boolean b : Arrays.asList(true, false))
+ {
+ enable(b);
+ checkpointHistogram();
+ SimpleQueryResult result = CLUSTER.coordinator(1).executeWithResult(cql, ConsistencyLevel.ALL);
+ test.accept(result.warnings());
+ if (b)
+ {
+ assertHistogramUpdated();
+ }
+ else
+ {
+ assertHistogramNotUpdated();
+ }
+ test.accept(driverQueryAll(cql).getExecutionInfo().getWarnings());
+ if (b)
+ {
+ assertHistogramUpdated();
+ }
+ else
+ {
+ assertHistogramNotUpdated();
+ }
+ assertWarnAborts(0, 0, 0);
+ }
+ }
+
+ @Test
+ public void warnThresholdSinglePartition()
+ {
+ warnThreshold(CQL_PK_READ, false);
+ }
+
+ @Test
+ public void warnThresholdScan()
+ {
+ warnThreshold(CQL_TABLE_SCAN, false);
+ }
+
+ @Test
+ public void warnThresholdSinglePartitionWithReadRepair()
+ {
+ warnThreshold(CQL_PK_READ, true);
+ }
+
+ @Test
+ public void warnThresholdScanWithReadRepair()
+ {
+ warnThreshold(CQL_TABLE_SCAN, true);
+ }
+
+ protected int warnThresholdRowCount()
+ {
+ return 2;
+ }
+
+ public void warnThreshold(String cql, boolean triggerReadRepair)
+ {
+ for (int i = 0; i < warnThresholdRowCount(); i++)
+ {
+ if (triggerReadRepair)
+ {
+ int finalI = i;
+ // cell timestamps will not match (even though the values match) which will trigger a read-repair
+ CLUSTER.stream().forEach(node -> node.executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, ?, ?)", finalI + 1, bytes(512)));
+ }
+ else
+ {
+ CLUSTER.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, ?, ?)", ConsistencyLevel.ALL, i + 1, bytes(512));
+ }
+ }
+
+ if (shouldFlush())
+ CLUSTER.stream().forEach(i -> i.flush(KEYSPACE));
+
+ enable(true);
+ checkpointHistogram();
+ SimpleQueryResult result = CLUSTER.coordinator(1).executeWithResult(cql, ConsistencyLevel.ALL);
+ assertWarnings(result.warnings());
+ assertHistogramUpdated();
+ assertWarnAborts(1, 0, 0);
+ assertWarnings(driverQueryAll(cql).getExecutionInfo().getWarnings());
+ assertHistogramUpdated();
+ assertWarnAborts(2, 0, 0);
+
+ enable(false);
+ result = CLUSTER.coordinator(1).executeWithResult(cql, ConsistencyLevel.ALL);
+ assertThat(result.warnings()).isEmpty();
+ assertHistogramNotUpdated();
+ assertThat(driverQueryAll(cql).getExecutionInfo().getWarnings()).isEmpty();
+ assertHistogramNotUpdated();
+ assertWarnAborts(2, 0, 0);
+ }
+
+ @Test
+ public void failThresholdSinglePartition() throws UnknownHostException
+ {
+ failThreshold(CQL_PK_READ);
+ }
+
+ @Test
+ public void failThresholdScan() throws UnknownHostException
+ {
+ failThreshold(CQL_TABLE_SCAN);
+ }
+
+ protected int failThresholdRowCount()
+ {
+ return 5;
+ }
+
+ public void failThreshold(String cql) throws UnknownHostException
+ {
+ ICoordinator node = CLUSTER.coordinator(1);
+ for (int i = 0; i < failThresholdRowCount(); i++)
+ node.execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, ?, ?)", ConsistencyLevel.ALL, i + 1, bytes(512));
+
+ if (shouldFlush())
+ CLUSTER.stream().forEach(i -> i.flush(KEYSPACE));
+
+ enable(true);
+ checkpointHistogram();
+ List<String> warnings = CLUSTER.get(1).callsOnInstance(() -> {
+ ClientWarn.instance.captureWarnings();
+ CoordinatorWarnings.init();
+ try
+ {
+ QueryProcessor.execute(cql, org.apache.cassandra.db.ConsistencyLevel.ALL, QueryState.forInternalCalls());
+ Assert.fail("Expected query failure");
+ }
+ catch (ReadSizeAbortException e)
+ {
+ // expected, client transport returns an error message and includes client warnings
+ }
+ CoordinatorWarnings.done();
+ CoordinatorWarnings.reset();
+ return ClientWarn.instance.getWarnings();
+ }).call();
+ assertAbortWarnings(warnings);
+ assertHistogramUpdated();
+ assertWarnAborts(0, 1, 1);
+
+ try
+ {
+ driverQueryAll(cql);
+ Assert.fail("Query should have thrown ReadFailureException");
+ }
+ catch (com.datastax.driver.core.exceptions.ReadFailureException e)
+ {
+ // without changing the client can't produce a better message...
+ // client does NOT include the message sent from the server in the exception; so the message doesn't work
+ // well in this case
+ assertThat(e.getMessage()).contains("responses were required but only 0 replica responded");
+ ImmutableSet<InetAddress> expectedKeys = ImmutableSet.of(InetAddress.getByAddress(new byte[]{ 127, 0, 0, 1 }), InetAddress.getByAddress(new byte[]{ 127, 0, 0, 2 }), InetAddress.getByAddress(new byte[]{ 127, 0, 0, 3 }));
+ assertThat(e.getFailuresMap())
+ .hasSizeBetween(1, 3)
+ // coordinator changes from run to run, so can't assert map as the key is dynamic... so assert the domain of keys and the single value expected
+ .containsValue(RequestFailureReason.READ_SIZE.code)
+ .hasKeySatisfying(new Condition<InetAddress>() {
+ public boolean matches(InetAddress value)
+ {
+ return expectedKeys.contains(value);
+ }
+ });
+ }
+ assertHistogramUpdated();
+ assertWarnAborts(0, 2, 1);
+
+ // query should no longer fail
+ enable(false);
+ SimpleQueryResult result = node.executeWithResult(cql, ConsistencyLevel.ALL);
+ assertThat(result.warnings()).isEmpty();
+ assertHistogramNotUpdated();
+ assertThat(driverQueryAll(cql).getExecutionInfo().getWarnings()).isEmpty();
+ assertHistogramNotUpdated();
+ assertWarnAborts(0, 2, 0);
+ }
+
+ protected static void enable(boolean value)
+ {
+ CLUSTER.stream().forEach(i -> i.runOnInstance(() -> DatabaseDescriptor.setTrackWarningsEnabled(value)));
+ }
+
+ protected static ByteBuffer bytes(int size)
+ {
+ byte[] b = new byte[size];
+ RANDOM.nextBytes(b);
+ return ByteBuffer.wrap(b);
+ }
+
+ protected static ResultSet driverQueryAll(String cql)
+ {
+ return JAVA_DRIVER_SESSION.execute(new SimpleStatement(cql).setConsistencyLevel(com.datastax.driver.core.ConsistencyLevel.ALL));
+ }
+
+ protected abstract long[] getHistogram();
+ private static long[] previous = new long[0];
+ protected void assertHistogramUpdated()
+ {
+ long[] latestCount = getHistogram();
+ try
+ {
+ // why notEquals? timing can cause 1 replica to not process before the failure makes it to the test
+ // for this reason it is possible 1 replica was not updated but the others were; by expecting every replica
+ // to update, the test would become flaky
+ assertThat(latestCount).isNotEqualTo(previous);
+ }
+ finally
+ {
+ previous = latestCount;
+ }
+ }
+
+ protected void assertHistogramNotUpdated()
+ {
+ long[] latestCount = getHistogram();
+ try
+ {
+ assertThat(latestCount).isEqualTo(previous);
+ }
+ finally
+ {
+ previous = latestCount;
+ }
+ }
+
+ private void checkpointHistogram()
+ {
+ previous = getHistogram();
+ }
+
+ private static long GLOBAL_READ_ABORTS = 0;
+ protected void assertWarnAborts(int warns, int aborts, int globalAborts)
+ {
+ assertThat(totalWarnings()).as("warnings").isEqualTo(warns);
+ assertThat(totalAborts()).as("aborts").isEqualTo(aborts);
+ long expectedGlobalAborts = GLOBAL_READ_ABORTS + globalAborts;
+ assertThat(totalReadAborts()).as("global aborts").isEqualTo(expectedGlobalAborts);
+ GLOBAL_READ_ABORTS = expectedGlobalAborts;
+ }
+
+ protected static long totalReadAborts()
+ {
+ return CLUSTER.stream().mapToLong(i ->
+ i.metrics().getCounter("org.apache.cassandra.metrics.ClientRequest.Aborts.Read-ALL")
+ + i.metrics().getCounter("org.apache.cassandra.metrics.ClientRequest.Aborts.RangeSlice")
+ ).sum();
+ }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/trackwarnings/CoordinatorReadSizeWarningTest.java b/test/distributed/org/apache/cassandra/distributed/test/trackwarnings/CoordinatorReadSizeWarningTest.java
new file mode 100644
index 0000000..523220d
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/trackwarnings/CoordinatorReadSizeWarningTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.trackwarnings;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.junit.*;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * ReadSize client warn/abort is coordinator only, so the fact ClientMetrics is coordinator only does not
+ * impact the user experience
+ */
+public class CoordinatorReadSizeWarningTest extends AbstractClientSizeWarning
+{
+ @BeforeClass
+ public static void setupClass() throws IOException
+ {
+ AbstractClientSizeWarning.setupClass();
+
+ // setup threshold after init to avoid driver issues loading
+ // the test uses a rather small limit, which causes driver to fail while loading metadata
+ CLUSTER.stream().forEach(i -> i.runOnInstance(() -> {
+ DatabaseDescriptor.setCoordinatorReadSizeWarnThresholdKB(1);
+ DatabaseDescriptor.setCoordinatorReadSizeAbortThresholdKB(2);
+ }));
+ }
+
+ private static void assertPrefix(String expectedPrefix, String actual)
+ {
+ if (!actual.startsWith(expectedPrefix))
+ throw new AssertionError(String.format("expected \"%s\" to begin with \"%s\"", actual, expectedPrefix));
+ }
+
+ @Override
+ protected void assertWarnings(List<String> warnings)
+ {
+ assertThat(warnings).hasSize(1);
+ assertPrefix("Read on table " + KEYSPACE + ".tbl has exceeded the size warning threshold", warnings.get(0));
+ }
+
+ @Override
+ protected void assertAbortWarnings(List<String> warnings)
+ {
+ assertThat(warnings).hasSize(1);
+ assertPrefix("Read on table " + KEYSPACE + ".tbl has exceeded the size failure threshold", warnings.get(0));
+ }
+
+ @Override
+ protected long[] getHistogram()
+ {
+ return CLUSTER.stream().mapToLong(i -> i.metrics().getCounter("org.apache.cassandra.metrics.keyspace.CoordinatorReadSize." + KEYSPACE)).toArray();
+ }
+
+ @Override
+ protected long totalWarnings()
+ {
+ return CLUSTER.stream().mapToLong(i -> i.metrics().getCounter("org.apache.cassandra.metrics.keyspace.CoordinatorReadSizeWarnings." + KEYSPACE)).sum();
+ }
+
+ @Override
+ protected long totalAborts()
+ {
+ return CLUSTER.stream().mapToLong(i -> i.metrics().getCounter("org.apache.cassandra.metrics.keyspace.CoordinatorReadSizeAborts." + KEYSPACE)).sum();
+ }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/trackwarnings/LocalReadSizeWarningTest.java b/test/distributed/org/apache/cassandra/distributed/test/trackwarnings/LocalReadSizeWarningTest.java
new file mode 100644
index 0000000..8060bd5
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/trackwarnings/LocalReadSizeWarningTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.trackwarnings;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.junit.BeforeClass;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class LocalReadSizeWarningTest extends AbstractClientSizeWarning
+{
+ @BeforeClass
+ public static void setupClass() throws IOException
+ {
+ AbstractClientSizeWarning.setupClass();
+
+ // setup threshold after init to avoid driver issues loading
+ // the test uses a rather small limit, which causes driver to fail while loading metadata
+ CLUSTER.stream().forEach(i -> i.runOnInstance(() -> {
+ // disable coordinator version
+ DatabaseDescriptor.setCoordinatorReadSizeWarnThresholdKB(0);
+ DatabaseDescriptor.setCoordinatorReadSizeAbortThresholdKB(0);
+
+ DatabaseDescriptor.setLocalReadSizeWarnThresholdKb(1);
+ DatabaseDescriptor.setLocalReadSizeAbortThresholdKb(2);
+ }));
+ }
+
+ @Override
+ protected void assertWarnings(List<String> warnings)
+ {
+ assertThat(warnings).hasSize(1);
+ assertThat(warnings.get(0)).contains("(see track_warnings.local_read_size.warn_threshold_kb)").contains("and issued local read size warnings for query");
+ }
+
+ @Override
+ protected void assertAbortWarnings(List<String> warnings)
+ {
+ assertThat(warnings).hasSize(1);
+ assertThat(warnings.get(0)).contains("(see track_warnings.local_read_size.abort_threshold_kb)").contains("aborted the query");
+ }
+
+ @Override
+ protected long[] getHistogram()
+ {
+ return CLUSTER.stream().mapToLong(i -> i.metrics().getCounter("org.apache.cassandra.metrics.keyspace.LocalReadSize." + KEYSPACE)).toArray();
+ }
+
+ @Override
+ protected long totalWarnings()
+ {
+ return CLUSTER.stream().mapToLong(i -> i.metrics().getCounter("org.apache.cassandra.metrics.keyspace.LocalReadSizeWarnings." + KEYSPACE)).sum();
+ }
+
+ @Override
+ protected long totalAborts()
+ {
+ return CLUSTER.stream().mapToLong(i -> i.metrics().getCounter("org.apache.cassandra.metrics.keyspace.LocalReadSizeAborts." + KEYSPACE)).sum();
+ }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/trackwarnings/RowIndexSizeWarningTest.java b/test/distributed/org/apache/cassandra/distributed/test/trackwarnings/RowIndexSizeWarningTest.java
new file mode 100644
index 0000000..e2158e5
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/trackwarnings/RowIndexSizeWarningTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.trackwarnings;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.junit.Assume;
+import org.junit.BeforeClass;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class RowIndexSizeWarningTest extends AbstractClientSizeWarning
+{
+ @BeforeClass
+ public static void setupClass() throws IOException
+ {
+ AbstractClientSizeWarning.setupClass();
+
+ CLUSTER.stream().forEach(i -> i.runOnInstance(() -> {
+ DatabaseDescriptor.setRowIndexSizeWarnThresholdKb(1);
+ DatabaseDescriptor.setRowIndexSizeAbortThresholdKb(2);
+
+ // hack to force multiple index entries
+ DatabaseDescriptor.setColumnIndexCacheSize(1 << 20);
+ DatabaseDescriptor.setColumnIndexSize(0);
+ }));
+ }
+
+ @Override
+ protected boolean shouldFlush()
+ {
+ // need to flush as RowIndexEntry is at the SSTable level
+ return true;
+ }
+
+ @Override
+ protected int warnThresholdRowCount()
+ {
+ return 15;
+ }
+
+ @Override
+ protected int failThresholdRowCount()
+ {
+ // since the RowIndexEntry grows slower than a partition, need even more rows to trigger
+ return 40;
+ }
+
+ @Override
+ public void noWarningsScan()
+ {
+ Assume.assumeFalse("Ignore Scans", true);
+ }
+
+ @Override
+ public void warnThresholdScan()
+ {
+ Assume.assumeFalse("Ignore Scans", true);
+ }
+
+ @Override
+ public void warnThresholdScanWithReadRepair()
+ {
+ Assume.assumeFalse("Ignore Scans", true);
+ }
+
+ @Override
+ public void failThresholdScan()
+ {
+ Assume.assumeFalse("Ignore Scans", true);
+ }
+
+ @Override
+ protected void assertWarnings(List<String> warnings)
+ {
+ assertThat(warnings).hasSize(1);
+ assertThat(warnings.get(0)).contains("(see track_warnings.row_index_size.warn_threshold_kb)").contains("bytes in RowIndexEntry and issued warnings for query");
+ }
+
+ @Override
+ protected void assertAbortWarnings(List<String> warnings)
+ {
+ assertThat(warnings).hasSize(1);
+ assertThat(warnings.get(0)).contains("(see track_warnings.row_index_size.abort_threshold_kb)").contains("bytes in RowIndexEntry and aborted the query");
+ }
+
+ @Override
+ protected long[] getHistogram()
+ {
+ return CLUSTER.stream().mapToLong(i -> i.metrics().getCounter("org.apache.cassandra.metrics.keyspace.RowIndexSize." + KEYSPACE)).toArray();
+ }
+
+ @Override
+ protected long totalWarnings()
+ {
+ return CLUSTER.stream().mapToLong(i -> i.metrics().getCounter("org.apache.cassandra.metrics.keyspace.RowIndexSizeWarnings." + KEYSPACE)).sum();
+ }
+
+ @Override
+ protected long totalAborts()
+ {
+ return CLUSTER.stream().mapToLong(i -> i.metrics().getCounter("org.apache.cassandra.metrics.keyspace.RowIndexSizeAborts." + KEYSPACE)).sum();
+ }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/ClientTombstoneWarningTest.java b/test/distributed/org/apache/cassandra/distributed/test/trackwarnings/TombstoneWarningTest.java
similarity index 96%
rename from test/distributed/org/apache/cassandra/distributed/test/ClientTombstoneWarningTest.java
rename to test/distributed/org/apache/cassandra/distributed/test/trackwarnings/TombstoneWarningTest.java
index d529515..be83fc1 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/ClientTombstoneWarningTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/trackwarnings/TombstoneWarningTest.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.cassandra.distributed.test;
+package org.apache.cassandra.distributed.test.trackwarnings;
import java.io.IOException;
import java.net.InetAddress;
@@ -44,14 +44,17 @@ import org.apache.cassandra.distributed.api.Feature;
import org.apache.cassandra.distributed.api.ICluster;
import org.apache.cassandra.distributed.api.IInvokableInstance;
import org.apache.cassandra.distributed.api.SimpleQueryResult;
+import org.apache.cassandra.distributed.test.JavaDriverUtils;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
import org.apache.cassandra.exceptions.ReadFailureException;
import org.apache.cassandra.exceptions.RequestFailureReason;
import org.apache.cassandra.exceptions.TombstoneAbortException;
import org.apache.cassandra.service.ClientWarn;
import org.apache.cassandra.service.QueryState;
+import org.apache.cassandra.service.reads.trackwarnings.CoordinatorWarnings;
import org.assertj.core.api.Assertions;
-public class ClientTombstoneWarningTest extends TestBaseImpl
+public class TombstoneWarningTest extends TestBaseImpl
{
private static final int TOMBSTONE_WARN = 50;
private static final int TOMBSTONE_FAIL = 100;
@@ -90,7 +93,7 @@ public class ClientTombstoneWarningTest extends TestBaseImpl
private static void enable(boolean value)
{
- CLUSTER.stream().forEach(i -> i.runOnInstance(() -> DatabaseDescriptor.setClientTrackWarningsEnabled(value)));
+ CLUSTER.stream().forEach(i -> i.runOnInstance(() -> DatabaseDescriptor.setTrackWarningsEnabled(value)));
}
@Test
@@ -194,6 +197,7 @@ public class ClientTombstoneWarningTest extends TestBaseImpl
enable(true);
List<String> warnings = CLUSTER.get(1).callsOnInstance(() -> {
ClientWarn.instance.captureWarnings();
+ CoordinatorWarnings.init();
try
{
QueryProcessor.execute(cql, org.apache.cassandra.db.ConsistencyLevel.ALL, QueryState.forInternalCalls());
@@ -205,6 +209,8 @@ public class ClientTombstoneWarningTest extends TestBaseImpl
Assert.assertEquals(TOMBSTONE_FAIL + 1, e.tombstones);
// expected, client transport returns an error message and includes client warnings
}
+ CoordinatorWarnings.done();
+ CoordinatorWarnings.reset();
return ClientWarn.instance.getWarnings();
}).call();
Assertions.assertThat(Iterables.getOnlyElement(warnings))
diff --git a/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java b/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
index 2d6132e..6e1e98c 100644
--- a/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
+++ b/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
@@ -97,6 +97,9 @@ public class DatabaseDescriptorRefTest
"org.apache.cassandra.config.YamlConfigurationLoader$CustomConstructor",
"org.apache.cassandra.config.TransparentDataEncryptionOptions",
"org.apache.cassandra.config.SubnetGroups",
+ "org.apache.cassandra.config.TrackWarnings",
+ "org.apache.cassandra.config.TrackWarnings$LongByteThreshold",
+ "org.apache.cassandra.config.TrackWarnings$IntByteThreshold",
"org.apache.cassandra.db.ConsistencyLevel",
"org.apache.cassandra.db.commitlog.CommitLogSegmentManagerFactory",
"org.apache.cassandra.db.commitlog.DefaultCommitLogSegmentMgrFactory",
diff --git a/test/unit/org/apache/cassandra/config/DatabaseDescriptorTest.java b/test/unit/org/apache/cassandra/config/DatabaseDescriptorTest.java
index 0038938..b15d8cd 100644
--- a/test/unit/org/apache/cassandra/config/DatabaseDescriptorTest.java
+++ b/test/unit/org/apache/cassandra/config/DatabaseDescriptorTest.java
@@ -34,6 +34,7 @@ import org.junit.Test;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.exceptions.ConfigurationException;
+import org.assertj.core.api.Assertions;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
@@ -590,4 +591,154 @@ public class DatabaseDescriptorTest
Assert.assertEquals(Integer.valueOf(1), config.num_tokens);
Assert.assertEquals(1, DatabaseDescriptor.tokensFromString(config.initial_token).size());
}
+
+ // coordinator read
+ @Test
+ public void testClientLargeReadWarnAndAbortNegative()
+ {
+ Config conf = new Config();
+ conf.track_warnings.coordinator_read_size.warn_threshold_kb = -2;
+ conf.track_warnings.coordinator_read_size.abort_threshold_kb = -2;
+ DatabaseDescriptor.applyTrackWarningsValidations(conf);
+ Assertions.assertThat(conf.track_warnings.coordinator_read_size.warn_threshold_kb).isEqualTo(0);
+ Assertions.assertThat(conf.track_warnings.coordinator_read_size.abort_threshold_kb).isEqualTo(0);
+ }
+
+ @Test
+ public void testClientLargeReadWarnGreaterThanAbort()
+ {
+ Config conf = new Config();
+ conf.track_warnings.coordinator_read_size.warn_threshold_kb = 2;
+ conf.track_warnings.coordinator_read_size.abort_threshold_kb = 1;
+ Assertions.assertThatThrownBy(() -> DatabaseDescriptor.applyTrackWarningsValidations(conf))
+ .isInstanceOf(ConfigurationException.class)
+ .hasMessage("abort_threshold_kb (1) must be greater than or equal to warn_threshold_kb (2); see track_warnings.coordinator_read_size");
+ }
+
+ @Test
+ public void testClientLargeReadWarnEqAbort()
+ {
+ Config conf = new Config();
+ conf.track_warnings.coordinator_read_size.warn_threshold_kb = 2;
+ conf.track_warnings.coordinator_read_size.abort_threshold_kb = 2;
+ DatabaseDescriptor.applyTrackWarningsValidations(conf);
+ }
+
+ @Test
+ public void testClientLargeReadWarnEnabledAbortDisabled()
+ {
+ Config conf = new Config();
+ conf.track_warnings.coordinator_read_size.warn_threshold_kb = 2;
+ conf.track_warnings.coordinator_read_size.abort_threshold_kb = 0;
+ DatabaseDescriptor.applyTrackWarningsValidations(conf);
+ }
+
+ @Test
+ public void testClientLargeReadAbortEnabledWarnDisabled()
+ {
+ Config conf = new Config();
+ conf.track_warnings.coordinator_read_size.warn_threshold_kb = 0;
+ conf.track_warnings.coordinator_read_size.abort_threshold_kb = 2;
+ DatabaseDescriptor.applyTrackWarningsValidations(conf);
+ }
+
+ // local read
+ @Test
+ public void testLocalLargeReadWarnAndAbortNegative()
+ {
+ Config conf = new Config();
+ conf.track_warnings.local_read_size.warn_threshold_kb = -2;
+ conf.track_warnings.local_read_size.abort_threshold_kb = -2;
+ DatabaseDescriptor.applyTrackWarningsValidations(conf);
+ Assertions.assertThat(conf.track_warnings.local_read_size.warn_threshold_kb).isEqualTo(0);
+ Assertions.assertThat(conf.track_warnings.local_read_size.abort_threshold_kb).isEqualTo(0);
+ }
+
+ @Test
+ public void testLocalLargeReadWarnGreaterThanAbort()
+ {
+ Config conf = new Config();
+ conf.track_warnings.local_read_size.warn_threshold_kb = 2;
+ conf.track_warnings.local_read_size.abort_threshold_kb = 1;
+ Assertions.assertThatThrownBy(() -> DatabaseDescriptor.applyTrackWarningsValidations(conf))
+ .isInstanceOf(ConfigurationException.class)
+ .hasMessage("abort_threshold_kb (1) must be greater than or equal to warn_threshold_kb (2); see track_warnings.local_read_size");
+ }
+
+ @Test
+ public void testLocalLargeReadWarnEqAbort()
+ {
+ Config conf = new Config();
+ conf.track_warnings.local_read_size.warn_threshold_kb = 2;
+ conf.track_warnings.local_read_size.abort_threshold_kb = 2;
+ DatabaseDescriptor.applyTrackWarningsValidations(conf);
+ }
+
+ @Test
+ public void testLocalLargeReadWarnEnabledAbortDisabled()
+ {
+ Config conf = new Config();
+ conf.track_warnings.local_read_size.warn_threshold_kb = 2;
+ conf.track_warnings.local_read_size.abort_threshold_kb = 0;
+ DatabaseDescriptor.applyTrackWarningsValidations(conf);
+ }
+
+ @Test
+ public void testLocalLargeReadAbortEnabledWarnDisabled()
+ {
+ Config conf = new Config();
+ conf.track_warnings.local_read_size.warn_threshold_kb = 0;
+ conf.track_warnings.local_read_size.abort_threshold_kb = 2;
+ DatabaseDescriptor.applyTrackWarningsValidations(conf);
+ }
+
+ // row index entry
+ @Test
+ public void testRowIndexSizeWarnAndAbortNegative()
+ {
+ Config conf = new Config();
+ conf.track_warnings.row_index_size.warn_threshold_kb = -2;
+ conf.track_warnings.row_index_size.abort_threshold_kb = -2;
+ DatabaseDescriptor.applyTrackWarningsValidations(conf);
+ Assertions.assertThat(conf.track_warnings.row_index_size.warn_threshold_kb).isEqualTo(0);
+ Assertions.assertThat(conf.track_warnings.row_index_size.abort_threshold_kb).isEqualTo(0);
+ }
+
+ @Test
+ public void testRowIndexSizeWarnGreaterThanAbort()
+ {
+ Config conf = new Config();
+ conf.track_warnings.row_index_size.warn_threshold_kb = 2;
+ conf.track_warnings.row_index_size.abort_threshold_kb = 1;
+ Assertions.assertThatThrownBy(() -> DatabaseDescriptor.applyTrackWarningsValidations(conf))
+ .isInstanceOf(ConfigurationException.class)
+ .hasMessage("abort_threshold_kb (1) must be greater than or equal to warn_threshold_kb (2); see track_warnings.row_index_size");
+ }
+
+ @Test
+ public void testRowIndexSizeWarnEqAbort()
+ {
+ Config conf = new Config();
+ conf.track_warnings.row_index_size.warn_threshold_kb = 2;
+ conf.track_warnings.row_index_size.abort_threshold_kb = 2;
+ DatabaseDescriptor.applyTrackWarningsValidations(conf);
+ }
+
+ @Test
+ public void testRowIndexSizeWarnEnabledAbortDisabled()
+ {
+ Config conf = new Config();
+ conf.track_warnings.row_index_size.warn_threshold_kb = 2;
+ conf.track_warnings.row_index_size.abort_threshold_kb = 0;
+ DatabaseDescriptor.applyTrackWarningsValidations(conf);
+ }
+
+ @Test
+ public void testRowIndexSizeAbortEnabledWarnDisabled()
+ {
+ Config conf = new Config();
+ conf.track_warnings.row_index_size.warn_threshold_kb = 0;
+ conf.track_warnings.row_index_size.abort_threshold_kb = 2;
+ DatabaseDescriptor.applyTrackWarningsValidations(conf);
+ }
}
diff --git a/test/unit/org/apache/cassandra/config/YamlConfigurationLoaderTest.java b/test/unit/org/apache/cassandra/config/YamlConfigurationLoaderTest.java
index b533a8f..a1cb955 100644
--- a/test/unit/org/apache/cassandra/config/YamlConfigurationLoaderTest.java
+++ b/test/unit/org/apache/cassandra/config/YamlConfigurationLoaderTest.java
@@ -18,6 +18,8 @@
package org.apache.cassandra.config;
+import java.io.File;
+import java.net.MalformedURLException;
import java.net.URL;
import java.util.Arrays;
import java.util.Collections;
@@ -26,8 +28,6 @@ import java.util.Map;
import com.google.common.collect.ImmutableMap;
import org.junit.Test;
-import org.assertj.core.api.Assertions;
-
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
@@ -35,6 +35,48 @@ import static org.junit.Assert.assertEquals;
public class YamlConfigurationLoaderTest
{
@Test
+ public void trackWarningsFromConfig()
+ {
+ // this test just makes sure SnakeYAML loads the test config properly and populates the fields (track warnings uses final in some places)
+ // if the config is changed, it's OK to update this test to reflect that change
+ TrackWarnings tw = load("test/conf/cassandra.yaml").track_warnings;
+ assertThat(tw.enabled).isTrue();
+
+ assertThat(tw.coordinator_read_size.warn_threshold_kb).isGreaterThan(0);
+ assertThat(tw.coordinator_read_size.abort_threshold_kb).isGreaterThan(0);
+
+ assertThat(tw.local_read_size.warn_threshold_kb).isGreaterThan(0);
+ assertThat(tw.local_read_size.abort_threshold_kb).isGreaterThan(0);
+
+ assertThat(tw.row_index_size.warn_threshold_kb).isGreaterThan(0);
+ assertThat(tw.row_index_size.abort_threshold_kb).isGreaterThan(0);
+ }
+
+ @Test
+ public void trackWarningsFromMap()
+ {
+ Map<String, Object> map = ImmutableMap.of("track_warnings", ImmutableMap.of(
+ "enabled", true,
+ "coordinator_read_size", ImmutableMap.of("warn_threshold_kb", 1024),
+ "local_read_size", ImmutableMap.of("abort_threshold_kb", 1024),
+ "row_index_size", ImmutableMap.of("warn_threshold_kb", 1024, "abort_threshold_kb", 1024)
+ ));
+
+ Config config = YamlConfigurationLoader.fromMap(map, Config.class);
+ TrackWarnings tw = config.track_warnings;
+ assertThat(tw.enabled).isTrue();
+
+ assertThat(tw.coordinator_read_size.warn_threshold_kb).isEqualTo(1024);
+ assertThat(tw.coordinator_read_size.abort_threshold_kb).isEqualTo(0);
+
+ assertThat(tw.local_read_size.warn_threshold_kb).isEqualTo(0);
+ assertThat(tw.local_read_size.abort_threshold_kb).isEqualTo(1024);
+
+ assertThat(tw.row_index_size.warn_threshold_kb).isEqualTo(1024);
+ assertThat(tw.row_index_size.abort_threshold_kb).isEqualTo(1024);
+ }
+
+ @Test
public void fromMapTest()
{
int storagePort = 123;
@@ -59,10 +101,26 @@ public class YamlConfigurationLoaderTest
@Test
public void sharedErrorReportingExclusions()
{
- URL url = YamlConfigurationLoaderTest.class.getClassLoader().getResource("data/config/YamlConfigurationLoaderTest/shared_client_error_reporting_exclusions.yaml");
- Config config = new YamlConfigurationLoader().loadConfig(url);
+ Config config = load("data/config/YamlConfigurationLoaderTest/shared_client_error_reporting_exclusions.yaml");
SubnetGroups expected = new SubnetGroups(Arrays.asList("127.0.0.1", "127.0.0.0/31"));
assertThat(config.client_error_reporting_exclusions).isEqualTo(expected);
assertThat(config.internode_error_reporting_exclusions).isEqualTo(expected);
}
+
+ private static Config load(String path)
+ {
+ URL url = YamlConfigurationLoaderTest.class.getClassLoader().getResource(path);
+ if (url == null)
+ {
+ try
+ {
+ url = new File(path).toURI().toURL();
+ }
+ catch (MalformedURLException e)
+ {
+ throw new AssertionError(e);
+ }
+ }
+ return new YamlConfigurationLoader().loadConfig(url);
+ }
}
diff --git a/test/unit/org/apache/cassandra/db/CellSpecTest.java b/test/unit/org/apache/cassandra/db/CellSpecTest.java
index 44fc625..040f09f 100644
--- a/test/unit/org/apache/cassandra/db/CellSpecTest.java
+++ b/test/unit/org/apache/cassandra/db/CellSpecTest.java
@@ -44,9 +44,9 @@ import org.apache.cassandra.utils.UUIDGen;
import org.apache.cassandra.utils.concurrent.OpOrder;
import org.apache.cassandra.utils.memory.NativeAllocator;
import org.apache.cassandra.utils.memory.NativePool;
-import org.assertj.core.api.Assertions;
import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
+import static org.assertj.core.api.Assertions.assertThat;
@RunWith(Parameterized.class)
public class CellSpecTest
@@ -60,6 +60,28 @@ public class CellSpecTest
}
@Test
+ public void unsharedHeapSize()
+ {
+ long empty = ObjectSizes.measure(cell);
+ long actual = ObjectSizes.measureDeep(cell);
+ long expected;
+ if (cell instanceof NativeCell)
+ {
+ // NativeCell stores the contents off-heap, so the cost on-heap is just the object's empty case
+ expected = empty;
+ }
+ else
+ {
+ expected = empty + valueSizeOnHeapOf(cell.value());
+ if (cell.path() != null)
+ expected += cell.path().unsharedHeapSize();
+ }
+
+ assertThat(expected).isEqualTo(actual);
+ assertThat(cell.unsharedHeapSize()).isEqualTo(expected);
+ }
+
+ @Test
public void unsharedHeapSizeExcludingData()
{
long empty = ObjectSizes.measure(cell);
@@ -77,10 +99,27 @@ public class CellSpecTest
expected += cell.path().unsharedHeapSizeExcludingData();
}
- Assertions.assertThat(cell.unsharedHeapSizeExcludingData())
+ assertThat(cell.unsharedHeapSizeExcludingData())
.isEqualTo(expected);
}
+ private long valueSizeOnHeapOf(Object value)
+ {
+ if (value instanceof ByteBuffer)
+ {
+ ByteBuffer bb = (ByteBuffer) value;
+ long size = ObjectSizes.sizeOfEmptyHeapByteBuffer();
+ if (!bb.isDirect())
+ size += ObjectSizes.sizeOfArray(bb.array());
+ return size;
+ }
+ else if (value instanceof byte[])
+ {
+ return ObjectSizes.sizeOfArray((byte[]) value);
+ }
+ throw new IllegalArgumentException("Unsupported type: " + value.getClass());
+ }
+
private static long valuePtrSize(Object value)
{
if (value instanceof ByteBuffer)
diff --git a/test/unit/org/apache/cassandra/db/ClusteringHeapSizeTest.java b/test/unit/org/apache/cassandra/db/ClusteringHeapSizeTest.java
index 54d0ce1..f1c3430 100644
--- a/test/unit/org/apache/cassandra/db/ClusteringHeapSizeTest.java
+++ b/test/unit/org/apache/cassandra/db/ClusteringHeapSizeTest.java
@@ -45,12 +45,6 @@ public class ClusteringHeapSizeTest
public void unsharedHeap()
{
long measureDeep = ObjectSizes.measureDeep(clustering);
- if (clustering instanceof BufferClustering)
- {
- // jamm (used in measureDeep) uses .remaining() where as .sizeOnHeapOf() done in unsharedHeapSize actually looks at memory cost
- // without assuming the array is shared (unless capacity > remaining); so account for that
- measureDeep += ObjectSizes.measureDeep(new byte[0]);
- }
long unsharedHeapSize = clustering.unsharedHeapSize();
double allowedDiff = 0.1; // 10% is seen as "close enough"
diff --git a/test/unit/org/apache/cassandra/service/reads/trackwarnings/WarningsSnapshotTest.java b/test/unit/org/apache/cassandra/service/reads/trackwarnings/WarningsSnapshotTest.java
new file mode 100644
index 0000000..f9428d8
--- /dev/null
+++ b/test/unit/org/apache/cassandra/service/reads/trackwarnings/WarningsSnapshotTest.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service.reads.trackwarnings;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
+import com.google.common.collect.ImmutableSet;
+import org.junit.Test;
+
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.quicktheories.core.Gen;
+import org.quicktheories.generators.SourceDSL;
+import org.quicktheories.impl.Constraint;
+
+import static org.apache.cassandra.service.reads.trackwarnings.WarningsSnapshot.*;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.quicktheories.QuickTheory.qt;
+
+public class WarningsSnapshotTest
+{
+ private static final InetAddressAndPort HOME = address(127, 0, 0, 1);
+ private static final InetAddressAndPort VACATION_HOME = address(127, 0, 0, 2);
+
+ @Test
+ public void staticMergeEmtpy()
+ {
+ WarningsSnapshot result = merge(null, empty(), null, empty());
+ assertThat(result).isNull();
+ }
+
+ @Test
+ public void staticMergeNonEmtpy()
+ {
+ qt().forAll(nonEmpty(), nonEmpty()).check((a, b) -> {
+ WarningsSnapshot result = merge(a, b, null, empty());
+ return result != null && !result.isEmpty();
+ });
+ }
+
+ @Test
+ public void mergeEmtpy()
+ {
+ WarningsSnapshot result = empty().merge(empty());
+ assertThat(result).isEqualTo(empty());
+ }
+
+ @Test
+ public void mergeSelf()
+ {
+ qt().forAll(all()).check(self -> self.merge(self).equals(self));
+ }
+
+ @Test
+ public void mergeSelfWithEmpty()
+ {
+ qt().forAll(all()).check(self -> self.merge(empty()).equals(self) && empty().merge(self).equals(self));
+ }
+
+ @Test
+ public void mergeNonEmpty()
+ {
+ WarningsSnapshot expected = builder()
+ .tombstonesAbort(ImmutableSet.of(HOME), 42)
+ .localReadSizeWarning(ImmutableSet.of(VACATION_HOME), 12)
+ .build();
+ // validate builder to protect against empty = empty passing this test
+ assertThat(expected.tombstones.aborts.instances).isEqualTo(ImmutableSet.of(HOME));
+ assertThat(expected.tombstones.aborts.maxValue).isEqualTo(42);
+ assertThat(expected.localReadSize.warnings.instances).isEqualTo(ImmutableSet.of(VACATION_HOME));
+ assertThat(expected.localReadSize.warnings.maxValue).isEqualTo(12);
+
+ WarningsSnapshot output = empty().merge(expected);
+ assertThat(output).isEqualTo(expected).isEqualTo(expected.merge(empty()));
+ assertThat(output.merge(expected)).isEqualTo(expected);
+ }
+
+ @Test
+ public void mergeNonEmpty2()
+ {
+ WarningsSnapshot a = builder()
+ .tombstonesAbort(ImmutableSet.of(HOME), 42)
+ .build();
+ WarningsSnapshot b = builder()
+ .localReadSizeWarning(ImmutableSet.of(VACATION_HOME), 12)
+ .build();
+ WarningsSnapshot expected = builder()
+ .tombstonesAbort(ImmutableSet.of(HOME), 42)
+ .localReadSizeWarning(ImmutableSet.of(VACATION_HOME), 12)
+ .build();
+
+ // validate builder to protect against empty = empty passing this test
+ assertThat(a.tombstones.aborts.instances).isEqualTo(expected.tombstones.aborts.instances).isEqualTo(ImmutableSet.of(HOME));
+ assertThat(a.tombstones.aborts.maxValue).isEqualTo(expected.tombstones.aborts.maxValue).isEqualTo(42);
+ assertThat(b.localReadSize.warnings.instances).isEqualTo(expected.localReadSize.warnings.instances).isEqualTo(ImmutableSet.of(VACATION_HOME));
+ assertThat(b.localReadSize.warnings.maxValue).isEqualTo(expected.localReadSize.warnings.maxValue).isEqualTo(12);
+
+ WarningsSnapshot output = a.merge(b);
+ assertThat(output).isEqualTo(expected).isEqualTo(expected.merge(empty()));
+ assertThat(output.merge(expected)).isEqualTo(expected);
+ }
+
+ @Test
+ public void mergeConflict()
+ {
+ WarningsSnapshot a = builder().tombstonesAbort(ImmutableSet.of(HOME), 42).build();
+ WarningsSnapshot b = builder().tombstonesAbort(ImmutableSet.of(VACATION_HOME), 12).build();
+ WarningsSnapshot expected = builder().tombstonesAbort(ImmutableSet.of(HOME, VACATION_HOME), 42).build();
+
+ // validate builder to protect against empty = empty passing this test
+ assertThat(a.tombstones.aborts.instances).isEqualTo(ImmutableSet.of(HOME));
+ assertThat(a.tombstones.aborts.maxValue).isEqualTo(42);
+ assertThat(b.tombstones.aborts.instances).isEqualTo(ImmutableSet.of(VACATION_HOME));
+ assertThat(b.tombstones.aborts.maxValue).isEqualTo(12);
+ assertThat(expected.tombstones.aborts.instances).isEqualTo(ImmutableSet.of(HOME, VACATION_HOME));
+ assertThat(expected.tombstones.aborts.maxValue).isEqualTo(42);
+
+ WarningsSnapshot output = a.merge(b);
+ assertThat(output).isEqualTo(expected).isEqualTo(expected.merge(empty()));
+ assertThat(output.merge(expected)).isEqualTo(expected);
+ }
+
+ private static InetAddressAndPort address(int a, int b, int c, int d)
+ {
+ try
+ {
+ InetAddress address = InetAddress.getByAddress(new byte[]{ (byte) a, (byte) b, (byte) c, (byte) d });
+ return InetAddressAndPort.getByAddress(address);
+ }
+ catch (UnknownHostException e)
+ {
+ throw new AssertionError(e);
+ }
+ }
+
+ private static Gen<WarningsSnapshot> all()
+ {
+ Gen<Boolean> empty = SourceDSL.booleans().all();
+ Gen<WarningsSnapshot> nonEmpty = nonEmpty();
+ Gen<WarningsSnapshot> gen = rs ->
+ empty.generate(rs) ? empty() : nonEmpty.generate(rs);
+ return gen.describedAs(WarningsSnapshot::toString);
+ }
+
+ private static Gen<WarningsSnapshot> nonEmpty()
+ {
+ Gen<Counter> counter = counter();
+ Gen<WarningsSnapshot> gen = rs -> {
+ Builder builder = builder();
+ builder.tombstonesWarning(counter.generate(rs));
+ builder.tombstonesAbort(counter.generate(rs));
+ builder.localReadSizeWarning(counter.generate(rs));
+ builder.localReadSizeAbort(counter.generate(rs));
+ builder.rowIndexSizeWarning(counter.generate(rs));
+ builder.rowIndexSizeAbort(counter.generate(rs));
+ return builder.build();
+ };
+ return gen.assuming(WarningsSnapshot::isDefined).describedAs(WarningsSnapshot::toString);
+ }
+
+ private static Gen<Counter> counter()
+ {
+ Gen<Boolean> empty = SourceDSL.booleans().all();
+ Constraint maxValue = Constraint.between(1, Long.MAX_VALUE);
+ Gen<ImmutableSet<InetAddressAndPort>> instances = SourceDSL.arbitrary()
+ .pick(ImmutableSet.of(HOME), ImmutableSet.of(VACATION_HOME), ImmutableSet.of(HOME, VACATION_HOME));
+ Gen<Counter> gen = rs ->
+ empty.generate(rs) ? Counter.empty()
+ : new Counter(instances.generate(rs), rs.next(maxValue));
+ return gen.describedAs(Counter::toString);
+ }
+}
\ No newline at end of file
diff --git a/test/unit/org/apache/cassandra/utils/ObjectSizesTest.java b/test/unit/org/apache/cassandra/utils/ObjectSizesTest.java
new file mode 100644
index 0000000..c952fb2
--- /dev/null
+++ b/test/unit/org/apache/cassandra/utils/ObjectSizesTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.utils;
+
+import java.nio.ByteBuffer;
+
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class ObjectSizesTest
+{
+ @Test
+ public void heapByteBuffer()
+ {
+ byte[] bytes = {0, 1, 2, 3, 4};
+ ByteBuffer buffer = ByteBuffer.wrap(bytes);
+
+ long empty = ObjectSizes.sizeOfEmptyHeapByteBuffer();
+ long actual = ObjectSizes.measureDeep(buffer);
+
+ assertThat(actual).isEqualTo(empty + ObjectSizes.sizeOfArray(bytes));
+ assertThat(ObjectSizes.sizeOnHeapOf(buffer)).isEqualTo(actual);
+ }
+
+ @Test
+ public void shouldIgnoreArrayOverheadForSubBuffer()
+ {
+ byte[] bytes = {0, 1, 2, 3, 4};
+ ByteBuffer buffer = ByteBuffer.wrap(bytes);
+ buffer.position(1);
+ assertThat(ObjectSizes.sizeOnHeapOf(buffer)).isEqualTo(ObjectSizes.sizeOfEmptyHeapByteBuffer() + 4);
+ }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org