You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by vi...@apache.org on 2012/02/10 19:51:20 UTC

git commit: Push read repair setting down to the DC level (adds dclocal_read_repair_chance); patch by Vijay and Sylvain Lebresne; reviewed by Sylvain Lebresne for CASSANDRA-2506

Updated Branches:
  refs/heads/cassandra-1.1 12d26f869 -> 4e6a4c016


Push read repair setting down to the DC level (adds dclocal_read_repair_chance)
patch by Vijay and Sylvain Lebresne; reviewed by Sylvain Lebresne for
CASSANDRA-2506

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4e6a4c01
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4e6a4c01
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4e6a4c01

Branch: refs/heads/cassandra-1.1
Commit: 4e6a4c016a9efe60c053318b4da81e6a90b9588b
Parents: 12d26f8
Author: Vijay Parthasarathy <vi...@gmail.com>
Authored: Fri Feb 10 10:51:04 2012 -0800
Committer: Vijay Parthasarathy <vi...@gmail.com>
Committed: Fri Feb 10 10:51:04 2012 -0800

----------------------------------------------------------------------
 interface/cassandra.thrift                         |    1 +
 .../org/apache/cassandra/thrift/Cassandra.java     |    6 +-
 .../org/apache/cassandra/thrift/CfDef.java         |   96 ++++++++++++++-
 src/avro/internode.genavro                         |    1 +
 src/java/org/apache/cassandra/cli/CliClient.java   |   11 ++
 .../org/apache/cassandra/config/CFMetaData.java    |   23 ++++
 .../apache/cassandra/cql/AlterTableStatement.java  |    1 +
 src/java/org/apache/cassandra/cql/CFPropDefs.java  |    2 +
 .../cassandra/cql/CreateColumnFamilyStatement.java |    1 +
 src/java/org/apache/cassandra/cql3/CFPropDefs.java |    2 +
 .../cql3/statements/AlterTableStatement.java       |    1 +
 .../statements/CreateColumnFamilyStatement.java    |    1 +
 .../cassandra/service/DatacenterReadCallback.java  |    4 -
 .../org/apache/cassandra/service/ReadCallback.java |   43 +++++--
 .../org/apache/cassandra/cli/CliHelp.yaml          |   14 ++
 15 files changed, 187 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e6a4c01/interface/cassandra.thrift
----------------------------------------------------------------------
diff --git a/interface/cassandra.thrift b/interface/cassandra.thrift
index 5fd2af6..a2275b6 100644
--- a/interface/cassandra.thrift
+++ b/interface/cassandra.thrift
@@ -417,6 +417,7 @@ struct CfDef {
     34: optional string caching="keys_only",
     35: optional list<binary> column_aliases,
     36: optional binary value_alias,
+    37: optional double dclocal_read_repair_chance = 0.0,
 }
 
 /* describes a keyspace. */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e6a4c01/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
