You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by vi...@apache.org on 2012/02/10 19:40:20 UTC
git commit: add dclocal read repair to the DC-level; patch by Vijay
and Sylvain Lebresne; reviewed by Sylvain Lebresne for CASSANDRA-2506
Updated Branches:
refs/heads/trunk 59673232d -> ced22f76b
add dclocal read repair to the DC-level
patch by Vijay and Sylvain Lebresne; reviewed by Sylvain Lebresne for
CASSANDRA-2506
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ced22f76
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ced22f76
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ced22f76
Branch: refs/heads/trunk
Commit: ced22f76b3a0495257c26d861ab3b85a94727d47
Parents: 5967323
Author: Vijay Parthasarathy <vi...@gmail.com>
Authored: Fri Feb 10 10:39:46 2012 -0800
Committer: Vijay Parthasarathy <vi...@gmail.com>
Committed: Fri Feb 10 10:39:46 2012 -0800
----------------------------------------------------------------------
interface/cassandra.thrift | 1 +
.../org/apache/cassandra/thrift/Cassandra.java | 6 +-
.../org/apache/cassandra/thrift/CfDef.java | 96 ++++++++++++++-
src/avro/internode.genavro | 1 +
src/java/org/apache/cassandra/cli/CliClient.java | 11 ++
.../org/apache/cassandra/config/CFMetaData.java | 23 ++++
.../apache/cassandra/cql/AlterTableStatement.java | 1 +
src/java/org/apache/cassandra/cql/CFPropDefs.java | 2 +
.../cassandra/cql/CreateColumnFamilyStatement.java | 1 +
src/java/org/apache/cassandra/cql3/CFPropDefs.java | 2 +
.../cql3/statements/AlterTableStatement.java | 1 +
.../statements/CreateColumnFamilyStatement.java | 1 +
.../cassandra/service/DatacenterReadCallback.java | 4 -
.../org/apache/cassandra/service/ReadCallback.java | 43 +++++--
.../org/apache/cassandra/cli/CliHelp.yaml | 14 ++
15 files changed, 187 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ced22f76/interface/cassandra.thrift
----------------------------------------------------------------------
diff --git a/interface/cassandra.thrift b/interface/cassandra.thrift
index 5fd2af6..a2275b6 100644
--- a/interface/cassandra.thrift
+++ b/interface/cassandra.thrift
@@ -417,6 +417,7 @@ struct CfDef {
34: optional string caching="keys_only",
35: optional list<binary> column_aliases,
36: optional binary value_alias,
+ 37: optional double dclocal_read_repair_chance = 0.0,
}
/* describes a keyspace. */
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ced22f76/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
----------------------------------------------------------------------
diff --git a/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java b/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
index 58f5ac5..3cfe58c 100644
--- a/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
+++ b/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
@@ -8526,6 +8526,8 @@ public class Cassandra {
private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
try {
+ // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor.
+ __isset_bit_vector = new BitSet(1);
read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
} catch (org.apache.thrift.TException te) {
throw new java.io.IOException(te);
@@ -26538,8 +26540,6 @@ public class Cassandra {
private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
try {
- // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor.
- __isset_bit_vector = new BitSet(1);
read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
} catch (org.apache.thrift.TException te) {
throw new java.io.IOException(te);
@@ -34148,8 +34148,6 @@ public class Cassandra {
private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
try {
- // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor.
- __isset_bit_vector = new BitSet(1);
read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
} catch (org.apache.thrift.TException te) {
throw new java.io.IOException(te);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ced22f76/interface/thrift/gen-java/org/apache/cassandra/thrift/CfDef.java
----------------------------------------------------------------------
diff --git a/interface/thrift/gen-java/org/apache/cassandra/thrift/CfDef.java b/interface/thrift/gen-java/org/apache/cassandra/thrift/CfDef.java
index 8593794..0fe3a7d 100644
--- a/interface/thrift/gen-java/org/apache/cassandra/thrift/CfDef.java
+++ b/interface/thrift/gen-java/org/apache/cassandra/thrift/CfDef.java
@@ -69,6 +69,7 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav
private static final org.apache.thrift.protocol.TField CACHING_FIELD_DESC = new org.apache.thrift.protocol.TField("caching", org.apache.thrift.protocol.TType.STRING, (short)34);
private static final org.apache.thrift.protocol.TField COLUMN_ALIASES_FIELD_DESC = new org.apache.thrift.protocol.TField("column_aliases", org.apache.thrift.protocol.TType.LIST, (short)35);
private static final org.apache.thrift.protocol.TField VALUE_ALIAS_FIELD_DESC = new org.apache.thrift.protocol.TField("value_alias", org.apache.thrift.protocol.TType.STRING, (short)36);
+ private static final org.apache.thrift.protocol.TField DCLOCAL_READ_REPAIR_CHANCE_FIELD_DESC = new org.apache.thrift.protocol.TField("dclocal_read_repair_chance", org.apache.thrift.protocol.TType.DOUBLE, (short)37);
public String keyspace; // required
public String name; // required
@@ -94,6 +95,7 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav
public String caching; // required
public List<ByteBuffer> column_aliases; // required
public ByteBuffer value_alias; // required
+ public double dclocal_read_repair_chance; // required
/** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
public enum _Fields implements org.apache.thrift.TFieldIdEnum {
@@ -120,7 +122,8 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav
BLOOM_FILTER_FP_CHANCE((short)33, "bloom_filter_fp_chance"),
CACHING((short)34, "caching"),
COLUMN_ALIASES((short)35, "column_aliases"),
- VALUE_ALIAS((short)36, "value_alias");
+ VALUE_ALIAS((short)36, "value_alias"),
+ DCLOCAL_READ_REPAIR_CHANCE((short)37, "dclocal_read_repair_chance");
private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
@@ -183,6 +186,8 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav
return COLUMN_ALIASES;
case 36: // VALUE_ALIAS
return VALUE_ALIAS;
+ case 37: // DCLOCAL_READ_REPAIR_CHANCE
+ return DCLOCAL_READ_REPAIR_CHANCE;
default:
return null;
}
@@ -231,7 +236,8 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav
private static final int __REPLICATE_ON_WRITE_ISSET_ID = 5;
private static final int __MERGE_SHARDS_CHANCE_ISSET_ID = 6;
private static final int __BLOOM_FILTER_FP_CHANCE_ISSET_ID = 7;
- private BitSet __isset_bit_vector = new BitSet(8);
+ private static final int __DCLOCAL_READ_REPAIR_CHANCE_ISSET_ID = 8;
+ private BitSet __isset_bit_vector = new BitSet(9);
public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
static {
@@ -290,6 +296,8 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING , true))));
tmpMap.put(_Fields.VALUE_ALIAS, new org.apache.thrift.meta_data.FieldMetaData("value_alias", org.apache.thrift.TFieldRequirementType.OPTIONAL,
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING , true)));
+ tmpMap.put(_Fields.DCLOCAL_READ_REPAIR_CHANCE, new org.apache.thrift.meta_data.FieldMetaData("dclocal_read_repair_chance", org.apache.thrift.TFieldRequirementType.OPTIONAL,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.DOUBLE)));
metaDataMap = Collections.unmodifiableMap(tmpMap);
org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(CfDef.class, metaDataMap);
}
@@ -303,6 +311,8 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav
this.caching = "keys_only";
+ this.dclocal_read_repair_chance = 0;
+
}
public CfDef(
@@ -412,6 +422,7 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav
this.value_alias = org.apache.thrift.TBaseHelper.copyBinary(other.value_alias);
;
}
+ this.dclocal_read_repair_chance = other.dclocal_read_repair_chance;
}
public CfDef deepCopy() {
@@ -455,6 +466,8 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav
this.column_aliases = null;
this.value_alias = null;
+ this.dclocal_read_repair_chance = 0;
+
}
public String getKeyspace() {
@@ -1097,6 +1110,29 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav
}
}
+ public double getDclocal_read_repair_chance() {
+ return this.dclocal_read_repair_chance;
+ }
+
+ public CfDef setDclocal_read_repair_chance(double dclocal_read_repair_chance) {
+ this.dclocal_read_repair_chance = dclocal_read_repair_chance;
+ setDclocal_read_repair_chanceIsSet(true);
+ return this;
+ }
+
+ public void unsetDclocal_read_repair_chance() {
+ __isset_bit_vector.clear(__DCLOCAL_READ_REPAIR_CHANCE_ISSET_ID);
+ }
+
+ /** Returns true if field dclocal_read_repair_chance is set (has been assigned a value) and false otherwise */
+ public boolean isSetDclocal_read_repair_chance() {
+ return __isset_bit_vector.get(__DCLOCAL_READ_REPAIR_CHANCE_ISSET_ID);
+ }
+
+ public void setDclocal_read_repair_chanceIsSet(boolean value) {
+ __isset_bit_vector.set(__DCLOCAL_READ_REPAIR_CHANCE_ISSET_ID, value);
+ }
+
public void setFieldValue(_Fields field, Object value) {
switch (field) {
case KEYSPACE:
@@ -1291,6 +1327,14 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav
}
break;
+ case DCLOCAL_READ_REPAIR_CHANCE:
+ if (value == null) {
+ unsetDclocal_read_repair_chance();
+ } else {
+ setDclocal_read_repair_chance((Double)value);
+ }
+ break;
+
}
}
@@ -1368,6 +1412,9 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav
case VALUE_ALIAS:
return getValue_alias();
+ case DCLOCAL_READ_REPAIR_CHANCE:
+ return Double.valueOf(getDclocal_read_repair_chance());
+
}
throw new IllegalStateException();
}
@@ -1427,6 +1474,8 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav
return isSetColumn_aliases();
case VALUE_ALIAS:
return isSetValue_alias();
+ case DCLOCAL_READ_REPAIR_CHANCE:
+ return isSetDclocal_read_repair_chance();
}
throw new IllegalStateException();
}
@@ -1660,6 +1709,15 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav
return false;
}
+ boolean this_present_dclocal_read_repair_chance = true && this.isSetDclocal_read_repair_chance();
+ boolean that_present_dclocal_read_repair_chance = true && that.isSetDclocal_read_repair_chance();
+ if (this_present_dclocal_read_repair_chance || that_present_dclocal_read_repair_chance) {
+ if (!(this_present_dclocal_read_repair_chance && that_present_dclocal_read_repair_chance))
+ return false;
+ if (this.dclocal_read_repair_chance != that.dclocal_read_repair_chance)
+ return false;
+ }
+
return true;
}
@@ -1787,6 +1845,11 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav
if (present_value_alias)
builder.append(value_alias);
+ boolean present_dclocal_read_repair_chance = true && (isSetDclocal_read_repair_chance());
+ builder.append(present_dclocal_read_repair_chance);
+ if (present_dclocal_read_repair_chance)
+ builder.append(dclocal_read_repair_chance);
+
return builder.toHashCode();
}
@@ -2038,6 +2101,16 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav
return lastComparison;
}
}
+ lastComparison = Boolean.valueOf(isSetDclocal_read_repair_chance()).compareTo(typedOther.isSetDclocal_read_repair_chance());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (isSetDclocal_read_repair_chance()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.dclocal_read_repair_chance, typedOther.dclocal_read_repair_chance);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
return 0;
}
@@ -2276,6 +2349,14 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
}
break;
+ case 37: // DCLOCAL_READ_REPAIR_CHANCE
+ if (field.type == org.apache.thrift.protocol.TType.DOUBLE) {
+ this.dclocal_read_repair_chance = iprot.readDouble();
+ setDclocal_read_repair_chanceIsSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
+ }
+ break;
default:
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
}
@@ -2469,6 +2550,11 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav
oprot.writeFieldEnd();
}
}
+ if (isSetDclocal_read_repair_chance()) {
+ oprot.writeFieldBegin(DCLOCAL_READ_REPAIR_CHANCE_FIELD_DESC);
+ oprot.writeDouble(this.dclocal_read_repair_chance);
+ oprot.writeFieldEnd();
+ }
oprot.writeFieldStop();
oprot.writeStructEnd();
}
@@ -2681,6 +2767,12 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav
}
first = false;
}
+ if (isSetDclocal_read_repair_chance()) {
+ if (!first) sb.append(", ");
+ sb.append("dclocal_read_repair_chance:");
+ sb.append(this.dclocal_read_repair_chance);
+ first = false;
+ }
sb.append(")");
return sb.toString();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ced22f76/src/avro/internode.genavro
----------------------------------------------------------------------
diff --git a/src/avro/internode.genavro b/src/avro/internode.genavro
index 36c2cba..d060d6e 100644
--- a/src/avro/internode.genavro
+++ b/src/avro/internode.genavro
@@ -69,6 +69,7 @@ protocol InterNode {
union { null, string } caching = null;
union { null, array<bytes> } column_aliases = null;
union { null, bytes } value_alias = null;
+ union { double, null } dclocal_read_repair_chance = 0.0;
}
@aliases(["org.apache.cassandra.config.avro.KsDef"])
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ced22f76/src/java/org/apache/cassandra/cli/CliClient.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cli/CliClient.java b/src/java/org/apache/cassandra/cli/CliClient.java
index c132a77..05409e5 100644
--- a/src/java/org/apache/cassandra/cli/CliClient.java
+++ b/src/java/org/apache/cassandra/cli/CliClient.java
@@ -124,6 +124,7 @@ public class CliClient
KEYS_CACHED,
KEY_CACHE_SAVE_PERIOD,
READ_REPAIR_CHANCE,
+ DCLOCAL_READ_REPAIR_CHANCE,
GC_GRACE,
COLUMN_METADATA,
MEMTABLE_OPERATIONS,
@@ -1196,6 +1197,14 @@ public class CliClient
cfDef.setRead_repair_chance(chance);
break;
+ case DCLOCAL_READ_REPAIR_CHANCE:
+ double localChance = Double.parseDouble(mValue);
+
+ if (localChance < 0 || localChance > 1)
+ throw new RuntimeException("Error: dclocal_read_repair_chance must be between 0 and 1.");
+
+ cfDef.setDclocal_read_repair_chance(localChance);
+ break;
case GC_GRACE:
cfDef.setGc_grace_seconds(Integer.parseInt(mValue));
break;
@@ -1622,6 +1631,7 @@ public class CliClient
normaliseType(cfDef.key_validation_class, "org.apache.cassandra.db.marshal"));
writeAttr(output, false, "read_repair_chance", cfDef.read_repair_chance);
+ writeAttr(output, false, "dclocal_read_repair_chance", cfDef.dclocal_read_repair_chance);
writeAttr(output, false, "gc_grace", cfDef.gc_grace_seconds);
writeAttr(output, false, "min_compaction_threshold", cfDef.min_compaction_threshold);
writeAttr(output, false, "max_compaction_threshold", cfDef.max_compaction_threshold);
@@ -1975,6 +1985,7 @@ public class CliClient
sessionState.out.printf(" GC grace seconds: %s%n", cf_def.gc_grace_seconds);
sessionState.out.printf(" Compaction min/max thresholds: %s/%s%n", cf_def.min_compaction_threshold, cf_def.max_compaction_threshold);
sessionState.out.printf(" Read repair chance: %s%n", cf_def.read_repair_chance);
+ sessionState.out.printf(" DC Local Read repair chance: %s%n", cf_def.dclocal_read_repair_chance);
sessionState.out.printf(" Replicate on write: %s%n", cf_def.replicate_on_write);
sessionState.out.printf(" Caching: %s%n", cf_def.caching);
sessionState.out.printf(" Bloom Filter FP chance: %s%n", cf_def.isSetBloom_filter_fp_chance() ? cf_def.bloom_filter_fp_chance : "default");
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ced22f76/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java
index d879a2c..defa6cf 100644
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@ -63,6 +63,7 @@ public final class CFMetaData
private static Logger logger = LoggerFactory.getLogger(CFMetaData.class);
public final static double DEFAULT_READ_REPAIR_CHANCE = 0.1;
+ public final static double DEFAULT_DCLOCAL_READ_REPAIR_CHANCE = 0.0;
public final static boolean DEFAULT_REPLICATE_ON_WRITE = true;
public final static int DEFAULT_GC_GRACE_SECONDS = 864000;
public final static int DEFAULT_MIN_COMPACTION_THRESHOLD = 4;
@@ -148,6 +149,7 @@ public final class CFMetaData
//OPTIONAL
private String comment; // default none, for humans only
private double readRepairChance; // default 1.0 (always), chance [0.0,1.0] of read repair
+ private double dcLocalReadRepairChance; // default 0.0
private boolean replicateOnWrite; // default false
private int gcGraceSeconds; // default 864000 (ten days)
private AbstractType<?> defaultValidator; // default BytesType (no-op), use comparator types
@@ -176,6 +178,7 @@ public final class CFMetaData
public CFMetaData comment(String prop) { comment = enforceCommentNotNull(prop); return this;}
public CFMetaData readRepairChance(double prop) {readRepairChance = prop; return this;}
+ public CFMetaData dclocalReadRepairChance(double prop) {dcLocalReadRepairChance = prop; return this;}
public CFMetaData replicateOnWrite(boolean prop) {replicateOnWrite = prop; return this;}
public CFMetaData gcGraceSeconds(int prop) {gcGraceSeconds = prop; return this;}
public CFMetaData defaultValidator(AbstractType<?> prop) {defaultValidator = prop; updateCfDef(); return this;}
@@ -229,6 +232,7 @@ public final class CFMetaData
{
// Set a bunch of defaults
readRepairChance = DEFAULT_READ_REPAIR_CHANCE;
+ dcLocalReadRepairChance = DEFAULT_DCLOCAL_READ_REPAIR_CHANCE;
replicateOnWrite = DEFAULT_REPLICATE_ON_WRITE;
gcGraceSeconds = DEFAULT_GC_GRACE_SECONDS;
minCompactionThreshold = DEFAULT_MIN_COMPACTION_THRESHOLD;
@@ -264,6 +268,7 @@ public final class CFMetaData
return newCFMD.comment(comment)
.readRepairChance(0)
+ .dclocalReadRepairChance(0)
.gcGraceSeconds(0)
.mergeShardsChance(0.0);
}
@@ -273,6 +278,7 @@ public final class CFMetaData
return new CFMetaData(parent.ksName, parent.indexColumnFamilyName(info), ColumnFamilyType.Standard, columnComparator, null)
.keyValidator(info.getValidator())
.readRepairChance(0.0)
+ .dclocalReadRepairChance(0.0)
.gcGraceSeconds(parent.gcGraceSeconds)
.minCompactionThreshold(parent.minCompactionThreshold)
.maxCompactionThreshold(parent.maxCompactionThreshold);
@@ -294,6 +300,7 @@ public final class CFMetaData
{
return newCFMD.comment(oldCFMD.comment)
.readRepairChance(oldCFMD.readRepairChance)
+ .dclocalReadRepairChance(oldCFMD.dcLocalReadRepairChance)
.replicateOnWrite(oldCFMD.replicateOnWrite)
.gcGraceSeconds(oldCFMD.gcGraceSeconds)
.defaultValidator(oldCFMD.defaultValidator)
@@ -407,6 +414,7 @@ public final class CFMetaData
return newCFMD.comment(cf.comment.toString())
.readRepairChance(cf.read_repair_chance)
+ .dclocalReadRepairChance(cf.dclocal_read_repair_chance)
.replicateOnWrite(cf.replicate_on_write)
.gcGraceSeconds(cf.gc_grace_seconds)
.defaultValidator(validator)
@@ -439,6 +447,11 @@ public final class CFMetaData
return readRepairChance;
}
+ public double getDcLocalReadRepair()
+ {
+ return dcLocalReadRepairChance;
+ }
+
public double getMergeShardsChance()
{
return mergeShardsChance;
@@ -539,6 +552,7 @@ public final class CFMetaData
.append(subcolumnComparator, rhs.subcolumnComparator)
.append(comment, rhs.comment)
.append(readRepairChance, rhs.readRepairChance)
+ .append(dcLocalReadRepairChance, rhs.dcLocalReadRepairChance)
.append(replicateOnWrite, rhs.replicateOnWrite)
.append(gcGraceSeconds, rhs.gcGraceSeconds)
.append(defaultValidator, rhs.defaultValidator)
@@ -569,6 +583,7 @@ public final class CFMetaData
.append(subcolumnComparator)
.append(comment)
.append(readRepairChance)
+ .append(dcLocalReadRepairChance)
.append(replicateOnWrite)
.append(gcGraceSeconds)
.append(defaultValidator)
@@ -625,6 +640,8 @@ public final class CFMetaData
put(CompressionParameters.SSTABLE_COMPRESSION, SnappyCompressor.class.getCanonicalName());
}});
}
+ if (!cf_def.isSetDclocal_read_repair_chance())
+ cf_def.setDclocal_read_repair_chance(CFMetaData.DEFAULT_DCLOCAL_READ_REPAIR_CHANCE);
}
public static CFMetaData fromThrift(org.apache.cassandra.thrift.CfDef cf_def) throws InvalidRequestException, ConfigurationException
@@ -660,6 +677,8 @@ public final class CFMetaData
newCFMD.bloomFilterFpChance(cf_def.bloom_filter_fp_chance);
if (cf_def.isSetCaching())
newCFMD.caching(Caching.fromString(cf_def.caching));
+ if (cf_def.isSetDclocal_read_repair_chance())
+ newCFMD.dclocalReadRepairChance(cf_def.dclocal_read_repair_chance);
CompressionParameters cp = CompressionParameters.create(cf_def.compression_options);
@@ -740,6 +759,8 @@ public final class CFMetaData
comment = enforceCommentNotNull(cf_def.comment);
readRepairChance = cf_def.read_repair_chance;
+ if (cf_def.isSetDclocal_read_repair_chance())
+ dcLocalReadRepairChance = cf_def.dclocal_read_repair_chance;
replicateOnWrite = cf_def.replicate_on_write;
gcGraceSeconds = cf_def.gc_grace_seconds;
defaultValidator = TypeParser.parse(cf_def.default_validation_class);
@@ -870,6 +891,7 @@ public final class CFMetaData
}
def.setComment(enforceCommentNotNull(comment));
def.setRead_repair_chance(readRepairChance);
+ def.setDclocal_read_repair_chance(dcLocalReadRepairChance);
def.setReplicate_on_write(replicateOnWrite);
def.setGc_grace_seconds(gcGraceSeconds);
def.setDefault_validation_class(defaultValidator == null ? null : defaultValidator.toString());
@@ -1220,6 +1242,7 @@ public final class CFMetaData
.append("subcolumncomparator", subcolumnComparator)
.append("comment", comment)
.append("readRepairChance", readRepairChance)
+ .append("dclocalReadRepairChance", dcLocalReadRepairChance)
.append("replicateOnWrite", replicateOnWrite)
.append("gcGraceSeconds", gcGraceSeconds)
.append("defaultValidator", defaultValidator)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ced22f76/src/java/org/apache/cassandra/cql/AlterTableStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql/AlterTableStatement.java b/src/java/org/apache/cassandra/cql/AlterTableStatement.java
index 5ca92bb..0557397 100644
--- a/src/java/org/apache/cassandra/cql/AlterTableStatement.java
+++ b/src/java/org/apache/cassandra/cql/AlterTableStatement.java
@@ -182,6 +182,7 @@ public class AlterTableStatement
}
cfDef.read_repair_chance = cfProps.getPropertyDouble(CFPropDefs.KW_READREPAIRCHANCE, cfDef.read_repair_chance);
+ cfDef.dclocal_read_repair_chance = cfProps.getPropertyDouble(CFPropDefs.KW_DCLOCALREADREPAIRCHANCE, cfDef.dclocal_read_repair_chance);
cfDef.gc_grace_seconds = cfProps.getPropertyInt(CFPropDefs.KW_GCGRACESECONDS, cfDef.gc_grace_seconds);
cfDef.replicate_on_write = cfProps.getPropertyBoolean(CFPropDefs.KW_REPLICATEONWRITE, cfDef.replicate_on_write);
cfDef.min_compaction_threshold = cfProps.getPropertyInt(CFPropDefs.KW_MINCOMPACTIONTHRESHOLD, cfDef.min_compaction_threshold);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ced22f76/src/java/org/apache/cassandra/cql/CFPropDefs.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql/CFPropDefs.java b/src/java/org/apache/cassandra/cql/CFPropDefs.java
index 896915a..8bcbcf2 100644
--- a/src/java/org/apache/cassandra/cql/CFPropDefs.java
+++ b/src/java/org/apache/cassandra/cql/CFPropDefs.java
@@ -44,6 +44,7 @@ public class CFPropDefs {
public static final String KW_COMPARATOR = "comparator";
public static final String KW_COMMENT = "comment";
public static final String KW_READREPAIRCHANCE = "read_repair_chance";
+ public static final String KW_DCLOCALREADREPAIRCHANCE = "dclocal_read_repair_chance";
public static final String KW_GCGRACESECONDS = "gc_grace_seconds";
public static final String KW_DEFAULTVALIDATION = "default_validation";
public static final String KW_MINCOMPACTIONTHRESHOLD = "min_compaction_threshold";
@@ -81,6 +82,7 @@ public class CFPropDefs {
keywords.add(KW_COMPARATOR);
keywords.add(KW_COMMENT);
keywords.add(KW_READREPAIRCHANCE);
+ keywords.add(KW_DCLOCALREADREPAIRCHANCE);
keywords.add(KW_GCGRACESECONDS);
keywords.add(KW_DEFAULTVALIDATION);
keywords.add(KW_MINCOMPACTIONTHRESHOLD);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ced22f76/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java b/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java
index 960cc9d..062cd90 100644
--- a/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java
+++ b/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java
@@ -177,6 +177,7 @@ public class CreateColumnFamilyStatement
newCFMD.comment(cfProps.getProperty(CFPropDefs.KW_COMMENT))
.readRepairChance(getPropertyDouble(CFPropDefs.KW_READREPAIRCHANCE, CFMetaData.DEFAULT_READ_REPAIR_CHANCE))
+ .dclocalReadRepairChance(getPropertyDouble(CFPropDefs.KW_DCLOCALREADREPAIRCHANCE, CFMetaData.DEFAULT_DCLOCAL_READ_REPAIR_CHANCE))
.replicateOnWrite(getPropertyBoolean(CFPropDefs.KW_REPLICATEONWRITE, CFMetaData.DEFAULT_REPLICATE_ON_WRITE))
.gcGraceSeconds(getPropertyInt(CFPropDefs.KW_GCGRACESECONDS, CFMetaData.DEFAULT_GC_GRACE_SECONDS))
.defaultValidator(cfProps.getValidator())
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ced22f76/src/java/org/apache/cassandra/cql3/CFPropDefs.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/CFPropDefs.java b/src/java/org/apache/cassandra/cql3/CFPropDefs.java
index 63f74b0..7ac2167 100644
--- a/src/java/org/apache/cassandra/cql3/CFPropDefs.java
+++ b/src/java/org/apache/cassandra/cql3/CFPropDefs.java
@@ -40,6 +40,7 @@ public class CFPropDefs
public static final String KW_COMMENT = "comment";
public static final String KW_READREPAIRCHANCE = "read_repair_chance";
+ public static final String KW_DCLOCALREADREPAIRCHANCE = "dclocal_read_repair_chance";
public static final String KW_GCGRACESECONDS = "gc_grace_seconds";
public static final String KW_MINCOMPACTIONTHRESHOLD = "min_compaction_threshold";
public static final String KW_MAXCOMPACTIONTHRESHOLD = "max_compaction_threshold";
@@ -75,6 +76,7 @@ public class CFPropDefs
keywords.add(KW_COMMENT);
keywords.add(KW_READREPAIRCHANCE);
+ keywords.add(KW_DCLOCALREADREPAIRCHANCE);
keywords.add(KW_GCGRACESECONDS);
keywords.add(KW_MINCOMPACTIONTHRESHOLD);
keywords.add(KW_MAXCOMPACTIONTHRESHOLD);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ced22f76/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java b/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
index 4114773..368eb6d 100644
--- a/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
@@ -154,6 +154,7 @@ public class AlterTableStatement extends SchemaAlteringStatement
}
cfDef.read_repair_chance = cfProps.getDouble(CFPropDefs.KW_READREPAIRCHANCE, cfDef.read_repair_chance);
+ cfDef.dclocal_read_repair_chance = cfProps.getDouble(CFPropDefs.KW_DCLOCALREADREPAIRCHANCE, cfDef.dclocal_read_repair_chance);
cfDef.gc_grace_seconds = cfProps.getInt(CFPropDefs.KW_GCGRACESECONDS, cfDef.gc_grace_seconds);
cfDef.replicate_on_write = cfProps.getBoolean(CFPropDefs.KW_REPLICATEONWRITE, cfDef.replicate_on_write);
cfDef.min_compaction_threshold = cfProps.getInt(CFPropDefs.KW_MINCOMPACTIONTHRESHOLD, cfDef.min_compaction_threshold);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ced22f76/src/java/org/apache/cassandra/cql3/statements/CreateColumnFamilyStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateColumnFamilyStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateColumnFamilyStatement.java
index 44d187c..767437e 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CreateColumnFamilyStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CreateColumnFamilyStatement.java
@@ -107,6 +107,7 @@ public class CreateColumnFamilyStatement extends SchemaAlteringStatement
newCFMD.comment(properties.get(CFPropDefs.KW_COMMENT))
.readRepairChance(properties.getDouble(CFPropDefs.KW_READREPAIRCHANCE, CFMetaData.DEFAULT_READ_REPAIR_CHANCE))
+ .dclocalReadRepairChance(properties.getDouble(CFPropDefs.KW_DCLOCALREADREPAIRCHANCE, CFMetaData.DEFAULT_DCLOCAL_READ_REPAIR_CHANCE))
.replicateOnWrite(properties.getBoolean(CFPropDefs.KW_REPLICATEONWRITE, CFMetaData.DEFAULT_REPLICATE_ON_WRITE))
.gcGraceSeconds(properties.getInt(CFPropDefs.KW_GCGRACESECONDS, CFMetaData.DEFAULT_GC_GRACE_SECONDS))
.defaultValidator(defaultValidator)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ced22f76/src/java/org/apache/cassandra/service/DatacenterReadCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DatacenterReadCallback.java b/src/java/org/apache/cassandra/service/DatacenterReadCallback.java
index eaca5ef..00540ca 100644
--- a/src/java/org/apache/cassandra/service/DatacenterReadCallback.java
+++ b/src/java/org/apache/cassandra/service/DatacenterReadCallback.java
@@ -22,7 +22,6 @@ package org.apache.cassandra.service;
import java.net.InetAddress;
-import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
@@ -35,15 +34,12 @@ import org.apache.cassandra.locator.NetworkTopologyStrategy;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.cassandra.thrift.UnavailableException;
-import org.apache.cassandra.utils.FBUtilities;
/**
* Datacenter Quorum response handler blocks for a quorum of responses from the local DC
*/
public class DatacenterReadCallback<T> extends ReadCallback<T>
{
- private static final IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
- private static final String localdc = snitch.getDatacenter(FBUtilities.getBroadcastAddress());
private static final Comparator<InetAddress> localComparator = new Comparator<InetAddress>()
{
public int compare(InetAddress endpoint1, InetAddress endpoint2)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ced22f76/src/java/org/apache/cassandra/service/ReadCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ReadCallback.java b/src/java/org/apache/cassandra/service/ReadCallback.java
index b02c5cd..3f30a2d 100644
--- a/src/java/org/apache/cassandra/service/ReadCallback.java
+++ b/src/java/org/apache/cassandra/service/ReadCallback.java
@@ -37,6 +37,7 @@ import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ReadCommand;
import org.apache.cassandra.db.ReadResponse;
import org.apache.cassandra.db.Table;
+import org.apache.cassandra.locator.IEndpointSnitch;
import org.apache.cassandra.net.IAsyncCallback;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
@@ -46,10 +47,15 @@ import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.SimpleCondition;
import org.apache.cassandra.utils.WrappedRunnable;
+import com.google.common.collect.Lists;
+
public class ReadCallback<T> implements IAsyncCallback
{
protected static final Logger logger = LoggerFactory.getLogger( ReadCallback.class );
+ protected static final IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
+ protected static final String localdc = snitch.getDatacenter(FBUtilities.getBroadcastAddress());
+
public final IResponseResolver<T> resolver;
protected final SimpleCondition condition = new SimpleCondition();
private final long startTime;
@@ -67,15 +73,10 @@ public class ReadCallback<T> implements IAsyncCallback
this.blockfor = determineBlockFor(consistencyLevel, command.getKeyspace());
this.resolver = resolver;
this.startTime = System.currentTimeMillis();
- boolean repair = randomlyReadRepair();
sortForConsistencyLevel(endpoints);
- this.endpoints = repair || resolver instanceof RowRepairResolver
- ? endpoints
- : endpoints.subList(0, Math.min(endpoints.size(), blockfor));
-
+ this.endpoints = resolver instanceof RowRepairResolver ? endpoints : filterEndpoints(endpoints);
if (logger.isDebugEnabled())
- logger.debug(String.format("Blockfor/repair is %s/%s; setting up requests to %s",
- blockfor, repair, StringUtils.join(this.endpoints, ",")));
+ logger.debug(String.format("Blockfor is %s; setting up requests to %s", blockfor, StringUtils.join(this.endpoints, ",")));
}
/**
@@ -89,7 +90,7 @@ public class ReadCallback<T> implements IAsyncCallback
// no-op except in DRC
}
- private boolean randomlyReadRepair()
+ private List<InetAddress> filterEndpoints(List<InetAddress> ep)
{
if (resolver instanceof RowDigestResolver)
{
@@ -97,10 +98,32 @@ public class ReadCallback<T> implements IAsyncCallback
String table = ((RowDigestResolver) resolver).table;
String columnFamily = ((ReadCommand) command).getColumnFamilyName();
CFMetaData cfmd = Schema.instance.getTableMetaData(table).get(columnFamily);
- return cfmd.getReadRepairChance() > FBUtilities.threadLocalRandom().nextDouble();
+ double chance = FBUtilities.threadLocalRandom().nextDouble();
+
+ // if global repair then just return all the ep's
+ if (cfmd.getReadRepairChance() > chance)
+ return ep;
+
+ // if local repair then just return localDC ep's
+ if (cfmd.getDcLocalReadRepair() > chance)
+ {
+ List<InetAddress> local = Lists.newArrayList();
+ List<InetAddress> other = Lists.newArrayList();
+ for (InetAddress add : ep)
+ {
+ if (snitch.getDatacenter(add).equals(localdc))
+ local.add(add);
+ else
+ other.add(add);
+ }
+ // check if blockfor more than we have localep's
+ if (local.size() < blockfor)
+ local.addAll(other.subList(0, Math.min(blockfor - local.size(), other.size())));
+ return local;
+ }
}
// we don't read repair on range scans
- return false;
+ return ep.subList(0, Math.min(ep.size(), blockfor));
}
public T get() throws TimeoutException, DigestMismatchException, IOException
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ced22f76/src/resources/org/apache/cassandra/cli/CliHelp.yaml
----------------------------------------------------------------------
diff --git a/src/resources/org/apache/cassandra/cli/CliHelp.yaml b/src/resources/org/apache/cassandra/cli/CliHelp.yaml
index 873cc41..326d051 100644
--- a/src/resources/org/apache/cassandra/cli/CliHelp.yaml
+++ b/src/resources/org/apache/cassandra/cli/CliHelp.yaml
@@ -460,6 +460,20 @@ commands:
will not have any latency information from all the replicas to recognize
when one is performing worse than usual.
+ - dclocal_read_repair_chance: Probability (0.0-1.0) with which to
+ perform read repairs against only the replicas in the local data-center.
+ If this is lower than read_repair_chance, this setting has no effect.
+
+ Example:
+ update column family Standard2
+ with read_repair_chance=0.1
+ and dclocal_read_repair_chance=0.5;
+
+ On average, out of 10 read queries, 1 will do read repair on all
+ replicas (and thus in particular on all replicas of the local DC), 4
+ will only do read repair on replicas of the local DC and 5 will not do
+ any read repair.
+
- subcomparator: Validator to use to validate and compare sub column names
in this column family. Only applied to Super column families. Default is
BytesType, which is a straight forward lexical comparison of the bytes in