You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by vi...@apache.org on 2013/01/29 06:59:53 UTC
git commit: Speculative execution for Reads Patch by Vijay,
reviewed by jbellis for CASSANDRA-4705
Updated Branches:
refs/heads/trunk 34f08529e -> c25a6a14c
Speculative execution for Reads
Patch by Vijay, reviewed by jbellis for CASSANDRA-4705
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c25a6a14
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c25a6a14
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c25a6a14
Branch: refs/heads/trunk
Commit: c25a6a14c8d0592c9e842d42d1362042e7109599
Parents: 34f0852
Author: Vijay Parthasarathy <vi...@gmail.com>
Authored: Mon Jan 28 21:55:57 2013 -0800
Committer: Vijay Parthasarathy <vi...@gmail.com>
Committed: Mon Jan 28 21:55:57 2013 -0800
----------------------------------------------------------------------
interface/cassandra.thrift | 1 +
.../org/apache/cassandra/thrift/CfDef.java | 154 ++++++++--
src/java/org/apache/cassandra/cli/CliClient.java | 8 +-
.../org/apache/cassandra/config/CFMetaData.java | 92 ++++++
.../apache/cassandra/cql/AlterTableStatement.java | 1 +
src/java/org/apache/cassandra/cql/CFPropDefs.java | 2 +
.../cassandra/cql/CreateColumnFamilyStatement.java | 1 +
src/java/org/apache/cassandra/cql3/CFPropDefs.java | 3 +
.../org/apache/cassandra/db/ColumnFamilyStore.java | 23 ++
.../cassandra/metrics/ColumnFamilyMetrics.java | 3 +
.../cassandra/service/AbstractReadExecutor.java | 237 +++++++++++++++
.../cassandra/service/AbstractRowResolver.java | 20 ++-
.../cassandra/service/IResponseResolver.java | 2 +-
.../service/RangeSliceResponseResolver.java | 3 +-
.../org/apache/cassandra/service/ReadCallback.java | 29 +-
.../cassandra/service/RowDigestResolver.java | 5 +-
.../org/apache/cassandra/service/StorageProxy.java | 89 ++-----
.../apache/cassandra/utils/SimpleCondition.java | 2 +-
.../org/apache/cassandra/cli/CliHelp.yaml | 21 ++
19 files changed, 584 insertions(+), 112 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c25a6a14/interface/cassandra.thrift
----------------------------------------------------------------------
diff --git a/interface/cassandra.thrift b/interface/cassandra.thrift
index 69e9985..d4d5935 100644
--- a/interface/cassandra.thrift
+++ b/interface/cassandra.thrift
@@ -445,6 +445,7 @@ struct CfDef {
38: optional i32 memtable_flush_period_in_ms,
39: optional i32 default_time_to_live,
40: optional i32 index_interval,
+ 41: optional string speculative_retry="NONE",
/* All of the following are now ignored and unsupplied. */
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c25a6a14/interface/thrift/gen-java/org/apache/cassandra/thrift/CfDef.java
----------------------------------------------------------------------
diff --git a/interface/thrift/gen-java/org/apache/cassandra/thrift/CfDef.java b/interface/thrift/gen-java/org/apache/cassandra/thrift/CfDef.java
index 602e9df..1a87323 100644
--- a/interface/thrift/gen-java/org/apache/cassandra/thrift/CfDef.java
+++ b/interface/thrift/gen-java/org/apache/cassandra/thrift/CfDef.java
@@ -80,6 +80,7 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav
private static final org.apache.thrift.protocol.TField MEMTABLE_FLUSH_PERIOD_IN_MS_FIELD_DESC = new org.apache.thrift.protocol.TField("memtable_flush_period_in_ms", org.apache.thrift.protocol.TType.I32, (short)38);
private static final org.apache.thrift.protocol.TField DEFAULT_TIME_TO_LIVE_FIELD_DESC = new org.apache.thrift.protocol.TField("default_time_to_live", org.apache.thrift.protocol.TType.I32, (short)39);
private static final org.apache.thrift.protocol.TField INDEX_INTERVAL_FIELD_DESC = new org.apache.thrift.protocol.TField("index_interval", org.apache.thrift.protocol.TType.I32, (short)40);
+ private static final org.apache.thrift.protocol.TField SPECULATIVE_RETRY_FIELD_DESC = new org.apache.thrift.protocol.TField("speculative_retry", org.apache.thrift.protocol.TType.STRING, (short)41);
private static final org.apache.thrift.protocol.TField ROW_CACHE_SIZE_FIELD_DESC = new org.apache.thrift.protocol.TField("row_cache_size", org.apache.thrift.protocol.TType.DOUBLE, (short)9);
private static final org.apache.thrift.protocol.TField KEY_CACHE_SIZE_FIELD_DESC = new org.apache.thrift.protocol.TField("key_cache_size", org.apache.thrift.protocol.TType.DOUBLE, (short)11);
private static final org.apache.thrift.protocol.TField ROW_CACHE_SAVE_PERIOD_IN_SECONDS_FIELD_DESC = new org.apache.thrift.protocol.TField("row_cache_save_period_in_seconds", org.apache.thrift.protocol.TType.I32, (short)19);
@@ -122,6 +123,7 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav
public int memtable_flush_period_in_ms; // optional
public int default_time_to_live; // optional
public int index_interval; // optional
+ public String speculative_retry; // optional
/**
* @deprecated
*/
@@ -190,6 +192,7 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav
MEMTABLE_FLUSH_PERIOD_IN_MS((short)38, "memtable_flush_period_in_ms"),
DEFAULT_TIME_TO_LIVE((short)39, "default_time_to_live"),
INDEX_INTERVAL((short)40, "index_interval"),
+ SPECULATIVE_RETRY((short)41, "speculative_retry"),
/**
* @deprecated
*/
@@ -294,6 +297,8 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav
return DEFAULT_TIME_TO_LIVE;
case 40: // INDEX_INTERVAL
return INDEX_INTERVAL;
+ case 41: // SPECULATIVE_RETRY
+ return SPECULATIVE_RETRY;
case 9: // ROW_CACHE_SIZE
return ROW_CACHE_SIZE;
case 11: // KEY_CACHE_SIZE
@@ -375,7 +380,7 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav
private static final int __MERGE_SHARDS_CHANCE_ISSET_ID = 18;
private static final int __ROW_CACHE_KEYS_TO_SAVE_ISSET_ID = 19;
private int __isset_bitfield = 0;
- private _Fields optionals[] = {_Fields.COLUMN_TYPE,_Fields.COMPARATOR_TYPE,_Fields.SUBCOMPARATOR_TYPE,_Fields.COMMENT,_Fields.READ_REPAIR_CHANCE,_Fields.COLUMN_METADATA,_Fields.GC_GRACE_SECONDS,_Fields.DEFAULT_VALIDATION_CLASS,_Fields.ID,_Fields.MIN_COMPACTION_THRESHOLD,_Fields.MAX_COMPACTION_THRESHOLD,_Fields.REPLICATE_ON_WRITE,_Fields.KEY_VALIDATION_CLASS,_Fields.KEY_ALIAS,_Fields.COMPACTION_STRATEGY,_Fields.COMPACTION_STRATEGY_OPTIONS,_Fields.COMPRESSION_OPTIONS,_Fields.BLOOM_FILTER_FP_CHANCE,_Fields.CACHING,_Fields.DCLOCAL_READ_REPAIR_CHANCE,_Fields.MEMTABLE_FLUSH_PERIOD_IN_MS,_Fields.DEFAULT_TIME_TO_LIVE,_Fields.INDEX_INTERVAL,_Fields.ROW_CACHE_SIZE,_Fields.KEY_CACHE_SIZE,_Fields.ROW_CACHE_SAVE_PERIOD_IN_SECONDS,_Fields.KEY_CACHE_SAVE_PERIOD_IN_SECONDS,_Fields.MEMTABLE_FLUSH_AFTER_MINS,_Fields.MEMTABLE_THROUGHPUT_IN_MB,_Fields.MEMTABLE_OPERATIONS_IN_MILLIONS,_Fields.MERGE_SHARDS_CHANCE,_Fields.ROW_CACHE_PROVIDER,_Fields.ROW_CACHE_KEYS_TO_SAVE};
+ private _Fields optionals[] = {_Fields.COLUMN_TYPE,_Fields.COMPARATOR_TYPE,_Fields.SUBCOMPARATOR_TYPE,_Fields.COMMENT,_Fields.READ_REPAIR_CHANCE,_Fields.COLUMN_METADATA,_Fields.GC_GRACE_SECONDS,_Fields.DEFAULT_VALIDATION_CLASS,_Fields.ID,_Fields.MIN_COMPACTION_THRESHOLD,_Fields.MAX_COMPACTION_THRESHOLD,_Fields.REPLICATE_ON_WRITE,_Fields.KEY_VALIDATION_CLASS,_Fields.KEY_ALIAS,_Fields.COMPACTION_STRATEGY,_Fields.COMPACTION_STRATEGY_OPTIONS,_Fields.COMPRESSION_OPTIONS,_Fields.BLOOM_FILTER_FP_CHANCE,_Fields.CACHING,_Fields.DCLOCAL_READ_REPAIR_CHANCE,_Fields.MEMTABLE_FLUSH_PERIOD_IN_MS,_Fields.DEFAULT_TIME_TO_LIVE,_Fields.INDEX_INTERVAL,_Fields.SPECULATIVE_RETRY,_Fields.ROW_CACHE_SIZE,_Fields.KEY_CACHE_SIZE,_Fields.ROW_CACHE_SAVE_PERIOD_IN_SECONDS,_Fields.KEY_CACHE_SAVE_PERIOD_IN_SECONDS,_Fields.MEMTABLE_FLUSH_AFTER_MINS,_Fields.MEMTABLE_THROUGHPUT_IN_MB,_Fields.MEMTABLE_OPERATIONS_IN_MILLIONS,_Fields.MERGE_SHARDS_CHANCE,_Fields.ROW_CACHE_PROVIDER,_Fields.ROW_CACHE_KEYS_TO_SAVE};
public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
static {
Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
@@ -434,6 +439,8 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I32)));
tmpMap.put(_Fields.INDEX_INTERVAL, new org.apache.thrift.meta_data.FieldMetaData("index_interval", org.apache.thrift.TFieldRequirementType.OPTIONAL,
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I32)));
+ tmpMap.put(_Fields.SPECULATIVE_RETRY, new org.apache.thrift.meta_data.FieldMetaData("speculative_retry", org.apache.thrift.TFieldRequirementType.OPTIONAL,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
tmpMap.put(_Fields.ROW_CACHE_SIZE, new org.apache.thrift.meta_data.FieldMetaData("row_cache_size", org.apache.thrift.TFieldRequirementType.OPTIONAL,
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.DOUBLE)));
tmpMap.put(_Fields.KEY_CACHE_SIZE, new org.apache.thrift.meta_data.FieldMetaData("key_cache_size", org.apache.thrift.TFieldRequirementType.OPTIONAL,
@@ -467,6 +474,8 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav
this.dclocal_read_repair_chance = 0;
+ this.speculative_retry = "NONE";
+
}
public CfDef(
@@ -565,6 +574,9 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav
this.memtable_flush_period_in_ms = other.memtable_flush_period_in_ms;
this.default_time_to_live = other.default_time_to_live;
this.index_interval = other.index_interval;
+ if (other.isSetSpeculative_retry()) {
+ this.speculative_retry = other.speculative_retry;
+ }
this.row_cache_size = other.row_cache_size;
this.key_cache_size = other.key_cache_size;
this.row_cache_save_period_in_seconds = other.row_cache_save_period_in_seconds;
@@ -624,6 +636,8 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav
this.default_time_to_live = 0;
setIndex_intervalIsSet(false);
this.index_interval = 0;
+ this.speculative_retry = "NONE";
+
setRow_cache_sizeIsSet(false);
this.row_cache_size = 0.0;
setKey_cache_sizeIsSet(false);
@@ -1281,6 +1295,30 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav
__isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __INDEX_INTERVAL_ISSET_ID, value);
}
+ public String getSpeculative_retry() {
+ return this.speculative_retry;
+ }
+
+ public CfDef setSpeculative_retry(String speculative_retry) {
+ this.speculative_retry = speculative_retry;
+ return this;
+ }
+
+ public void unsetSpeculative_retry() {
+ this.speculative_retry = null;
+ }
+
+ /** Returns true if field speculative_retry is set (has been assigned a value) and false otherwise */
+ public boolean isSetSpeculative_retry() {
+ return this.speculative_retry != null;
+ }
+
+ public void setSpeculative_retryIsSet(boolean value) {
+ if (!value) {
+ this.speculative_retry = null;
+ }
+ }
+
/**
* @deprecated
*/
@@ -1774,6 +1812,14 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav
}
break;
+ case SPECULATIVE_RETRY:
+ if (value == null) {
+ unsetSpeculative_retry();
+ } else {
+ setSpeculative_retry((String)value);
+ }
+ break;
+
case ROW_CACHE_SIZE:
if (value == null) {
unsetRow_cache_size();
@@ -1934,6 +1980,9 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav
case INDEX_INTERVAL:
return Integer.valueOf(getIndex_interval());
+ case SPECULATIVE_RETRY:
+ return getSpeculative_retry();
+
case ROW_CACHE_SIZE:
return Double.valueOf(getRow_cache_size());
@@ -2025,6 +2074,8 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav
return isSetDefault_time_to_live();
case INDEX_INTERVAL:
return isSetIndex_interval();
+ case SPECULATIVE_RETRY:
+ return isSetSpeculative_retry();
case ROW_CACHE_SIZE:
return isSetRow_cache_size();
case KEY_CACHE_SIZE:
@@ -2287,6 +2338,15 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav
return false;
}
+ boolean this_present_speculative_retry = true && this.isSetSpeculative_retry();
+ boolean that_present_speculative_retry = true && that.isSetSpeculative_retry();
+ if (this_present_speculative_retry || that_present_speculative_retry) {
+ if (!(this_present_speculative_retry && that_present_speculative_retry))
+ return false;
+ if (!this.speculative_retry.equals(that.speculative_retry))
+ return false;
+ }
+
boolean this_present_row_cache_size = true && this.isSetRow_cache_size();
boolean that_present_row_cache_size = true && that.isSetRow_cache_size();
if (this_present_row_cache_size || that_present_row_cache_size) {
@@ -2559,6 +2619,11 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav
if (present_row_cache_keys_to_save)
builder.append(row_cache_keys_to_save);
+ boolean preset_speculative_retry = true && (isSetSpeculative_retry());
+ builder.append(preset_speculative_retry);
+ if (preset_speculative_retry)
+ builder.append(speculative_retry);
+
return builder.toHashCode();
}
@@ -2820,6 +2885,16 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav
return lastComparison;
}
}
+ lastComparison = Boolean.valueOf(isSetSpeculative_retry()).compareTo(typedOther.isSetSpeculative_retry());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (isSetSpeculative_retry()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.speculative_retry, typedOther.speculative_retry);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
lastComparison = Boolean.valueOf(isSetRow_cache_size()).compareTo(typedOther.isSetRow_cache_size());
if (lastComparison != 0) {
return lastComparison;
@@ -3141,6 +3216,16 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav
sb.append(this.index_interval);
first = false;
}
+ if (isSetSpeculative_retry()) {
+ if (!first) sb.append(", ");
+ sb.append("speculative_retry:");
+ if (this.speculative_retry == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.speculative_retry);
+ }
+ first = false;
+ }
if (isSetRow_cache_size()) {
if (!first) sb.append(", ");
sb.append("row_cache_size:");
@@ -3491,6 +3576,14 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
break;
+ case 41: // SPECULATIVE_RETRY
+ if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+ struct.speculative_retry = iprot.readString();
+ struct.setSpeculative_retryIsSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
case 9: // ROW_CACHE_SIZE
if (schemeField.type == org.apache.thrift.protocol.TType.DOUBLE) {
struct.row_cache_size = iprot.readDouble();
@@ -3810,6 +3903,13 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav
oprot.writeI32(struct.index_interval);
oprot.writeFieldEnd();
}
+ if (struct.speculative_retry != null) {
+ if (struct.isSetSpeculative_retry()) {
+ oprot.writeFieldBegin(SPECULATIVE_RETRY_FIELD_DESC);
+ oprot.writeString(struct.speculative_retry);
+ oprot.writeFieldEnd();
+ }
+ }
oprot.writeFieldStop();
oprot.writeStructEnd();
}
@@ -3899,37 +3999,40 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav
if (struct.isSetIndex_interval()) {
optionals.set(22);
}
- if (struct.isSetRow_cache_size()) {
+ if (struct.isSetSpeculative_retry()) {
optionals.set(23);
}
- if (struct.isSetKey_cache_size()) {
+ if (struct.isSetRow_cache_size()) {
optionals.set(24);
}
- if (struct.isSetRow_cache_save_period_in_seconds()) {
+ if (struct.isSetKey_cache_size()) {
optionals.set(25);
}
- if (struct.isSetKey_cache_save_period_in_seconds()) {
+ if (struct.isSetRow_cache_save_period_in_seconds()) {
optionals.set(26);
}
- if (struct.isSetMemtable_flush_after_mins()) {
+ if (struct.isSetKey_cache_save_period_in_seconds()) {
optionals.set(27);
}
- if (struct.isSetMemtable_throughput_in_mb()) {
+ if (struct.isSetMemtable_flush_after_mins()) {
optionals.set(28);
}
- if (struct.isSetMemtable_operations_in_millions()) {
+ if (struct.isSetMemtable_throughput_in_mb()) {
optionals.set(29);
}
- if (struct.isSetMerge_shards_chance()) {
+ if (struct.isSetMemtable_operations_in_millions()) {
optionals.set(30);
}
- if (struct.isSetRow_cache_provider()) {
+ if (struct.isSetMerge_shards_chance()) {
optionals.set(31);
}
- if (struct.isSetRow_cache_keys_to_save()) {
+ if (struct.isSetRow_cache_provider()) {
optionals.set(32);
}
- oprot.writeBitSet(optionals, 33);
+ if (struct.isSetRow_cache_keys_to_save()) {
+ optionals.set(33);
+ }
+ oprot.writeBitSet(optionals, 34);
if (struct.isSetColumn_type()) {
oprot.writeString(struct.column_type);
}
@@ -4019,6 +4122,9 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav
if (struct.isSetIndex_interval()) {
oprot.writeI32(struct.index_interval);
}
+ if (struct.isSetSpeculative_retry()) {
+ oprot.writeString(struct.speculative_retry);
+ }
if (struct.isSetRow_cache_size()) {
oprot.writeDouble(struct.row_cache_size);
}
@@ -4058,7 +4164,7 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav
struct.setKeyspaceIsSet(true);
struct.name = iprot.readString();
struct.setNameIsSet(true);
- BitSet incoming = iprot.readBitSet(33);
+ BitSet incoming = iprot.readBitSet(34);
if (incoming.get(0)) {
struct.column_type = iprot.readString();
struct.setColumn_typeIsSet(true);
@@ -4184,42 +4290,46 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav
struct.setIndex_intervalIsSet(true);
}
if (incoming.get(23)) {
+ struct.speculative_retry = iprot.readString();
+ struct.setSpeculative_retryIsSet(true);
+ }
+ if (incoming.get(24)) {
struct.row_cache_size = iprot.readDouble();
struct.setRow_cache_sizeIsSet(true);
}
- if (incoming.get(24)) {
+ if (incoming.get(25)) {
struct.key_cache_size = iprot.readDouble();
struct.setKey_cache_sizeIsSet(true);
}
- if (incoming.get(25)) {
+ if (incoming.get(26)) {
struct.row_cache_save_period_in_seconds = iprot.readI32();
struct.setRow_cache_save_period_in_secondsIsSet(true);
}
- if (incoming.get(26)) {
+ if (incoming.get(27)) {
struct.key_cache_save_period_in_seconds = iprot.readI32();
struct.setKey_cache_save_period_in_secondsIsSet(true);
}
- if (incoming.get(27)) {
+ if (incoming.get(28)) {
struct.memtable_flush_after_mins = iprot.readI32();
struct.setMemtable_flush_after_minsIsSet(true);
}
- if (incoming.get(28)) {
+ if (incoming.get(29)) {
struct.memtable_throughput_in_mb = iprot.readI32();
struct.setMemtable_throughput_in_mbIsSet(true);
}
- if (incoming.get(29)) {
+ if (incoming.get(30)) {
struct.memtable_operations_in_millions = iprot.readDouble();
struct.setMemtable_operations_in_millionsIsSet(true);
}
- if (incoming.get(30)) {
+ if (incoming.get(31)) {
struct.merge_shards_chance = iprot.readDouble();
struct.setMerge_shards_chanceIsSet(true);
}
- if (incoming.get(31)) {
+ if (incoming.get(32)) {
struct.row_cache_provider = iprot.readString();
struct.setRow_cache_providerIsSet(true);
}
- if (incoming.get(32)) {
+ if (incoming.get(33)) {
struct.row_cache_keys_to_save = iprot.readI32();
struct.setRow_cache_keys_to_saveIsSet(true);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c25a6a14/src/java/org/apache/cassandra/cli/CliClient.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cli/CliClient.java b/src/java/org/apache/cassandra/cli/CliClient.java
index 96749ad..caab476 100644
--- a/src/java/org/apache/cassandra/cli/CliClient.java
+++ b/src/java/org/apache/cassandra/cli/CliClient.java
@@ -140,7 +140,8 @@ public class CliClient
INDEX_INTERVAL,
MEMTABLE_FLUSH_PERIOD_IN_MS,
CACHING,
- DEFAULT_TIME_TO_LIVE
+ DEFAULT_TIME_TO_LIVE,
+ SPECULATIVE_RETRY
}
private static final String DEFAULT_PLACEMENT_STRATEGY = "org.apache.cassandra.locator.NetworkTopologyStrategy";
@@ -1338,6 +1339,9 @@ public class CliClient
case INDEX_INTERVAL:
cfDef.setIndex_interval(Integer.parseInt(mValue));
break;
+ case SPECULATIVE_RETRY:
+ cfDef.setSpeculative_retry(CliUtils.unescapeSQLString(mValue));
+ break;
default:
//must match one of the above or we'd throw an exception at the valueOf statement above.
assert(false);
@@ -1793,6 +1797,7 @@ public class CliClient
writeAttr(output, false, "compaction_strategy", cfDef.compaction_strategy);
writeAttr(output, false, "caching", cfDef.caching);
writeAttr(output, false, "default_time_to_live", cfDef.default_time_to_live);
+ writeAttr(output, false, "speculative_retry", cfDef.speculative_retry);
if (cfDef.isSetBloom_filter_fp_chance())
writeAttr(output, false, "bloom_filter_fp_chance", cfDef.bloom_filter_fp_chance);
@@ -2163,6 +2168,7 @@ public class CliClient
sessionState.out.printf(" Default time to live: %s%n", cf_def.default_time_to_live);
sessionState.out.printf(" Bloom Filter FP chance: %s%n", cf_def.isSetBloom_filter_fp_chance() ? cf_def.bloom_filter_fp_chance : "default");
sessionState.out.printf(" Index interval: %s%n", cf_def.isSetIndex_interval() ? cf_def.index_interval : "default");
+ sessionState.out.printf(" Speculative Retry: %s%n", cf_def.speculative_retry);
// if we have connection to the cfMBean established
if (cfMBean != null)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c25a6a14/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java
index 2c62139..19aaec8 100644
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@ -25,6 +25,7 @@ import java.nio.ByteBuffer;
import java.util.*;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Objects;
import com.google.common.collect.MapDifference;
import com.google.common.collect.Maps;
import org.apache.commons.lang.ArrayUtils;
@@ -80,6 +81,7 @@ public final class CFMetaData
public final static ByteBuffer DEFAULT_KEY_NAME = ByteBufferUtil.bytes("KEY");
public final static Caching DEFAULT_CACHING_STRATEGY = Caching.KEYS_ONLY;
public final static int DEFAULT_DEFAULT_TIME_TO_LIVE = 0;
+ public final static SpeculativeRetry DEFAULT_SPECULATIVE_RETRY = new SpeculativeRetry(SpeculativeRetry.RetryType.NONE, 0);
public final static int DEFAULT_INDEX_INTERVAL = 128;
// Note that this is the default only for user created tables
@@ -142,6 +144,7 @@ public final class CFMetaData
+ "compaction_strategy_options text,"
+ "default_read_consistency text,"
+ "default_write_consistency text,"
+ + "speculative_retry text,"
+ "PRIMARY KEY (keyspace_name, columnfamily_name)"
+ ") WITH COMMENT='ColumnFamily definitions' AND gc_grace_seconds=8640");
public static final CFMetaData SchemaColumnsCf = compile(10, "CREATE TABLE " + SystemTable.SCHEMA_COLUMNS_CF + "("
@@ -248,6 +251,75 @@ public final class CFMetaData
}
}
+ public static class SpeculativeRetry
+ {
+ public enum RetryType
+ {
+ NONE, CUSTOM, PERCENTILE, ALWAYS;
+ }
+
+ public final RetryType type;
+ public final long value;
+
+ private SpeculativeRetry(RetryType type, long value)
+ {
+ this.type = type;
+ this.value = value;
+ }
+
+ public static SpeculativeRetry fromString(String retry) throws ConfigurationException
+ {
+ String name = retry.toUpperCase();
+ try
+ {
+ if (name.endsWith(RetryType.PERCENTILE.toString()))
+ {
+ long value = Long.parseLong(name.substring(0, name.length() - 10));
+ if (value > 100 || value < 0)
+ throw new ConfigurationException("PERCENTILE should be between 0 and 100");
+ return new SpeculativeRetry(RetryType.PERCENTILE, value);
+ }
+ else if (name.endsWith("MS"))
+ {
+ long value = Long.parseLong(name.substring(0, name.length() - 2));
+ return new SpeculativeRetry(RetryType.CUSTOM, value);
+ }
+ else
+ {
+ return new SpeculativeRetry(RetryType.valueOf(name), 0);
+ }
+ }
+ catch (IllegalArgumentException e)
+ {
+ // ignore to throw the below exception.
+ }
+ throw new ConfigurationException("invalid speculative_retry type: " + retry);
+ }
+
+ @Override
+ public boolean equals(Object obj)
+ {
+ if (! (obj instanceof SpeculativeRetry))
+ return false;
+ SpeculativeRetry rhs = (SpeculativeRetry) obj;
+ return Objects.equal(type, rhs.type) && Objects.equal(value, rhs.value);
+ }
+
+ @Override
+ public String toString()
+ {
+ switch (type)
+ {
+ case PERCENTILE:
+ return value + "PERCENTILE";
+ case CUSTOM:
+ return value + "MS";
+ default:
+ return type.toString();
+ }
+ }
+ }
+
//REQUIRED
public final UUID cfId; // internal id, never exposed to user
public final String ksName; // name of keyspace
@@ -274,6 +346,7 @@ public final class CFMetaData
private volatile int indexInterval = DEFAULT_INDEX_INTERVAL;
private int memtableFlushPeriod = 0;
private volatile int defaultTimeToLive = DEFAULT_DEFAULT_TIME_TO_LIVE;
+ private volatile SpeculativeRetry speculativeRetry = DEFAULT_SPECULATIVE_RETRY;
volatile Map<ByteBuffer, ColumnDefinition> column_metadata = new HashMap<ByteBuffer,ColumnDefinition>();
public volatile Class<? extends AbstractCompactionStrategy> compactionStrategyClass = DEFAULT_COMPACTION_STRATEGY_CLASS;
@@ -307,6 +380,7 @@ public final class CFMetaData
public CFMetaData indexInterval(int prop) {indexInterval = prop; return this;}
public CFMetaData memtableFlushPeriod(int prop) {memtableFlushPeriod = prop; return this;}
public CFMetaData defaultTimeToLive(int prop) {defaultTimeToLive = prop; return this;}
+ public CFMetaData speculativeRetry(SpeculativeRetry prop) {speculativeRetry = prop; return this;}
public CFMetaData(String keyspace, String name, ColumnFamilyType type, AbstractType<?> comp, AbstractType<?> subcc)
{
@@ -401,6 +475,7 @@ public final class CFMetaData
.dcLocalReadRepairChance(0.0)
.gcGraceSeconds(0)
.caching(indexCaching)
+ .speculativeRetry(parent.speculativeRetry)
.compactionStrategyClass(parent.compactionStrategyClass)
.compactionStrategyOptions(parent.compactionStrategyOptions)
.reloadSecondaryIndexMetadata(parent);
@@ -455,6 +530,7 @@ public final class CFMetaData
.caching(oldCFMD.caching)
.defaultTimeToLive(oldCFMD.defaultTimeToLive)
.indexInterval(oldCFMD.indexInterval)
+ .speculativeRetry(oldCFMD.speculativeRetry)
.memtableFlushPeriod(oldCFMD.memtableFlushPeriod);
}
@@ -586,6 +662,11 @@ public final class CFMetaData
return indexInterval;
}
+ public SpeculativeRetry getSpeculativeRetry()
+ {
+ return speculativeRetry;
+ }
+
public int getMemtableFlushPeriod()
{
return memtableFlushPeriod;
@@ -635,6 +716,7 @@ public final class CFMetaData
.append(caching, rhs.caching)
.append(defaultTimeToLive, rhs.defaultTimeToLive)
.append(indexInterval, rhs.indexInterval)
+ .append(speculativeRetry, rhs.speculativeRetry)
.isEquals();
}
@@ -667,6 +749,7 @@ public final class CFMetaData
.append(caching)
.append(defaultTimeToLive)
.append(indexInterval)
+ .append(speculativeRetry)
.toHashCode();
}
@@ -752,6 +835,8 @@ public final class CFMetaData
newCFMD.dcLocalReadRepairChance(cf_def.dclocal_read_repair_chance);
if (cf_def.isSetIndex_interval())
newCFMD.indexInterval(cf_def.index_interval);
+ if (cf_def.isSetSpeculative_retry())
+ newCFMD.speculativeRetry(SpeculativeRetry.fromString(cf_def.speculative_retry));
CompressionParameters cp = CompressionParameters.create(cf_def.compression_options);
@@ -835,6 +920,7 @@ public final class CFMetaData
memtableFlushPeriod = cfm.memtableFlushPeriod;
caching = cfm.caching;
defaultTimeToLive = cfm.defaultTimeToLive;
+ speculativeRetry = cfm.speculativeRetry;
MapDifference<ByteBuffer, ColumnDefinition> columnDiff = Maps.difference(column_metadata, cfm.column_metadata);
// columns that are no longer needed
@@ -988,6 +1074,7 @@ public final class CFMetaData
def.setMemtable_flush_period_in_ms(memtableFlushPeriod);
def.setCaching(caching.toString());
def.setDefault_time_to_live(defaultTimeToLive);
+ def.setSpeculative_retry(speculativeRetry.toString());
return def;
}
@@ -1317,6 +1404,7 @@ public final class CFMetaData
cf.addColumn(DeletedColumn.create(ldt, timestamp, cfName, "bloom_filter_fp_chance"));
cf.addColumn(DeletedColumn.create(ldt, timestamp, cfName, "caching"));
cf.addColumn(DeletedColumn.create(ldt, timestamp, cfName, "default_time_to_live"));
+ cf.addColumn(DeletedColumn.create(ldt, timestamp, cfName, "speculative_retry"));
cf.addColumn(DeletedColumn.create(ldt, timestamp, cfName, "compaction_strategy_class"));
cf.addColumn(DeletedColumn.create(ldt, timestamp, cfName, "compression_parameters"));
cf.addColumn(DeletedColumn.create(ldt, timestamp, cfName, "value_alias"));
@@ -1389,6 +1477,7 @@ public final class CFMetaData
cf.addColumn(Column.create(json(aliasesAsStrings(columnAliases)), timestamp, cfName, "column_aliases"));
cf.addColumn(Column.create(json(compactionStrategyOptions), timestamp, cfName, "compaction_strategy_options"));
cf.addColumn(Column.create(indexInterval, timestamp, cfName, "index_interval"));
+ cf.addColumn(Column.create(speculativeRetry.toString(), timestamp, cfName, "speculative_retry"));
}
// Package protected for use by tests
@@ -1431,6 +1520,8 @@ public final class CFMetaData
cfm.caching(Caching.valueOf(result.getString("caching")));
if (result.has("default_time_to_live"))
cfm.defaultTimeToLive(result.getInt("default_time_to_live"));
+ if (result.has("speculative_retry"))
+ cfm.speculativeRetry(SpeculativeRetry.fromString(result.getString("speculative_retry")));
cfm.compactionStrategyClass(createCompactionStrategy(result.getString("compaction_strategy_class")));
cfm.compressionParameters(CompressionParameters.create(fromJsonMap(result.getString("compression_parameters"))));
cfm.columnAliases(aliasesFromStrings(fromJsonList(result.getString("column_aliases"))));
@@ -1603,6 +1694,7 @@ public final class CFMetaData
.append("memtable_flush_period_in_ms", memtableFlushPeriod)
.append("caching", caching)
.append("defaultTimeToLive", defaultTimeToLive)
+ .append("speculative_retry", speculativeRetry)
.append("indexInterval", indexInterval)
.toString();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c25a6a14/src/java/org/apache/cassandra/cql/AlterTableStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql/AlterTableStatement.java b/src/java/org/apache/cassandra/cql/AlterTableStatement.java
index c2d6ca8..a4ad501 100644
--- a/src/java/org/apache/cassandra/cql/AlterTableStatement.java
+++ b/src/java/org/apache/cassandra/cql/AlterTableStatement.java
@@ -187,6 +187,7 @@ public class AlterTableStatement
cfm.maxCompactionThreshold(cfProps.getPropertyInt(CFPropDefs.KW_MAXCOMPACTIONTHRESHOLD, cfm.getMaxCompactionThreshold()));
cfm.caching(CFMetaData.Caching.fromString(cfProps.getPropertyString(CFPropDefs.KW_CACHING, cfm.getCaching().toString())));
cfm.defaultTimeToLive(cfProps.getPropertyInt(CFPropDefs.KW_DEFAULT_TIME_TO_LIVE, cfm.getDefaultTimeToLive()));
+ cfm.speculativeRetry(CFMetaData.SpeculativeRetry.fromString(cfProps.getPropertyString(CFPropDefs.KW_SPECULATIVE_RETRY, cfm.getSpeculativeRetry().toString())));
cfm.bloomFilterFpChance(cfProps.getPropertyDouble(CFPropDefs.KW_BF_FP_CHANCE, cfm.getBloomFilterFpChance()));
cfm.memtableFlushPeriod(cfProps.getPropertyInt(CFPropDefs.KW_MEMTABLE_FLUSH_PERIOD, cfm.getMemtableFlushPeriod()));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c25a6a14/src/java/org/apache/cassandra/cql/CFPropDefs.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql/CFPropDefs.java b/src/java/org/apache/cassandra/cql/CFPropDefs.java
index 6bbb9ad..b04a71f 100644
--- a/src/java/org/apache/cassandra/cql/CFPropDefs.java
+++ b/src/java/org/apache/cassandra/cql/CFPropDefs.java
@@ -49,6 +49,7 @@ public class CFPropDefs {
public static final String KW_COMPACTION_STRATEGY_CLASS = "compaction_strategy_class";
public static final String KW_CACHING = "caching";
public static final String KW_DEFAULT_TIME_TO_LIVE = "default_time_to_live";
+ public static final String KW_SPECULATIVE_RETRY = "speculative_retry";
public static final String KW_BF_FP_CHANCE = "bloom_filter_fp_chance";
public static final String KW_MEMTABLE_FLUSH_PERIOD = "memtable_flush_period_in_ms";
@@ -90,6 +91,7 @@ public class CFPropDefs {
keywords.add(KW_COMPACTION_STRATEGY_CLASS);
keywords.add(KW_CACHING);
keywords.add(KW_DEFAULT_TIME_TO_LIVE);
+ keywords.add(KW_SPECULATIVE_RETRY);
keywords.add(KW_BF_FP_CHANCE);
keywords.add(KW_MEMTABLE_FLUSH_PERIOD);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c25a6a14/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java b/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java
index 6f848e2..be56e90 100644
--- a/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java
+++ b/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java
@@ -194,6 +194,7 @@ public class CreateColumnFamilyStatement
.compactionStrategyOptions(cfProps.compactionStrategyOptions)
.compressionParameters(CompressionParameters.create(cfProps.compressionParameters))
.caching(CFMetaData.Caching.fromString(getPropertyString(CFPropDefs.KW_CACHING, CFMetaData.DEFAULT_CACHING_STRATEGY.toString())))
+ .speculativeRetry(CFMetaData.SpeculativeRetry.fromString(getPropertyString(CFPropDefs.KW_SPECULATIVE_RETRY, CFMetaData.DEFAULT_SPECULATIVE_RETRY.toString())))
.bloomFilterFpChance(getPropertyDouble(CFPropDefs.KW_BF_FP_CHANCE, null))
.memtableFlushPeriod(getPropertyInt(CFPropDefs.KW_MEMTABLE_FLUSH_PERIOD, 0))
.defaultTimeToLive(getPropertyInt(CFPropDefs.KW_DEFAULT_TIME_TO_LIVE, CFMetaData.DEFAULT_DEFAULT_TIME_TO_LIVE));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c25a6a14/src/java/org/apache/cassandra/cql3/CFPropDefs.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/CFPropDefs.java b/src/java/org/apache/cassandra/cql3/CFPropDefs.java
index 5d0b249..f598f51 100644
--- a/src/java/org/apache/cassandra/cql3/CFPropDefs.java
+++ b/src/java/org/apache/cassandra/cql3/CFPropDefs.java
@@ -41,6 +41,7 @@ public class CFPropDefs extends PropertyDefinitions
public static final String KW_REPLICATEONWRITE = "replicate_on_write";
public static final String KW_CACHING = "caching";
public static final String KW_DEFAULT_TIME_TO_LIVE = "default_time_to_live";
+ public static final String KW_SPECULATIVE_RETRY = "speculative_retry";
public static final String KW_BF_FP_CHANCE = "bloom_filter_fp_chance";
public static final String KW_MEMTABLE_FLUSH_PERIOD = "memtable_flush_period_in_ms";
@@ -61,6 +62,7 @@ public class CFPropDefs extends PropertyDefinitions
keywords.add(KW_REPLICATEONWRITE);
keywords.add(KW_CACHING);
keywords.add(KW_DEFAULT_TIME_TO_LIVE);
+ keywords.add(KW_SPECULATIVE_RETRY);
keywords.add(KW_BF_FP_CHANCE);
keywords.add(KW_COMPACTION);
keywords.add(KW_COMPRESSION);
@@ -139,6 +141,7 @@ public class CFPropDefs extends PropertyDefinitions
cfm.maxCompactionThreshold(toInt(KW_MAXCOMPACTIONTHRESHOLD, getCompactionOptions().get(KW_MAXCOMPACTIONTHRESHOLD), cfm.getMaxCompactionThreshold()));
cfm.caching(CFMetaData.Caching.fromString(getString(KW_CACHING, cfm.getCaching().toString())));
cfm.defaultTimeToLive(getInt(KW_DEFAULT_TIME_TO_LIVE, cfm.getDefaultTimeToLive()));
+ cfm.speculativeRetry(CFMetaData.SpeculativeRetry.fromString(getString(KW_SPECULATIVE_RETRY, cfm.getSpeculativeRetry().toString())));
cfm.memtableFlushPeriod(getInt(KW_MEMTABLE_FLUSH_PERIOD, cfm.getMemtableFlushPeriod()));
if (compactionStrategyClass != null)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c25a6a14/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index ca67a75..03db0df 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -44,6 +44,7 @@ import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.*;
+import org.apache.cassandra.config.CFMetaData.SpeculativeRetry;
import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.db.commitlog.ReplayPosition;
@@ -115,6 +116,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
private final AtomicLong liveRatioComputedAt = new AtomicLong(32);
public final ColumnFamilyMetrics metric;
+ public volatile long sampleLatency = Long.MAX_VALUE;
public void reload()
{
@@ -321,6 +323,27 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
{
throw new RuntimeException(e);
}
+ StorageService.optionalTasks.scheduleWithFixedDelay(new Runnable()
+ {
+ public void run()
+ {
+ SpeculativeRetry retryPolicy = ColumnFamilyStore.this.metadata.getSpeculativeRetry();
+ switch (retryPolicy.type)
+ {
+ case PERCENTILE:
+ double percentile = retryPolicy.value / 100d;
+ // get percentile and convert it to ms instead of dealing with micros
+ sampleLatency = (long) (metric.readLatency.latency.getSnapshot().getValue(percentile) / 1000);
+ break;
+ case CUSTOM:
+ sampleLatency = retryPolicy.value;
+ break;
+ default:
+ sampleLatency = Long.MAX_VALUE;
+ break;
+ }
+ }
+ }, 30, 30, TimeUnit.SECONDS);
}
/** call when dropping or renaming a CF. Performs mbean housekeeping and invalidates CFS to other operations */
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c25a6a14/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java b/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java
index a42f74d..7c7cfa6 100644
--- a/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java
@@ -79,6 +79,8 @@ public class ColumnFamilyMetrics
private final MetricNameFactory factory;
+ public final Counter speculativeRetry;
+
// for backward compatibility
@Deprecated public final EstimatedHistogram sstablesPerRead = new EstimatedHistogram(35);
@Deprecated public final EstimatedHistogram recentSSTablesPerRead = new EstimatedHistogram(35);
@@ -274,6 +276,7 @@ public class ColumnFamilyMetrics
return total;
}
});
+ speculativeRetry = Metrics.newCounter(factory.createMetricName("SpeculativeRetry"));
}
public void updateSSTableIterated(int count)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c25a6a14/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
new file mode 100644
index 0000000..c0361bf
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+
+import org.apache.cassandra.concurrent.Stage;
+import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.ReadCommand;
+import org.apache.cassandra.db.ReadResponse;
+import org.apache.cassandra.db.Row;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.exceptions.ReadTimeoutException;
+import org.apache.cassandra.exceptions.UnavailableException;
+import org.apache.cassandra.net.MessageOut;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.StorageProxy.LocalReadRunnable;
+import org.apache.cassandra.utils.FBUtilities;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.primitives.Longs;
+
+public abstract class AbstractReadExecutor
+{
+ private static final Logger logger = LoggerFactory.getLogger(AbstractReadExecutor.class);
+ protected final ReadCallback<ReadResponse, Row> handler;
+ protected final ReadCommand command;
+ protected final RowDigestResolver resolver;
+ protected final List<InetAddress> unfiltered;
+ protected final List<InetAddress> endpoints;
+ protected final ColumnFamilyStore cfs;
+
+ AbstractReadExecutor(ColumnFamilyStore cfs,
+ ReadCommand command,
+ ConsistencyLevel consistency_level,
+ List<InetAddress> allReplicas,
+ List<InetAddress> queryTargets)
+ throws UnavailableException
+ {
+ unfiltered = allReplicas;
+ this.endpoints = queryTargets;
+ this.resolver = new RowDigestResolver(command.table, command.key);
+ this.handler = new ReadCallback<ReadResponse, Row>(resolver, consistency_level, command, this.endpoints);
+ this.command = command;
+ this.cfs = cfs;
+
+ handler.assureSufficientLiveNodes();
+ assert !handler.endpoints.isEmpty();
+ }
+
+ void executeAsync()
+ {
+ // The data-request message is sent to dataPoint, the node that will actually get the data for us
+ InetAddress dataPoint = handler.endpoints.get(0);
+ if (dataPoint.equals(FBUtilities.getBroadcastAddress()) && StorageProxy.OPTIMIZE_LOCAL_REQUESTS)
+ {
+ logger.trace("reading data locally");
+ StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(command, handler));
+ }
+ else
+ {
+ logger.trace("reading data from {}", dataPoint);
+ MessagingService.instance().sendRR(command.createMessage(), dataPoint, handler);
+ }
+
+ if (handler.endpoints.size() == 1)
+ return;
+
+ // send the other endpoints a digest request
+ ReadCommand digestCommand = command.copy();
+ digestCommand.setDigestQuery(true);
+ MessageOut<?> message = null;
+ for (int i = 1; i < handler.endpoints.size(); i++)
+ {
+ InetAddress digestPoint = handler.endpoints.get(i);
+ if (digestPoint.equals(FBUtilities.getBroadcastAddress()))
+ {
+ logger.trace("reading digest locally");
+ StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(digestCommand, handler));
+ }
+ else
+ {
+ logger.trace("reading digest from {}", digestPoint);
+ // (We lazy-construct the digest Message object since it may not be necessary if we
+ // are doing a local digest read, or no digest reads at all.)
+ if (message == null)
+ message = digestCommand.createMessage();
+ MessagingService.instance().sendRR(message, digestPoint, handler);
+ }
+ }
+ }
+
+ void speculate()
+ {
+ // noop by default.
+ }
+
+ Row get() throws ReadTimeoutException, DigestMismatchException, IOException
+ {
+ return handler.get();
+ }
+
+ public static AbstractReadExecutor getReadExecutor(ReadCommand command, ConsistencyLevel consistency_level) throws UnavailableException
+ {
+ Table table = Table.open(command.table);
+ ColumnFamilyStore cfs = table.getColumnFamilyStore(command.cfName);
+ List<InetAddress> allReplicas = StorageProxy.getLiveSortedEndpoints(table, command.key);
+ List<InetAddress> queryTargets = consistency_level.filterForQuery(table, allReplicas, cfs.metadata.newReadRepairDecision());
+
+ switch (cfs.metadata.getSpeculativeRetry().type)
+ {
+ case ALWAYS:
+ return new SpeculateAlwaysExecutor(cfs, command, consistency_level, allReplicas, queryTargets);
+ case PERCENTILE:
+ case CUSTOM:
+ return queryTargets.size() < allReplicas.size()
+ ? new SpeculativeReadExecutor(cfs, command, consistency_level, allReplicas, queryTargets)
+ : new DefaultReadExecutor(cfs, command, consistency_level, allReplicas, queryTargets);
+ default:
+ return new DefaultReadExecutor(cfs, command, consistency_level, allReplicas, queryTargets);
+ }
+ }
+
+ private static class DefaultReadExecutor extends AbstractReadExecutor
+ {
+ public DefaultReadExecutor(ColumnFamilyStore cfs, ReadCommand command, ConsistencyLevel consistency_level, List<InetAddress> allReplicas, List<InetAddress> queryTargets) throws UnavailableException
+ {
+ super(cfs, command, consistency_level, allReplicas, queryTargets);
+ }
+ }
+
+ private static class SpeculativeReadExecutor extends AbstractReadExecutor
+ {
+ public SpeculativeReadExecutor(ColumnFamilyStore cfs, ReadCommand command, ConsistencyLevel consistency_level, List<InetAddress> allReplicas, List<InetAddress> queryTargets) throws UnavailableException
+ {
+ super(cfs, command, consistency_level, allReplicas, queryTargets);
+ assert handler.endpoints.size() < unfiltered.size();
+ }
+
+ @Override
+ void speculate()
+ {
+ // no latency information, or we're overloaded
+ if (cfs.sampleLatency > command.getTimeout())
+ return;
+
+ if (!handler.await(cfs.sampleLatency))
+ {
+ InetAddress endpoint = unfiltered.get(handler.endpoints.size());
+
+ // could be waiting on the data, or on enough digests
+ ReadCommand scommand = command;
+ if (resolver.getData() != null)
+ {
+ scommand = command.copy();
+ scommand.setDigestQuery(true);
+ }
+
+ logger.trace("Speculating read retry on {}", endpoint);
+ MessagingService.instance().sendRR(scommand.createMessage(), endpoint, handler);
+ cfs.metric.speculativeRetry.inc();
+ }
+ }
+ }
+
+ private static class SpeculateAlwaysExecutor extends AbstractReadExecutor
+ {
+ public SpeculateAlwaysExecutor(ColumnFamilyStore cfs, ReadCommand command, ConsistencyLevel consistency_level, List<InetAddress> allReplicas, List<InetAddress> queryTargets) throws UnavailableException
+ {
+ super(cfs, command, consistency_level, allReplicas, queryTargets);
+ }
+
+ @Override
+ void executeAsync()
+ {
+ int limit = unfiltered.size() >= 2 ? 2 : 1;
+ for (int i = 0; i < limit; i++)
+ {
+ InetAddress endpoint = unfiltered.get(i);
+ if (endpoint.equals(FBUtilities.getBroadcastAddress()))
+ {
+ logger.trace("reading full data locally");
+ StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(command, handler));
+ }
+ else
+ {
+ logger.trace("reading full data from {}", endpoint);
+ MessagingService.instance().sendRR(command.createMessage(), endpoint, handler);
+ }
+ }
+ if (handler.endpoints.size() <= limit)
+ return;
+
+ ReadCommand digestCommand = command.copy();
+ digestCommand.setDigestQuery(true);
+ MessageOut<?> message = digestCommand.createMessage();
+ for (int i = limit; i < handler.endpoints.size(); i++)
+ {
+ // Send the message
+ InetAddress endpoint = handler.endpoints.get(i);
+ if (endpoint.equals(FBUtilities.getBroadcastAddress()))
+ {
+ logger.trace("reading data locally, isDigest: {}", command.isDigestQuery());
+ StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(digestCommand, handler));
+ }
+ else
+ {
+ logger.trace("reading full data from {}, isDigest: {}", endpoint, command.isDigestQuery());
+ MessagingService.instance().sendRR(message, endpoint, handler);
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c25a6a14/src/java/org/apache/cassandra/service/AbstractRowResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AbstractRowResolver.java b/src/java/org/apache/cassandra/service/AbstractRowResolver.java
index bdffc0b..b6204e2 100644
--- a/src/java/org/apache/cassandra/service/AbstractRowResolver.java
+++ b/src/java/org/apache/cassandra/service/AbstractRowResolver.java
@@ -43,9 +43,27 @@ public abstract class AbstractRowResolver implements IResponseResolver<ReadRespo
this.table = table;
}
- public void preprocess(MessageIn<ReadResponse> message)
+ public boolean preprocess(MessageIn<ReadResponse> message)
{
+ MessageIn<ReadResponse> toReplace = null;
+ for (MessageIn<ReadResponse> reply : replies)
+ {
+ if (reply.from.equals(message.from))
+ {
+ if (!message.payload.isDigestQuery())
+ toReplace = reply;
+ break;
+ }
+ }
+ // replace old message
+ if (toReplace != null)
+ {
+ replies.remove(toReplace);
+ replies.add(message);
+ return false;
+ }
replies.add(message);
+ return true;
}
public Iterable<MessageIn<ReadResponse>> getMessages()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c25a6a14/src/java/org/apache/cassandra/service/IResponseResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/IResponseResolver.java b/src/java/org/apache/cassandra/service/IResponseResolver.java
index 4ac226f..6e1c04a 100644
--- a/src/java/org/apache/cassandra/service/IResponseResolver.java
+++ b/src/java/org/apache/cassandra/service/IResponseResolver.java
@@ -40,6 +40,6 @@ public interface IResponseResolver<TMessage, TResolved> {
*/
public TResolved getData() throws IOException;
- public void preprocess(MessageIn<TMessage> message);
+ public boolean preprocess(MessageIn<TMessage> message);
public Iterable<MessageIn<TMessage>> getMessages();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c25a6a14/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java b/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
index 1dfd01e..2049954 100644
--- a/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
+++ b/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
@@ -88,9 +88,10 @@ public class RangeSliceResponseResolver implements IResponseResolver<RangeSliceR
return resolvedRows;
}
- public void preprocess(MessageIn message)
+ public boolean preprocess(MessageIn message)
{
responses.add(message);
+ return true;
}
public boolean isDataPresent()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c25a6a14/src/java/org/apache/cassandra/service/ReadCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ReadCallback.java b/src/java/org/apache/cassandra/service/ReadCallback.java
index f1ca96e..92c8a85 100644
--- a/src/java/org/apache/cassandra/service/ReadCallback.java
+++ b/src/java/org/apache/cassandra/service/ReadCallback.java
@@ -24,14 +24,12 @@ import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.cassandra.config.Schema;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
-import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ReadCommand;
import org.apache.cassandra.db.Table;
@@ -52,7 +50,7 @@ public class ReadCallback<TMessage, TResolved> implements IAsyncCallback<TMessag
public final IResponseResolver<TMessage, TResolved> resolver;
private final SimpleCondition condition = new SimpleCondition();
- private final long startTime;
+ final long startTime;
private final int blockfor;
final List<InetAddress> endpoints;
private final IReadCommand command;
@@ -86,29 +84,36 @@ public class ReadCallback<TMessage, TResolved> implements IAsyncCallback<TMessag
return new ReadCallback(newResolver, consistencyLevel, blockfor, command, table, endpoints);
}
- public TResolved get() throws ReadTimeoutException, DigestMismatchException, IOException
+ public boolean await(long interimTimeout)
{
- long timeout = command.getTimeout() - (System.currentTimeMillis() - startTime);
- boolean success;
+ long timeout = interimTimeout - (System.currentTimeMillis() - startTime);
try
{
- success = condition.await(timeout, TimeUnit.MILLISECONDS);
+ return condition.await(timeout, TimeUnit.MILLISECONDS);
}
catch (InterruptedException ex)
{
throw new AssertionError(ex);
}
+ }
- if (!success)
- throw new ReadTimeoutException(consistencyLevel, received.get(), blockfor, resolver.isDataPresent());
-
+ public TResolved get() throws ReadTimeoutException, DigestMismatchException, IOException
+ {
+ long timeout = command.getTimeout() - (System.currentTimeMillis() - startTime);
+ if (!await(timeout))
+ {
+ ReadTimeoutException ex = new ReadTimeoutException(consistencyLevel, received.get(), blockfor, resolver.isDataPresent());
+ if (logger.isDebugEnabled())
+ logger.debug("Read timeout: {}", ex.toString());
+ throw ex;
+ }
return blockfor == 1 ? resolver.getData() : resolver.resolve();
}
public void response(MessageIn<TMessage> message)
{
- resolver.preprocess(message);
- int n = waitingFor(message)
+ boolean hasAdded = resolver.preprocess(message);
+ int n = (waitingFor(message) && hasAdded)
? received.incrementAndGet()
: received.get();
if (n >= blockfor && resolver.isDataPresent())
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c25a6a14/src/java/org/apache/cassandra/service/RowDigestResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/RowDigestResolver.java b/src/java/org/apache/cassandra/service/RowDigestResolver.java
index eeccbeb..c222865 100644
--- a/src/java/org/apache/cassandra/service/RowDigestResolver.java
+++ b/src/java/org/apache/cassandra/service/RowDigestResolver.java
@@ -35,7 +35,7 @@ public class RowDigestResolver extends AbstractRowResolver
/**
* Special case of resolve() so that CL.ONE reads never throw DigestMismatchException in the foreground
*/
- public Row getData() throws IOException
+ public Row getData()
{
for (MessageIn<ReadResponse> message : replies)
{
@@ -43,8 +43,7 @@ public class RowDigestResolver extends AbstractRowResolver
if (!result.isDigestQuery())
return result.row();
}
-
- throw new AssertionError("getData should not be invoked when no data is present");
+ return null;
}
/*
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c25a6a14/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index f052378..4249fb3 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -37,7 +37,6 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
-import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.*;
@@ -68,7 +67,7 @@ public class StorageProxy implements StorageProxyMBean
{
public static final String MBEAN_NAME = "org.apache.cassandra.db:type=StorageProxy";
private static final Logger logger = LoggerFactory.getLogger(StorageProxy.class);
- private static final boolean OPTIMIZE_LOCAL_REQUESTS = true; // set to false to test messagingservice path on single node
+ static final boolean OPTIMIZE_LOCAL_REQUESTS = true; // set to false to test messagingservice path on single node
public static final String UNREACHABLE = "UNREACHABLE";
@@ -868,7 +867,7 @@ public class StorageProxy implements StorageProxyMBean
do
{
List<ReadCommand> commands = commandsToRetry.isEmpty() ? initialCommands : commandsToRetry;
- ReadCallback<ReadResponse, Row>[] readCallbacks = new ReadCallback[commands.size()];
+ AbstractReadExecutor[] readExecutors = new AbstractReadExecutor[commands.size()];
if (!commandsToRetry.isEmpty())
logger.debug("Retrying {} commands", commandsToRetry.size());
@@ -877,101 +876,51 @@ public class StorageProxy implements StorageProxyMBean
for (int i = 0; i < commands.size(); i++)
{
ReadCommand command = commands.get(i);
- Table table = Table.open(command.getKeyspace());
assert !command.isDigestQuery();
logger.trace("Command/ConsistencyLevel is {}/{}", command, consistency_level);
- List<InetAddress> endpoints = getLiveSortedEndpoints(table, command.key);
- CFMetaData cfm = Schema.instance.getCFMetaData(command.getKeyspace(), command.getColumnFamilyName());
- endpoints = consistency_level.filterForQuery(table, endpoints, cfm.newReadRepairDecision());
-
- RowDigestResolver resolver = new RowDigestResolver(command.table, command.key);
- ReadCallback<ReadResponse, Row> handler = new ReadCallback(resolver, consistency_level, command, endpoints);
- handler.assureSufficientLiveNodes();
- assert !endpoints.isEmpty();
- readCallbacks[i] = handler;
-
- // The data-request message is sent to dataPoint, the node that will actually get the data for us
- InetAddress dataPoint = endpoints.get(0);
- if (dataPoint.equals(FBUtilities.getBroadcastAddress()) && OPTIMIZE_LOCAL_REQUESTS)
- {
- logger.trace("reading data locally");
- StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(command, handler));
- }
- else
- {
- logger.trace("reading data from {}", dataPoint);
- MessagingService.instance().sendRR(command.createMessage(), dataPoint, handler);
- }
-
- if (endpoints.size() == 1)
- continue;
-
- // send the other endpoints a digest request
- ReadCommand digestCommand = command.copy();
- digestCommand.setDigestQuery(true);
- MessageOut message = null;
- for (InetAddress digestPoint : endpoints.subList(1, endpoints.size()))
- {
- if (digestPoint.equals(FBUtilities.getBroadcastAddress()) && OPTIMIZE_LOCAL_REQUESTS)
- {
- logger.trace("reading digest locally");
- StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(digestCommand, handler));
- }
- else
- {
- logger.trace("reading digest from {}", digestPoint);
- // (We lazy-construct the digest Message object since it may not be necessary if we
- // are doing a local digest read, or no digest reads at all.)
- if (message == null)
- message = digestCommand.createMessage();
- MessagingService.instance().sendRR(message, digestPoint, handler);
- }
- }
+ AbstractReadExecutor exec = AbstractReadExecutor.getReadExecutor(command, consistency_level);
+ exec.executeAsync();
+ readExecutors[i] = exec;
}
+ for (AbstractReadExecutor exec: readExecutors)
+ exec.speculate();
+
// read results and make a second pass for any digest mismatches
List<ReadCommand> repairCommands = null;
List<ReadCallback<ReadResponse, Row>> repairResponseHandlers = null;
- for (int i = 0; i < commands.size(); i++)
+ for (AbstractReadExecutor exec: readExecutors)
{
- ReadCallback<ReadResponse, Row> handler = readCallbacks[i];
- ReadCommand command = commands.get(i);
try
{
- Row row = handler.get();
+ Row row = exec.get();
if (row != null)
{
- command.maybeTrim(row);
+ exec.command.maybeTrim(row);
rows.add(row);
}
- }
- catch (ReadTimeoutException ex)
- {
if (logger.isDebugEnabled())
- logger.debug("Read timeout: {}", ex.toString());
- throw ex;
+ logger.debug("Read: " + (System.currentTimeMillis() - exec.handler.startTime) + " ms.");
}
catch (DigestMismatchException ex)
{
- logger.debug("Digest mismatch: {}", ex.toString());
+ logger.trace("Digest mismatch: {}", ex);
// Do a full data read to resolve the correct response (and repair node that need be)
- RowDataResolver resolver = new RowDataResolver(command.table, command.key, command.filter());
- ReadCallback<ReadResponse, Row> repairHandler = handler.withNewResolver(resolver);
+ RowDataResolver resolver = new RowDataResolver(exec.command.table, exec.command.key, exec.command.filter());
+ ReadCallback<ReadResponse, Row> repairHandler = exec.handler.withNewResolver(resolver);
if (repairCommands == null)
{
repairCommands = new ArrayList<ReadCommand>();
repairResponseHandlers = new ArrayList<ReadCallback<ReadResponse, Row>>();
}
- repairCommands.add(command);
+ repairCommands.add(exec.command);
repairResponseHandlers.add(repairHandler);
- for (InetAddress endpoint : handler.endpoints)
- {
- MessageOut<ReadCommand> message = command.createMessage();
+ MessageOut<ReadCommand> message = exec.command.createMessage();
+ for (InetAddress endpoint : exec.handler.endpoints)
MessagingService.instance().sendRR(message, endpoint, repairHandler);
- }
}
}
@@ -1080,7 +1029,7 @@ public class StorageProxy implements StorageProxyMBean
}
}
- private static List<InetAddress> getLiveSortedEndpoints(Table table, ByteBuffer key)
+ public static List<InetAddress> getLiveSortedEndpoints(Table table, ByteBuffer key)
{
return getLiveSortedEndpoints(table, StorageService.instance.getPartitioner().decorateKey(key));
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c25a6a14/src/java/org/apache/cassandra/utils/SimpleCondition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/SimpleCondition.java b/src/java/org/apache/cassandra/utils/SimpleCondition.java
index 4f00998..be7e654 100644
--- a/src/java/org/apache/cassandra/utils/SimpleCondition.java
+++ b/src/java/org/apache/cassandra/utils/SimpleCondition.java
@@ -26,7 +26,7 @@ import java.util.concurrent.locks.Condition;
// _after_ signal(), it will work as desired.)
public class SimpleCondition implements Condition
{
- boolean set;
+ private boolean set;
public synchronized void await() throws InterruptedException
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c25a6a14/src/resources/org/apache/cassandra/cli/CliHelp.yaml
----------------------------------------------------------------------
diff --git a/src/resources/org/apache/cassandra/cli/CliHelp.yaml b/src/resources/org/apache/cassandra/cli/CliHelp.yaml
index 069a6ce..d811663 100644
--- a/src/resources/org/apache/cassandra/cli/CliHelp.yaml
+++ b/src/resources/org/apache/cassandra/cli/CliHelp.yaml
@@ -792,6 +792,27 @@ commands:
- ROWS_ONLY
- NONE;
+ - speculative_retry: Speculative retry proactively sends redundant read
+ requests to guard against slow or unresponsive replicas.
+
+ Speculative retry will execute an additional read on different nodes when
+ the read request doesn't complete within x milliseconds.
+
+ Xpercentile will execute additional read requests to different replicas
+ when the read request has not completed within the Xth percentile of the
+ recently recorded read latencies.
+
+ Xms will execute an additional read request to a different replica when
+ the read request has not completed within X milliseconds.
+
+ ALWAYS will execute data read requests against 2 replicas (if available),
+ anticipating that a node may fail to answer the read.
+
+ Supported values are:
+ - ALWAYS
+ - Xpercentile
+ - Xms
+ - NONE;
+
- max_compaction_threshold: The maximum number of SSTables allowed before a
minor compaction is forced. Default is 32, setting to 0 disables minor
compactions.