----------------------------------------------------------------------
diff --git a/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java b/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
index 58f5ac5..3cfe58c 100644
--- a/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
+++ b/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
@@ -8526,6 +8526,8 @@ public class Cassandra {
 
     private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
       try {
+        // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor.
+        __isset_bit_vector = new BitSet(1);
         read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
       } catch (org.apache.thrift.TException te) {
         throw new java.io.IOException(te);
@@ -26538,8 +26540,6 @@ public class Cassandra {
 
     private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
       try {
-        // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor.
-        __isset_bit_vector = new BitSet(1);
         read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
       } catch (org.apache.thrift.TException te) {
         throw new java.io.IOException(te);
@@ -34148,8 +34148,6 @@ public class Cassandra {
 
     private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
       try {
-        // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor.
-        __isset_bit_vector = new BitSet(1);
         read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
       } catch (org.apache.thrift.TException te) {
         throw new java.io.IOException(te);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e6a4c01/interface/thrift/gen-java/org/apache/cassandra/thrift/CfDef.java
----------------------------------------------------------------------
diff --git a/interface/thrift/gen-java/org/apache/cassandra/thrift/CfDef.java b/interface/thrift/gen-java/org/apache/cassandra/thrift/CfDef.java
index 8593794..0fe3a7d 100644
--- a/interface/thrift/gen-java/org/apache/cassandra/thrift/CfDef.java
+++ b/interface/thrift/gen-java/org/apache/cassandra/thrift/CfDef.java
@@ -69,6 +69,7 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav
   private static final org.apache.thrift.protocol.TField CACHING_FIELD_DESC = new org.apache.thrift.protocol.TField("caching", org.apache.thrift.protocol.TType.STRING, (short)34);
   private static final org.apache.thrift.protocol.TField COLUMN_ALIASES_FIELD_DESC = new org.apache.thrift.protocol.TField("column_aliases", org.apache.thrift.protocol.TType.LIST, (short)35);
   private static final org.apache.thrift.protocol.TField VALUE_ALIAS_FIELD_DESC = new org.apache.thrift.protocol.TField("value_alias", org.apache.thrift.protocol.TType.STRING, (short)36);
+  private static final org.apache.thrift.protocol.TField DCLOCAL_READ_REPAIR_CHANCE_FIELD_DESC = new org.apache.thrift.protocol.TField("dclocal_read_repair_chance", org.apache.thrift.protocol.TType.DOUBLE, (short)37);
 
   public String keyspace; // required
   public String name; // required
@@ -94,6 +95,7 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav
   public String caching; // required
   public List<ByteBuffer> column_aliases; // required
   public ByteBuffer value_alias; // required
+  public double dclocal_read_repair_chance; // required
 
   /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
   public enum _Fields implements org.apache.thrift.TFieldIdEnum {
@@ -120,7 +122,8 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav
     BLOOM_FILTER_FP_CHANCE((short)33, "bloom_filter_fp_chance"),
     CACHING((short)34, "caching"),
     COLUMN_ALIASES((short)35, "column_aliases"),
-    VALUE_ALIAS((short)36, "value_alias");
+    VALUE_ALIAS((short)36, "value_alias"),
+    DCLOCAL_READ_REPAIR_CHANCE((short)37, "dclocal_read_repair_chance");
 
     private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
 
@@ -183,6 +186,8 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav
           return COLUMN_ALIASES;
         case 36: // VALUE_ALIAS
           return VALUE_ALIAS;
+        case 37: // DCLOCAL_READ_REPAIR_CHANCE
+          return DCLOCAL_READ_REPAIR_CHANCE;
         default:
           return null;
       }
@@ -231,7 +236,8 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav
   private static final int __REPLICATE_ON_WRITE_ISSET_ID = 5;
   private static final int __MERGE_SHARDS_CHANCE_ISSET_ID = 6;
   private static final int __BLOOM_FILTER_FP_CHANCE_ISSET_ID = 7;
-  private BitSet __isset_bit_vector = new BitSet(8);
+  private static final int __DCLOCAL_READ_REPAIR_CHANCE_ISSET_ID = 8;
+  private BitSet __isset_bit_vector = new BitSet(9);
 
   public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
   static {
@@ -290,6 +296,8 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav
             new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING            , true))));
     tmpMap.put(_Fields.VALUE_ALIAS, new org.apache.thrift.meta_data.FieldMetaData("value_alias", org.apache.thrift.TFieldRequirementType.OPTIONAL, 
         new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING        , true)));
+    tmpMap.put(_Fields.DCLOCAL_READ_REPAIR_CHANCE, new org.apache.thrift.meta_data.FieldMetaData("dclocal_read_repair_chance", org.apache.thrift.TFieldRequirementType.OPTIONAL, 
+        new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.DOUBLE)));
     metaDataMap = Collections.unmodifiableMap(tmpMap);
     org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(CfDef.class, metaDataMap);
   }
@@ -303,6 +311,8 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav
 
     this.caching = "keys_only";
 
+    this.dclocal_read_repair_chance = 0;
+
   }
 
   public CfDef(
@@ -412,6 +422,7 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav
       this.value_alias = org.apache.thrift.TBaseHelper.copyBinary(other.value_alias);
 ;
     }
+    this.dclocal_read_repair_chance = other.dclocal_read_repair_chance;
   }
 
   public CfDef deepCopy() {
@@ -455,6 +466,8 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav
 
     this.column_aliases = null;
     this.value_alias = null;
+    this.dclocal_read_repair_chance = 0;
+
   }
 
   public String getKeyspace() {
@@ -1097,6 +1110,29 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav
     }
   }
 
+  public double getDclocal_read_repair_chance() {
+    return this.dclocal_read_repair_chance;
+  }
+
+  public CfDef setDclocal_read_repair_chance(double dclocal_read_repair_chance) {
+    this.dclocal_read_repair_chance = dclocal_read_repair_chance;
+    setDclocal_read_repair_chanceIsSet(true);
+    return this;
+  }
+
+  public void unsetDclocal_read_repair_chance() {
+    __isset_bit_vector.clear(__DCLOCAL_READ_REPAIR_CHANCE_ISSET_ID);
+  }
+
+  /** Returns true if field dclocal_read_repair_chance is set (has been assigned a value) and false otherwise */
+  public boolean isSetDclocal_read_repair_chance() {
+    return __isset_bit_vector.get(__DCLOCAL_READ_REPAIR_CHANCE_ISSET_ID);
+  }
+
+  public void setDclocal_read_repair_chanceIsSet(boolean value) {
+    __isset_bit_vector.set(__DCLOCAL_READ_REPAIR_CHANCE_ISSET_ID, value);
+  }
+
   public void setFieldValue(_Fields field, Object value) {
     switch (field) {
     case KEYSPACE:
@@ -1291,6 +1327,14 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav
       }
       break;
 
+    case DCLOCAL_READ_REPAIR_CHANCE:
+      if (value == null) {
+        unsetDclocal_read_repair_chance();
+      } else {
+        setDclocal_read_repair_chance((Double)value);
+      }
+      break;
+
     }
   }
 
@@ -1368,6 +1412,9 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav
     case VALUE_ALIAS:
       return getValue_alias();
 
+    case DCLOCAL_READ_REPAIR_CHANCE:
+      return Double.valueOf(getDclocal_read_repair_chance());
+
     }
     throw new IllegalStateException();
   }
@@ -1427,6 +1474,8 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav
       return isSetColumn_aliases();
     case VALUE_ALIAS:
       return isSetValue_alias();
+    case DCLOCAL_READ_REPAIR_CHANCE:
+      return isSetDclocal_read_repair_chance();
     }
     throw new IllegalStateException();
   }
@@ -1660,6 +1709,15 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav
         return false;
     }
 
+    boolean this_present_dclocal_read_repair_chance = true && this.isSetDclocal_read_repair_chance();
+    boolean that_present_dclocal_read_repair_chance = true && that.isSetDclocal_read_repair_chance();
+    if (this_present_dclocal_read_repair_chance || that_present_dclocal_read_repair_chance) {
+      if (!(this_present_dclocal_read_repair_chance && that_present_dclocal_read_repair_chance))
+        return false;
+      if (this.dclocal_read_repair_chance != that.dclocal_read_repair_chance)
+        return false;
+    }
+
     return true;
   }
 
@@ -1787,6 +1845,11 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav
     if (present_value_alias)
       builder.append(value_alias);
 
+    boolean present_dclocal_read_repair_chance = true && (isSetDclocal_read_repair_chance());
+    builder.append(present_dclocal_read_repair_chance);
+    if (present_dclocal_read_repair_chance)
+      builder.append(dclocal_read_repair_chance);
+
     return builder.toHashCode();
   }
 
@@ -2038,6 +2101,16 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav
         return lastComparison;
       }
     }
+    lastComparison = Boolean.valueOf(isSetDclocal_read_repair_chance()).compareTo(typedOther.isSetDclocal_read_repair_chance());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (isSetDclocal_read_repair_chance()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.dclocal_read_repair_chance, typedOther.dclocal_read_repair_chance);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
     return 0;
   }
 
@@ -2276,6 +2349,14 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav
             org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
           }
           break;
+        case 37: // DCLOCAL_READ_REPAIR_CHANCE
+          if (field.type == org.apache.thrift.protocol.TType.DOUBLE) {
+            this.dclocal_read_repair_chance = iprot.readDouble();
+            setDclocal_read_repair_chanceIsSet(true);
+          } else { 
+            org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
+          }
+          break;
         default:
           org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
       }
@@ -2469,6 +2550,11 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav
         oprot.writeFieldEnd();
       }
     }
+    if (isSetDclocal_read_repair_chance()) {
+      oprot.writeFieldBegin(DCLOCAL_READ_REPAIR_CHANCE_FIELD_DESC);
+      oprot.writeDouble(this.dclocal_read_repair_chance);
+      oprot.writeFieldEnd();
+    }
     oprot.writeFieldStop();
     oprot.writeStructEnd();
   }
@@ -2681,6 +2767,12 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav
       }
       first = false;
     }
+    if (isSetDclocal_read_repair_chance()) {
+      if (!first) sb.append(", ");
+      sb.append("dclocal_read_repair_chance:");
+      sb.append(this.dclocal_read_repair_chance);
+      first = false;
+    }
     sb.append(")");
     return sb.toString();
   }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e6a4c01/src/avro/internode.genavro
----------------------------------------------------------------------
diff --git a/src/avro/internode.genavro b/src/avro/internode.genavro
index 36c2cba..d060d6e 100644
--- a/src/avro/internode.genavro
+++ b/src/avro/internode.genavro
@@ -69,6 +69,7 @@ protocol InterNode {
         union { null, string } caching = null;
         union { null, array<bytes> } column_aliases = null;
         union { null, bytes } value_alias = null;
+        union { double, null } dclocal_read_repair_chance = 0.0;
     }
 
     @aliases(["org.apache.cassandra.config.avro.KsDef"])

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e6a4c01/src/java/org/apache/cassandra/cli/CliClient.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cli/CliClient.java b/src/java/org/apache/cassandra/cli/CliClient.java
index c132a77..05409e5 100644
--- a/src/java/org/apache/cassandra/cli/CliClient.java
+++ b/src/java/org/apache/cassandra/cli/CliClient.java
@@ -124,6 +124,7 @@ public class CliClient
         KEYS_CACHED,
         KEY_CACHE_SAVE_PERIOD,
         READ_REPAIR_CHANCE,
+        DCLOCAL_READ_REPAIR_CHANCE,
         GC_GRACE,
         COLUMN_METADATA,
         MEMTABLE_OPERATIONS,
@@ -1196,6 +1197,14 @@ public class CliClient
 
                 cfDef.setRead_repair_chance(chance);
                 break;
+            case DCLOCAL_READ_REPAIR_CHANCE:
+                double localChance = Double.parseDouble(mValue);
+
+                if (localChance < 0 || localChance > 1)
+                    throw new RuntimeException("Error: dclocal_read_repair_chance must be between 0 and 1.");
+
+                cfDef.setDclocal_read_repair_chance(localChance);
+                break;
             case GC_GRACE:
                 cfDef.setGc_grace_seconds(Integer.parseInt(mValue));
                 break;
@@ -1622,6 +1631,7 @@ public class CliClient
                     normaliseType(cfDef.key_validation_class, "org.apache.cassandra.db.marshal"));
 
         writeAttr(output, false, "read_repair_chance", cfDef.read_repair_chance);
+        writeAttr(output, false, "dclocal_read_repair_chance", cfDef.dclocal_read_repair_chance);
         writeAttr(output, false, "gc_grace", cfDef.gc_grace_seconds);
         writeAttr(output, false, "min_compaction_threshold", cfDef.min_compaction_threshold);
         writeAttr(output, false, "max_compaction_threshold", cfDef.max_compaction_threshold);
@@ -1975,6 +1985,7 @@ public class CliClient
         sessionState.out.printf("      GC grace seconds: %s%n", cf_def.gc_grace_seconds);
         sessionState.out.printf("      Compaction min/max thresholds: %s/%s%n", cf_def.min_compaction_threshold, cf_def.max_compaction_threshold);
         sessionState.out.printf("      Read repair chance: %s%n", cf_def.read_repair_chance);
+        sessionState.out.printf("      DC Local Read repair chance: %s%n", cf_def.dclocal_read_repair_chance);
         sessionState.out.printf("      Replicate on write: %s%n", cf_def.replicate_on_write);
         sessionState.out.printf("      Caching: %s%n", cf_def.caching);
         sessionState.out.printf("      Bloom Filter FP chance: %s%n", cf_def.isSetBloom_filter_fp_chance() ? cf_def.bloom_filter_fp_chance : "default");

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e6a4c01/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java
index d879a2c..defa6cf 100644
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@ -63,6 +63,7 @@ public final class CFMetaData
     private static Logger logger = LoggerFactory.getLogger(CFMetaData.class);
 
     public final static double DEFAULT_READ_REPAIR_CHANCE = 0.1;
+    public final static double DEFAULT_DCLOCAL_READ_REPAIR_CHANCE = 0.0;
     public final static boolean DEFAULT_REPLICATE_ON_WRITE = true;
     public final static int DEFAULT_GC_GRACE_SECONDS = 864000;
     public final static int DEFAULT_MIN_COMPACTION_THRESHOLD = 4;
@@ -148,6 +149,7 @@ public final class CFMetaData
     //OPTIONAL
     private String comment;                           // default none, for humans only
     private double readRepairChance;                  // default 1.0 (always), chance [0.0,1.0] of read repair
+    private double dcLocalReadRepairChance;           // default 0.0
     private boolean replicateOnWrite;                 // default false
     private int gcGraceSeconds;                       // default 864000 (ten days)
     private AbstractType<?> defaultValidator;         // default BytesType (no-op), use comparator types
@@ -176,6 +178,7 @@ public final class CFMetaData
 
     public CFMetaData comment(String prop) { comment = enforceCommentNotNull(prop); return this;}
     public CFMetaData readRepairChance(double prop) {readRepairChance = prop; return this;}
+    public CFMetaData dclocalReadRepairChance(double prop) {dcLocalReadRepairChance = prop; return this;}
     public CFMetaData replicateOnWrite(boolean prop) {replicateOnWrite = prop; return this;}
     public CFMetaData gcGraceSeconds(int prop) {gcGraceSeconds = prop; return this;}
     public CFMetaData defaultValidator(AbstractType<?> prop) {defaultValidator = prop; updateCfDef(); return this;}
@@ -229,6 +232,7 @@ public final class CFMetaData
     {
         // Set a bunch of defaults
         readRepairChance             = DEFAULT_READ_REPAIR_CHANCE;
+        dcLocalReadRepairChance      = DEFAULT_DCLOCAL_READ_REPAIR_CHANCE;
         replicateOnWrite             = DEFAULT_REPLICATE_ON_WRITE;
         gcGraceSeconds               = DEFAULT_GC_GRACE_SECONDS;
         minCompactionThreshold       = DEFAULT_MIN_COMPACTION_THRESHOLD;
@@ -264,6 +268,7 @@ public final class CFMetaData
 
         return newCFMD.comment(comment)
                       .readRepairChance(0)
+                      .dclocalReadRepairChance(0)
                       .gcGraceSeconds(0)
                       .mergeShardsChance(0.0);
     }
@@ -273,6 +278,7 @@ public final class CFMetaData
         return new CFMetaData(parent.ksName, parent.indexColumnFamilyName(info), ColumnFamilyType.Standard, columnComparator, null)
                              .keyValidator(info.getValidator())
                              .readRepairChance(0.0)
+                             .dclocalReadRepairChance(0.0)
                              .gcGraceSeconds(parent.gcGraceSeconds)
                              .minCompactionThreshold(parent.minCompactionThreshold)
                              .maxCompactionThreshold(parent.maxCompactionThreshold);
@@ -294,6 +300,7 @@ public final class CFMetaData
     {
         return newCFMD.comment(oldCFMD.comment)
                       .readRepairChance(oldCFMD.readRepairChance)
+                      .dclocalReadRepairChance(oldCFMD.dcLocalReadRepairChance)
                       .replicateOnWrite(oldCFMD.replicateOnWrite)
                       .gcGraceSeconds(oldCFMD.gcGraceSeconds)
                       .defaultValidator(oldCFMD.defaultValidator)
@@ -407,6 +414,7 @@ public final class CFMetaData
 
         return newCFMD.comment(cf.comment.toString())
                       .readRepairChance(cf.read_repair_chance)
+                      .dclocalReadRepairChance(cf.dclocal_read_repair_chance)
                       .replicateOnWrite(cf.replicate_on_write)
                       .gcGraceSeconds(cf.gc_grace_seconds)
                       .defaultValidator(validator)
@@ -439,6 +447,11 @@ public final class CFMetaData
         return readRepairChance;
     }
 
+    public double getDcLocalReadRepair()
+    {
+        return dcLocalReadRepairChance;
+    }
+
     public double getMergeShardsChance()
     {
         return mergeShardsChance;
@@ -539,6 +552,7 @@ public final class CFMetaData
             .append(subcolumnComparator, rhs.subcolumnComparator)
             .append(comment, rhs.comment)
             .append(readRepairChance, rhs.readRepairChance)
+            .append(dcLocalReadRepairChance, rhs.dcLocalReadRepairChance)
             .append(replicateOnWrite, rhs.replicateOnWrite)
             .append(gcGraceSeconds, rhs.gcGraceSeconds)
             .append(defaultValidator, rhs.defaultValidator)
@@ -569,6 +583,7 @@ public final class CFMetaData
             .append(subcolumnComparator)
             .append(comment)
             .append(readRepairChance)
+            .append(dcLocalReadRepairChance)
             .append(replicateOnWrite)
             .append(gcGraceSeconds)
             .append(defaultValidator)
@@ -625,6 +640,8 @@ public final class CFMetaData
                 put(CompressionParameters.SSTABLE_COMPRESSION, SnappyCompressor.class.getCanonicalName());
             }});
         }
+        if (!cf_def.isSetDclocal_read_repair_chance())
+            cf_def.setDclocal_read_repair_chance(CFMetaData.DEFAULT_DCLOCAL_READ_REPAIR_CHANCE);
     }
 
     public static CFMetaData fromThrift(org.apache.cassandra.thrift.CfDef cf_def) throws InvalidRequestException, ConfigurationException
@@ -660,6 +677,8 @@ public final class CFMetaData
             newCFMD.bloomFilterFpChance(cf_def.bloom_filter_fp_chance);
         if (cf_def.isSetCaching())
             newCFMD.caching(Caching.fromString(cf_def.caching));
+        if (cf_def.isSetDclocal_read_repair_chance())
+            newCFMD.dclocalReadRepairChance(cf_def.dclocal_read_repair_chance);
 
         CompressionParameters cp = CompressionParameters.create(cf_def.compression_options);
 
@@ -740,6 +759,8 @@ public final class CFMetaData
 
         comment = enforceCommentNotNull(cf_def.comment);
         readRepairChance = cf_def.read_repair_chance;
+        if (cf_def.isSetDclocal_read_repair_chance())
+            dcLocalReadRepairChance = cf_def.dclocal_read_repair_chance;
         replicateOnWrite = cf_def.replicate_on_write;
         gcGraceSeconds = cf_def.gc_grace_seconds;
         defaultValidator = TypeParser.parse(cf_def.default_validation_class);
@@ -870,6 +891,7 @@ public final class CFMetaData
         }
         def.setComment(enforceCommentNotNull(comment));
         def.setRead_repair_chance(readRepairChance);
+        def.setDclocal_read_repair_chance(dcLocalReadRepairChance);
         def.setReplicate_on_write(replicateOnWrite);
         def.setGc_grace_seconds(gcGraceSeconds);
         def.setDefault_validation_class(defaultValidator == null ? null : defaultValidator.toString());
@@ -1220,6 +1242,7 @@ public final class CFMetaData
             .append("subcolumncomparator", subcolumnComparator)
             .append("comment", comment)
             .append("readRepairChance", readRepairChance)
+            .append("dclocalReadRepairChance", dcLocalReadRepairChance)
             .append("replicateOnWrite", replicateOnWrite)
             .append("gcGraceSeconds", gcGraceSeconds)
             .append("defaultValidator", defaultValidator)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e6a4c01/src/java/org/apache/cassandra/cql/AlterTableStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql/AlterTableStatement.java b/src/java/org/apache/cassandra/cql/AlterTableStatement.java
index 5ca92bb..0557397 100644
--- a/src/java/org/apache/cassandra/cql/AlterTableStatement.java
+++ b/src/java/org/apache/cassandra/cql/AlterTableStatement.java
@@ -182,6 +182,7 @@ public class AlterTableStatement
         }
 
         cfDef.read_repair_chance = cfProps.getPropertyDouble(CFPropDefs.KW_READREPAIRCHANCE, cfDef.read_repair_chance);
+        cfDef.dclocal_read_repair_chance = cfProps.getPropertyDouble(CFPropDefs.KW_DCLOCALREADREPAIRCHANCE, cfDef.dclocal_read_repair_chance);
         cfDef.gc_grace_seconds = cfProps.getPropertyInt(CFPropDefs.KW_GCGRACESECONDS, cfDef.gc_grace_seconds);
         cfDef.replicate_on_write = cfProps.getPropertyBoolean(CFPropDefs.KW_REPLICATEONWRITE, cfDef.replicate_on_write);
         cfDef.min_compaction_threshold = cfProps.getPropertyInt(CFPropDefs.KW_MINCOMPACTIONTHRESHOLD, cfDef.min_compaction_threshold);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e6a4c01/src/java/org/apache/cassandra/cql/CFPropDefs.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql/CFPropDefs.java b/src/java/org/apache/cassandra/cql/CFPropDefs.java
index 896915a..8bcbcf2 100644
--- a/src/java/org/apache/cassandra/cql/CFPropDefs.java
+++ b/src/java/org/apache/cassandra/cql/CFPropDefs.java
@@ -44,6 +44,7 @@ public class CFPropDefs {
     public static final String KW_COMPARATOR = "comparator";
     public static final String KW_COMMENT = "comment";
     public static final String KW_READREPAIRCHANCE = "read_repair_chance";
+    public static final String KW_DCLOCALREADREPAIRCHANCE = "dclocal_read_repair_chance";
     public static final String KW_GCGRACESECONDS = "gc_grace_seconds";
     public static final String KW_DEFAULTVALIDATION = "default_validation";
     public static final String KW_MINCOMPACTIONTHRESHOLD = "min_compaction_threshold";
@@ -81,6 +82,7 @@ public class CFPropDefs {
         keywords.add(KW_COMPARATOR);
         keywords.add(KW_COMMENT);
         keywords.add(KW_READREPAIRCHANCE);
+        keywords.add(KW_DCLOCALREADREPAIRCHANCE);
         keywords.add(KW_GCGRACESECONDS);
         keywords.add(KW_DEFAULTVALIDATION);
         keywords.add(KW_MINCOMPACTIONTHRESHOLD);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e6a4c01/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java b/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java
index 960cc9d..062cd90 100644
--- a/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java
+++ b/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java
@@ -177,6 +177,7 @@ public class CreateColumnFamilyStatement
 
             newCFMD.comment(cfProps.getProperty(CFPropDefs.KW_COMMENT))
                    .readRepairChance(getPropertyDouble(CFPropDefs.KW_READREPAIRCHANCE, CFMetaData.DEFAULT_READ_REPAIR_CHANCE))
+                   .dclocalReadRepairChance(getPropertyDouble(CFPropDefs.KW_DCLOCALREADREPAIRCHANCE, CFMetaData.DEFAULT_DCLOCAL_READ_REPAIR_CHANCE))
                    .replicateOnWrite(getPropertyBoolean(CFPropDefs.KW_REPLICATEONWRITE, CFMetaData.DEFAULT_REPLICATE_ON_WRITE))
                    .gcGraceSeconds(getPropertyInt(CFPropDefs.KW_GCGRACESECONDS, CFMetaData.DEFAULT_GC_GRACE_SECONDS))
                    .defaultValidator(cfProps.getValidator())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e6a4c01/src/java/org/apache/cassandra/cql3/CFPropDefs.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/CFPropDefs.java b/src/java/org/apache/cassandra/cql3/CFPropDefs.java
index 63f74b0..7ac2167 100644
--- a/src/java/org/apache/cassandra/cql3/CFPropDefs.java
+++ b/src/java/org/apache/cassandra/cql3/CFPropDefs.java
@@ -40,6 +40,7 @@ public class CFPropDefs
 
     public static final String KW_COMMENT = "comment";
     public static final String KW_READREPAIRCHANCE = "read_repair_chance";
+    public static final String KW_DCLOCALREADREPAIRCHANCE = "dclocal_read_repair_chance";
     public static final String KW_GCGRACESECONDS = "gc_grace_seconds";
     public static final String KW_MINCOMPACTIONTHRESHOLD = "min_compaction_threshold";
     public static final String KW_MAXCOMPACTIONTHRESHOLD = "max_compaction_threshold";
@@ -75,6 +76,7 @@ public class CFPropDefs
 
         keywords.add(KW_COMMENT);
         keywords.add(KW_READREPAIRCHANCE);
+        keywords.add(KW_DCLOCALREADREPAIRCHANCE);
         keywords.add(KW_GCGRACESECONDS);
         keywords.add(KW_MINCOMPACTIONTHRESHOLD);
         keywords.add(KW_MAXCOMPACTIONTHRESHOLD);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e6a4c01/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java b/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
index 4114773..368eb6d 100644
--- a/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
@@ -154,6 +154,7 @@ public class AlterTableStatement extends SchemaAlteringStatement
         }
 
         cfDef.read_repair_chance = cfProps.getDouble(CFPropDefs.KW_READREPAIRCHANCE, cfDef.read_repair_chance);
+        cfDef.dclocal_read_repair_chance = cfProps.getDouble(CFPropDefs.KW_DCLOCALREADREPAIRCHANCE, cfDef.dclocal_read_repair_chance);
         cfDef.gc_grace_seconds = cfProps.getInt(CFPropDefs.KW_GCGRACESECONDS, cfDef.gc_grace_seconds);
         cfDef.replicate_on_write = cfProps.getBoolean(CFPropDefs.KW_REPLICATEONWRITE, cfDef.replicate_on_write);
         cfDef.min_compaction_threshold = cfProps.getInt(CFPropDefs.KW_MINCOMPACTIONTHRESHOLD, cfDef.min_compaction_threshold);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e6a4c01/src/java/org/apache/cassandra/cql3/statements/CreateColumnFamilyStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateColumnFamilyStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateColumnFamilyStatement.java
index 44d187c..767437e 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CreateColumnFamilyStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CreateColumnFamilyStatement.java
@@ -107,6 +107,7 @@ public class CreateColumnFamilyStatement extends SchemaAlteringStatement
 
             newCFMD.comment(properties.get(CFPropDefs.KW_COMMENT))
                    .readRepairChance(properties.getDouble(CFPropDefs.KW_READREPAIRCHANCE, CFMetaData.DEFAULT_READ_REPAIR_CHANCE))
+                   .dclocalReadRepairChance(properties.getDouble(CFPropDefs.KW_DCLOCALREADREPAIRCHANCE, CFMetaData.DEFAULT_DCLOCAL_READ_REPAIR_CHANCE))
                    .replicateOnWrite(properties.getBoolean(CFPropDefs.KW_REPLICATEONWRITE, CFMetaData.DEFAULT_REPLICATE_ON_WRITE))
                    .gcGraceSeconds(properties.getInt(CFPropDefs.KW_GCGRACESECONDS, CFMetaData.DEFAULT_GC_GRACE_SECONDS))
                    .defaultValidator(defaultValidator)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e6a4c01/src/java/org/apache/cassandra/service/DatacenterReadCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DatacenterReadCallback.java b/src/java/org/apache/cassandra/service/DatacenterReadCallback.java
index eaca5ef..00540ca 100644
--- a/src/java/org/apache/cassandra/service/DatacenterReadCallback.java
+++ b/src/java/org/apache/cassandra/service/DatacenterReadCallback.java
@@ -22,7 +22,6 @@ package org.apache.cassandra.service;
 
 
 import java.net.InetAddress;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
@@ -35,15 +34,12 @@ import org.apache.cassandra.locator.NetworkTopologyStrategy;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.thrift.ConsistencyLevel;
 import org.apache.cassandra.thrift.UnavailableException;
-import org.apache.cassandra.utils.FBUtilities;
 
 /**
  * Datacenter Quorum response handler blocks for a quorum of responses from the local DC
  */
 public class DatacenterReadCallback<T> extends ReadCallback<T>
 {
-    private static final IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
-    private static final String localdc = snitch.getDatacenter(FBUtilities.getBroadcastAddress());
     private static final Comparator<InetAddress> localComparator = new Comparator<InetAddress>()
     {
         public int compare(InetAddress endpoint1, InetAddress endpoint2)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e6a4c01/src/java/org/apache/cassandra/service/ReadCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ReadCallback.java b/src/java/org/apache/cassandra/service/ReadCallback.java
index b02c5cd..3f30a2d 100644
--- a/src/java/org/apache/cassandra/service/ReadCallback.java
+++ b/src/java/org/apache/cassandra/service/ReadCallback.java
@@ -37,6 +37,7 @@ import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ReadCommand;
 import org.apache.cassandra.db.ReadResponse;
 import org.apache.cassandra.db.Table;
+import org.apache.cassandra.locator.IEndpointSnitch;
 import org.apache.cassandra.net.IAsyncCallback;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
@@ -46,10 +47,15 @@ import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.SimpleCondition;
 import org.apache.cassandra.utils.WrappedRunnable;
 
+import com.google.common.collect.Lists;
+
 public class ReadCallback<T> implements IAsyncCallback
 {
     protected static final Logger logger = LoggerFactory.getLogger( ReadCallback.class );
 
+    protected static final IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
+    protected static final String localdc = snitch.getDatacenter(FBUtilities.getBroadcastAddress());
+
     public final IResponseResolver<T> resolver;
     protected final SimpleCondition condition = new SimpleCondition();
     private final long startTime;
@@ -67,15 +73,10 @@ public class ReadCallback<T> implements IAsyncCallback
         this.blockfor = determineBlockFor(consistencyLevel, command.getKeyspace());
         this.resolver = resolver;
         this.startTime = System.currentTimeMillis();
-        boolean repair = randomlyReadRepair();
         sortForConsistencyLevel(endpoints);
-        this.endpoints = repair || resolver instanceof RowRepairResolver
-                       ? endpoints
-                       : endpoints.subList(0, Math.min(endpoints.size(), blockfor));
-
+        this.endpoints = resolver instanceof RowRepairResolver ? endpoints : filterEndpoints(endpoints);
         if (logger.isDebugEnabled())
-            logger.debug(String.format("Blockfor/repair is %s/%s; setting up requests to %s",
-                                       blockfor, repair, StringUtils.join(this.endpoints, ",")));
+            logger.debug(String.format("Blockfor is %s; setting up requests to %s", blockfor, StringUtils.join(this.endpoints, ",")));
     }
 
     /**
@@ -89,7 +90,7 @@ public class ReadCallback<T> implements IAsyncCallback
         // no-op except in DRC
     }
 
-    private boolean randomlyReadRepair()
+    private List<InetAddress> filterEndpoints(List<InetAddress> ep)
     {
         if (resolver instanceof RowDigestResolver)
         {
@@ -97,10 +98,32 @@ public class ReadCallback<T> implements IAsyncCallback
             String table = ((RowDigestResolver) resolver).table;
             String columnFamily = ((ReadCommand) command).getColumnFamilyName();
             CFMetaData cfmd = Schema.instance.getTableMetaData(table).get(columnFamily);
-            return cfmd.getReadRepairChance() > FBUtilities.threadLocalRandom().nextDouble();
+            double chance = FBUtilities.threadLocalRandom().nextDouble();
+
+            // if global repair then just return all the ep's
+            if (cfmd.getReadRepairChance() > chance)
+                return ep;
+
+            // if local repair then just return localDC ep's
+            if (cfmd.getDcLocalReadRepair() > chance)
+            {
+                List<InetAddress> local = Lists.newArrayList();
+                List<InetAddress> other = Lists.newArrayList();
+                for (InetAddress add : ep)
+                {
+                    if (snitch.getDatacenter(add).equals(localdc))
+                        local.add(add);
+                    else
+                        other.add(add);
+                }
+                // if blockfor exceeds the number of local endpoints, top up with remote ones
+                if (local.size() < blockfor)
+                    local.addAll(other.subList(0, Math.min(blockfor - local.size(), other.size())));
+                return local;
+            }
         }
         // we don't read repair on range scans
-        return false;
+        return ep.subList(0, Math.min(ep.size(), blockfor));
     }
 
     public T get() throws TimeoutException, DigestMismatchException, IOException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e6a4c01/src/resources/org/apache/cassandra/cli/CliHelp.yaml
----------------------------------------------------------------------
diff --git a/src/resources/org/apache/cassandra/cli/CliHelp.yaml b/src/resources/org/apache/cassandra/cli/CliHelp.yaml
index 873cc41..25a9f59 100644
--- a/src/resources/org/apache/cassandra/cli/CliHelp.yaml
+++ b/src/resources/org/apache/cassandra/cli/CliHelp.yaml
@@ -665,6 +665,20 @@ commands:
           will not have any latency information from all the replicas to recognize
           when one is performing worse than usual.
 
+        - dclocal_read_repair_chance: Probability (0.0-1.0) with which to
+          perform read repairs against the nodes in the local data-center only.
+          If this is lower than read_repair_chance, it will be ignored.
+
+          Example:
+            update column family Standard2
+                 with read_repair_chance=0.1
+                 and dclocal_read_repair_chance=0.5;
+
+            Out of 10 read queries, on average 1 will do read repair on all
+            replicas (and thus in particular on all replicas of the local DC),
+            4 will only do read repair on the replicas of the local DC, and 5
+            will not do any read repair.
+
         - subcomparator:  Validator to use to validate and compare sub column names
           in this column family. Only applied to Super column families. Default is
           BytesType, which is a straight forward lexical comparison of the bytes in