You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2013/06/06 04:46:50 UTC

[1/3] git commit: More detailed read repair metrics. Patch by Jingsi Zhu, reviewed by brandonwilliams for CASSANDRA-5618

Updated Branches:
  refs/heads/cassandra-1.2 b853630e3 -> e301c38d3
  refs/heads/trunk c1332890a -> b4b30cf87


More detailed read repair metrics.
Patch by Jingsi Zhu, reviewed by brandonwilliams for CASSANDRA-5618


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e301c38d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e301c38d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e301c38d

Branch: refs/heads/cassandra-1.2
Commit: e301c38d3a592ff943ea0703403542dd2b7f499d
Parents: b853630
Author: Brandon Williams <br...@apache.org>
Authored: Wed Jun 5 15:11:56 2013 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Wed Jun 5 15:11:56 2013 -0500

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 .../cassandra/metrics/ReadRepairMetrics.java       |   25 +++++++++++++++
 .../org/apache/cassandra/service/ReadCallback.java |    5 ++-
 .../org/apache/cassandra/service/StorageProxy.java |   25 ++++++++++++++-
 .../cassandra/service/StorageProxyMBean.java       |    6 +++
 src/java/org/apache/cassandra/tools/NodeCmd.java   |    2 +
 src/java/org/apache/cassandra/tools/NodeProbe.java |   15 +++++++++
 7 files changed, 77 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/e301c38d/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index edbb94d..75b3173 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -19,6 +19,7 @@
  * cqlsh: fix COPY FROM with ReversedType (CASSANDRA-5610)
  * Allow creating CUSTOM indexes on collections (CASSANDRA-5615)
  * Evaluate now() function at execution time (CASSANDRA-5616)
+ * Expose detailed read repair metrics (CASSANDRA-5618)
 Merged from 1.1:
  * Remove buggy thrift max message length option (CASSANDRA-5529)
  * Fix NPE in Pig's widerow mode (CASSANDRA-5488)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e301c38d/src/java/org/apache/cassandra/metrics/ReadRepairMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/ReadRepairMetrics.java b/src/java/org/apache/cassandra/metrics/ReadRepairMetrics.java
new file mode 100644
index 0000000..3f48fee
--- /dev/null
+++ b/src/java/org/apache/cassandra/metrics/ReadRepairMetrics.java
@@ -0,0 +1,25 @@
+package org.apache.cassandra.metrics;
+
+import java.util.concurrent.TimeUnit;
+
+import com.yammer.metrics.Metrics;
+import com.yammer.metrics.core.Counter;
+import com.yammer.metrics.core.Gauge;
+import com.yammer.metrics.core.Meter;
+import com.yammer.metrics.core.MetricName;
+import com.yammer.metrics.util.RatioGauge;
+
+/**
+ * Metrics related to Read Repair.
+ */
+public class ReadRepairMetrics {
+    public static final String GROUP_NAME = "org.apache.cassandra.metrics";
+    public static final String TYPE_NAME = "ReadRepair";
+    
+    public static final Meter repairedBlocking =
+            Metrics.newMeter(new MetricName(GROUP_NAME, TYPE_NAME, "RepairedBlocking"), "RepairedBlocking", TimeUnit.SECONDS);
+    public static final Meter repairedBackground =
+            Metrics.newMeter(new MetricName(GROUP_NAME, TYPE_NAME, "RepairedBackground"), "RepairedBackground", TimeUnit.SECONDS);
+    public static final Meter attempted = 
+            Metrics.newMeter(new MetricName(GROUP_NAME, TYPE_NAME, "Attempted"), "Attempted", TimeUnit.SECONDS);
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e301c38d/src/java/org/apache/cassandra/service/ReadCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ReadCallback.java b/src/java/org/apache/cassandra/service/ReadCallback.java
index a19df5f..92032f2 100644
--- a/src/java/org/apache/cassandra/service/ReadCallback.java
+++ b/src/java/org/apache/cassandra/service/ReadCallback.java
@@ -34,6 +34,7 @@ import org.apache.cassandra.db.ReadCommand;
 import org.apache.cassandra.db.Table;
 import org.apache.cassandra.exceptions.ReadTimeoutException;
 import org.apache.cassandra.exceptions.UnavailableException;
+import org.apache.cassandra.metrics.ReadRepairMetrics;
 import org.apache.cassandra.net.IAsyncCallback;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessageOut;
@@ -174,7 +175,9 @@ public class ReadCallback<TMessage, TResolved> implements IAsyncCallback<TMessag
 
                 if (logger.isDebugEnabled())
                     logger.debug("Digest mismatch:", e);
-
+                
+                ReadRepairMetrics.repairedBackground.mark();
+                
                 ReadCommand readCommand = (ReadCommand) command;
                 final RowDataResolver repairResolver = new RowDataResolver(readCommand.table, readCommand.key, readCommand.filter());
                 IAsyncCallback repairHandler = new AsyncRepairCallback(repairResolver, endpoints.size());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e301c38d/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index e8440c4..5517387 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -39,6 +39,7 @@ import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.ReadRepairDecision;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.Table;
@@ -59,6 +60,7 @@ import org.apache.cassandra.locator.AbstractReplicationStrategy;
 import org.apache.cassandra.locator.IEndpointSnitch;
 import org.apache.cassandra.locator.TokenMetadata;
 import org.apache.cassandra.metrics.ClientRequestMetrics;
+import org.apache.cassandra.metrics.ReadRepairMetrics;
 import org.apache.cassandra.net.*;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -888,7 +890,13 @@ public class StorageProxy implements StorageProxyMBean
 
                 List<InetAddress> endpoints = getLiveSortedEndpoints(table, command.key);
                 CFMetaData cfm = Schema.instance.getCFMetaData(command.getKeyspace(), command.getColumnFamilyName());
-                endpoints = consistency_level.filterForQuery(table, endpoints, cfm.newReadRepairDecision());
+
+                ReadRepairDecision rrDecision = cfm.newReadRepairDecision();
+                endpoints = consistency_level.filterForQuery(table, endpoints, rrDecision);
+                
+                if (rrDecision != ReadRepairDecision.NONE) {
+                    ReadRepairMetrics.attempted.mark();
+                }
 
                 RowDigestResolver resolver = new RowDigestResolver(command.table, command.key);
                 ReadCallback<ReadResponse, Row> handler = new ReadCallback(resolver, consistency_level, command, endpoints);
@@ -960,6 +968,9 @@ public class StorageProxy implements StorageProxyMBean
                 catch (DigestMismatchException ex)
                 {
                     logger.debug("Digest mismatch: {}", ex.toString());
+                    
+                    ReadRepairMetrics.repairedBlocking.mark();
+                    
                     // Do a full data read to resolve the correct response (and repair node that need be)
                     RowDataResolver resolver = new RowDataResolver(command.table, command.key, command.filter());
                     ReadCallback<ReadResponse, Row> repairHandler = handler.withNewResolver(resolver);
@@ -1702,4 +1713,16 @@ public class StorageProxy implements StorageProxyMBean
 
     public Long getTruncateRpcTimeout() { return DatabaseDescriptor.getTruncateRpcTimeout(); }
     public void setTruncateRpcTimeout(Long timeoutInMillis) { DatabaseDescriptor.setTruncateRpcTimeout(timeoutInMillis); }
+    
+    public long getReadRepairAttempted() {
+        return ReadRepairMetrics.attempted.count();
+    }
+    
+    public long getReadRepairRepairedBlocking() {
+        return ReadRepairMetrics.repairedBlocking.count();
+    }
+    
+    public long getReadRepairRepairedBackground() {
+        return ReadRepairMetrics.repairedBackground.count();
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e301c38d/src/java/org/apache/cassandra/service/StorageProxyMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxyMBean.java b/src/java/org/apache/cassandra/service/StorageProxyMBean.java
index 1bbfd1f..e8f5b4a 100644
--- a/src/java/org/apache/cassandra/service/StorageProxyMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageProxyMBean.java
@@ -17,6 +17,8 @@
  */
 package org.apache.cassandra.service;
 
+import org.apache.cassandra.metrics.ReadRepairMetrics;
+
 public interface StorageProxyMBean
 {
     /**
@@ -86,4 +88,8 @@ public interface StorageProxyMBean
     public void setRangeRpcTimeout(Long timeoutInMillis);
     public Long getTruncateRpcTimeout();
     public void setTruncateRpcTimeout(Long timeoutInMillis);
+    
+    public long getReadRepairAttempted();
+    public long getReadRepairRepairedBlocking();
+    public long getReadRepairRepairedBackground();
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e301c38d/src/java/org/apache/cassandra/tools/NodeCmd.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeCmd.java b/src/java/org/apache/cassandra/tools/NodeCmd.java
index bca0fcd..1cb30ad 100644
--- a/src/java/org/apache/cassandra/tools/NodeCmd.java
+++ b/src/java/org/apache/cassandra/tools/NodeCmd.java
@@ -669,6 +669,8 @@ public class NodeCmd
                 outs.printf("   Error retrieving file data for %s%n", host);
             }
         }
+        
+        outs.printf("Read Repair Statistics:%nAttempted: %d%nMismatch (Blocking): %d%nMismatch (Background): %d%n", probe.getReadRepairAttempted(), probe.getReadRepairRepairedBlocking(), probe.getReadRepairRepairedBackground());
 
         MessagingServiceMBean ms = probe.msProxy;
         outs.printf("%-25s", "Pool Name");

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e301c38d/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java
index 5db8f1c..dbcb66e 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -825,6 +825,21 @@ public class NodeProbe
     {
         return failed;
     }
+    
+    public long getReadRepairAttempted()
+    {
+        return spProxy.getReadRepairAttempted();
+    }
+    
+    public long getReadRepairRepairedBlocking()
+    {
+        return spProxy.getReadRepairRepairedBlocking();
+    }
+    
+    public long getReadRepairRepairedBackground()
+    {
+        return spProxy.getReadRepairRepairedBackground();
+    }
 }
 
 class ColumnFamilyStoreMBeanIterator implements Iterator<Map.Entry<String, ColumnFamilyStoreMBean>>


[3/3] git commit: More detailed read repair metrics. Patch by Jingsi Zhu, reviewed by brandonwilliams for CASSANDRA-5618

Posted by br...@apache.org.
More detailed read repair metrics.
Patch by Jingsi Zhu, reviewed by brandonwilliams for CASSANDRA-5618


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b4b30cf8
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b4b30cf8
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b4b30cf8

Branch: refs/heads/trunk
Commit: b4b30cf87cbf3f7957c64f625f0f90d24b43b49c
Parents: 6193fef
Author: Brandon Williams <br...@apache.org>
Authored: Wed Jun 5 21:41:54 2013 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Wed Jun 5 21:46:27 2013 -0500

----------------------------------------------------------------------
 .../cassandra/service/AbstractReadExecutor.java    |   11 ++++++++++-
 .../org/apache/cassandra/service/StorageProxy.java |    5 ++++-
 .../cassandra/service/StorageProxyMBean.java       |    2 +-
 3 files changed, 15 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/b4b30cf8/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
index 1c7c6f9..f944507 100644
--- a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
+++ b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
@@ -25,6 +25,7 @@ import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.config.ReadRepairDecision;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.ReadCommand;
@@ -33,6 +34,7 @@ import org.apache.cassandra.db.Row;
 import org.apache.cassandra.db.Table;
 import org.apache.cassandra.exceptions.ReadTimeoutException;
 import org.apache.cassandra.exceptions.UnavailableException;
+import org.apache.cassandra.metrics.ReadRepairMetrics;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.StorageProxy.LocalReadRunnable;
@@ -125,7 +127,14 @@ public abstract class AbstractReadExecutor
         Table table = Table.open(command.table);
         List<InetAddress> allReplicas = StorageProxy.getLiveSortedEndpoints(table, command.key);
         CFMetaData metaData = Schema.instance.getCFMetaData(command.table, command.cfName);
-        List<InetAddress> queryTargets = consistency_level.filterForQuery(table, allReplicas, metaData.newReadRepairDecision());
+
+        ReadRepairDecision rrDecision = metaData.newReadRepairDecision();
+         
+        if (rrDecision != ReadRepairDecision.NONE) {
+            ReadRepairMetrics.attempted.mark();
+        }
+
+        List<InetAddress> queryTargets = consistency_level.filterForQuery(table, allReplicas, rrDecision);
 
         if (StorageService.instance.isClientMode())
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b4b30cf8/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 852e8ca..62e4d86 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -1247,7 +1247,6 @@ public class StorageProxy implements StorageProxyMBean
                     logger.trace("Digest mismatch: {}", ex);
                     
                     ReadRepairMetrics.repairedBlocking.mark();
-                    
                     // Do a full data read to resolve the correct response (and repair node that need be)
                     RowDataResolver resolver = new RowDataResolver(exec.command.table, exec.command.key, exec.command.filter());
                     ReadCallback<ReadResponse, Row> repairHandler = exec.handler.withNewResolver(resolver);
@@ -1991,6 +1990,10 @@ public class StorageProxy implements StorageProxyMBean
     public Long getTruncateRpcTimeout() { return DatabaseDescriptor.getTruncateRpcTimeout(); }
     public void setTruncateRpcTimeout(Long timeoutInMillis) { DatabaseDescriptor.setTruncateRpcTimeout(timeoutInMillis); }
     public void reloadTriggerClass() { TriggerExecutor.instance.reloadClasses(); }
+<<<<<<< HEAD
+=======
+
+>>>>>>> More detailed read repair metrics.
     
     public long getReadRepairAttempted() {
         return ReadRepairMetrics.attempted.count();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b4b30cf8/src/java/org/apache/cassandra/service/StorageProxyMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxyMBean.java b/src/java/org/apache/cassandra/service/StorageProxyMBean.java
index 3cb4f17..32fa1be 100644
--- a/src/java/org/apache/cassandra/service/StorageProxyMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageProxyMBean.java
@@ -92,7 +92,7 @@ public interface StorageProxyMBean
     public void setTruncateRpcTimeout(Long timeoutInMillis);
 
     public void reloadTriggerClass();
-    
+
     public long getReadRepairAttempted();
     public long getReadRepairRepairedBlocking();
     public long getReadRepairRepairedBackground();


[2/3] git commit: More detailed read repair metrics. Patch by Jingsi Zhu, reviewed by brandonwilliams for CASSANDRA-5618

Posted by br...@apache.org.
More detailed read repair metrics.
Patch by Jingsi Zhu, reviewed by brandonwilliams for CASSANDRA-5618


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6193fefb
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6193fefb
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6193fefb

Branch: refs/heads/trunk
Commit: 6193fefbf7d92c0b909dccf5a2519b9266983091
Parents: c133289
Author: Brandon Williams <br...@apache.org>
Authored: Wed Jun 5 15:11:56 2013 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Wed Jun 5 21:44:44 2013 -0500

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 .../cassandra/metrics/ReadRepairMetrics.java       |   25 +++++
 .../org/apache/cassandra/service/ReadCallback.java |    5 +-
 .../org/apache/cassandra/service/StorageProxy.java |   72 +++++++++++++++
 .../cassandra/service/StorageProxyMBean.java       |    6 +
 src/java/org/apache/cassandra/tools/NodeCmd.java   |    2 +
 src/java/org/apache/cassandra/tools/NodeProbe.java |   15 +++
 7 files changed, 125 insertions(+), 1 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/6193fefb/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 04122b9..4b84bdf 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -78,6 +78,7 @@
  * cqlsh: fix COPY FROM with ReversedType (CASSANDRA-5610)
  * Allow creating CUSTOM indexes on collections (CASSANDRA-5615)
  * Evaluate now() function at execution time (CASSANDRA-5616)
+ * Expose detailed read repair metrics (CASSANDRA-5618)
 Merged from 1.1:
  * Remove buggy thrift max message length option (CASSANDRA-5529)
  * Fix NPE in Pig's widerow mode (CASSANDRA-5488)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6193fefb/src/java/org/apache/cassandra/metrics/ReadRepairMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/ReadRepairMetrics.java b/src/java/org/apache/cassandra/metrics/ReadRepairMetrics.java
new file mode 100644
index 0000000..3f48fee
--- /dev/null
+++ b/src/java/org/apache/cassandra/metrics/ReadRepairMetrics.java
@@ -0,0 +1,25 @@
+package org.apache.cassandra.metrics;
+
+import java.util.concurrent.TimeUnit;
+
+import com.yammer.metrics.Metrics;
+import com.yammer.metrics.core.Counter;
+import com.yammer.metrics.core.Gauge;
+import com.yammer.metrics.core.Meter;
+import com.yammer.metrics.core.MetricName;
+import com.yammer.metrics.util.RatioGauge;
+
+/**
+ * Metrics related to Read Repair.
+ */
+public class ReadRepairMetrics {
+    public static final String GROUP_NAME = "org.apache.cassandra.metrics";
+    public static final String TYPE_NAME = "ReadRepair";
+    
+    public static final Meter repairedBlocking =
+            Metrics.newMeter(new MetricName(GROUP_NAME, TYPE_NAME, "RepairedBlocking"), "RepairedBlocking", TimeUnit.SECONDS);
+    public static final Meter repairedBackground =
+            Metrics.newMeter(new MetricName(GROUP_NAME, TYPE_NAME, "RepairedBackground"), "RepairedBackground", TimeUnit.SECONDS);
+    public static final Meter attempted = 
+            Metrics.newMeter(new MetricName(GROUP_NAME, TYPE_NAME, "Attempted"), "Attempted", TimeUnit.SECONDS);
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6193fefb/src/java/org/apache/cassandra/service/ReadCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ReadCallback.java b/src/java/org/apache/cassandra/service/ReadCallback.java
index 79e15b1..fe7f4d7 100644
--- a/src/java/org/apache/cassandra/service/ReadCallback.java
+++ b/src/java/org/apache/cassandra/service/ReadCallback.java
@@ -34,6 +34,7 @@ import org.apache.cassandra.db.ReadCommand;
 import org.apache.cassandra.db.Table;
 import org.apache.cassandra.exceptions.ReadTimeoutException;
 import org.apache.cassandra.exceptions.UnavailableException;
+import org.apache.cassandra.metrics.ReadRepairMetrics;
 import org.apache.cassandra.net.IAsyncCallback;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessageOut;
@@ -181,7 +182,9 @@ public class ReadCallback<TMessage, TResolved> implements IAsyncCallback<TMessag
 
                 if (logger.isDebugEnabled())
                     logger.debug("Digest mismatch:", e);
-
+                
+                ReadRepairMetrics.repairedBackground.mark();
+                
                 ReadCommand readCommand = (ReadCommand) command;
                 final RowDataResolver repairResolver = new RowDataResolver(readCommand.table, readCommand.key, readCommand.filter());
                 AsyncRepairCallback repairHandler = new AsyncRepairCallback(repairResolver, endpoints.size());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6193fefb/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 3da923b..852e8ca 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -39,6 +39,7 @@ import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.ReadRepairDecision;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.Table;
@@ -59,6 +60,7 @@ import org.apache.cassandra.locator.AbstractReplicationStrategy;
 import org.apache.cassandra.locator.IEndpointSnitch;
 import org.apache.cassandra.locator.TokenMetadata;
 import org.apache.cassandra.metrics.ClientRequestMetrics;
+import org.apache.cassandra.metrics.ReadRepairMetrics;
 import org.apache.cassandra.net.*;
 import org.apache.cassandra.service.paxos.*;
 import org.apache.cassandra.tracing.Tracing;
@@ -1161,9 +1163,64 @@ public class StorageProxy implements StorageProxyMBean
                 assert !command.isDigestQuery();
                 logger.trace("Command/ConsistencyLevel is {}/{}", command, consistency_level);
 
+<<<<<<< HEAD
                 AbstractReadExecutor exec = AbstractReadExecutor.getReadExecutor(command, consistency_level);
                 exec.executeAsync();
                 readExecutors[i] = exec;
+=======
+                List<InetAddress> endpoints = getLiveSortedEndpoints(table, command.key);
+                CFMetaData cfm = Schema.instance.getCFMetaData(command.getKeyspace(), command.getColumnFamilyName());
+
+                ReadRepairDecision rrDecision = cfm.newReadRepairDecision();
+                endpoints = consistency_level.filterForQuery(table, endpoints, rrDecision);
+                
+                if (rrDecision != ReadRepairDecision.NONE) {
+                    ReadRepairMetrics.attempted.mark();
+                }
+
+                RowDigestResolver resolver = new RowDigestResolver(command.table, command.key);
+                ReadCallback<ReadResponse, Row> handler = new ReadCallback(resolver, consistency_level, command, endpoints);
+                handler.assureSufficientLiveNodes();
+                assert !endpoints.isEmpty();
+                readCallbacks[i] = handler;
+
+                // The data-request message is sent to dataPoint, the node that will actually get the data for us
+                InetAddress dataPoint = endpoints.get(0);
+                if (dataPoint.equals(FBUtilities.getBroadcastAddress()) && OPTIMIZE_LOCAL_REQUESTS)
+                {
+                    logger.trace("reading data locally");
+                    StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(command, handler));
+                }
+                else
+                {
+                    logger.trace("reading data from {}", dataPoint);
+                    MessagingService.instance().sendRR(command.createMessage(), dataPoint, handler);
+                }
+
+                if (endpoints.size() == 1)
+                    continue;
+
+                // send the other endpoints a digest request
+                ReadCommand digestCommand = command.copy();
+                digestCommand.setDigestQuery(true);
+                MessageOut message = null;
+                for (InetAddress digestPoint : endpoints.subList(1, endpoints.size()))
+                {
+                    if (digestPoint.equals(FBUtilities.getBroadcastAddress()) && OPTIMIZE_LOCAL_REQUESTS)
+                    {
+                        logger.trace("reading digest locally");
+                        StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(digestCommand, handler));
+                    }
+                    else
+                    {
+                        logger.trace("reading digest from {}", digestPoint);
+                        // (We lazy-construct the digest Message object since it may not be necessary if we
+                        // are doing a local digest read, or no digest reads at all.)
+                        if (message == null)
+                            message = digestCommand.createMessage();
+                        MessagingService.instance().sendRR(message, digestPoint, handler);
+                    }
+                }
             }
 
             for (AbstractReadExecutor exec: readExecutors)
@@ -1188,6 +1245,9 @@ public class StorageProxy implements StorageProxyMBean
                 catch (DigestMismatchException ex)
                 {
                     logger.trace("Digest mismatch: {}", ex);
+                    
+                    ReadRepairMetrics.repairedBlocking.mark();
+                    
                     // Do a full data read to resolve the correct response (and repair node that need be)
                     RowDataResolver resolver = new RowDataResolver(exec.command.table, exec.command.key, exec.command.filter());
                     ReadCallback<ReadResponse, Row> repairHandler = exec.handler.withNewResolver(resolver);
@@ -1931,4 +1991,16 @@ public class StorageProxy implements StorageProxyMBean
     public Long getTruncateRpcTimeout() { return DatabaseDescriptor.getTruncateRpcTimeout(); }
     public void setTruncateRpcTimeout(Long timeoutInMillis) { DatabaseDescriptor.setTruncateRpcTimeout(timeoutInMillis); }
     public void reloadTriggerClass() { TriggerExecutor.instance.reloadClasses(); }
+    
+    public long getReadRepairAttempted() {
+        return ReadRepairMetrics.attempted.count();
+    }
+    
+    public long getReadRepairRepairedBlocking() {
+        return ReadRepairMetrics.repairedBlocking.count();
+    }
+    
+    public long getReadRepairRepairedBackground() {
+        return ReadRepairMetrics.repairedBackground.count();
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6193fefb/src/java/org/apache/cassandra/service/StorageProxyMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxyMBean.java b/src/java/org/apache/cassandra/service/StorageProxyMBean.java
index f482fc7..3cb4f17 100644
--- a/src/java/org/apache/cassandra/service/StorageProxyMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageProxyMBean.java
@@ -17,6 +17,8 @@
  */
 package org.apache.cassandra.service;
 
+import org.apache.cassandra.metrics.ReadRepairMetrics;
+
 public interface StorageProxyMBean
 {
     /**
@@ -90,4 +92,8 @@ public interface StorageProxyMBean
     public void setTruncateRpcTimeout(Long timeoutInMillis);
 
     public void reloadTriggerClass();
+    
+    public long getReadRepairAttempted();
+    public long getReadRepairRepairedBlocking();
+    public long getReadRepairRepairedBackground();
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6193fefb/src/java/org/apache/cassandra/tools/NodeCmd.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeCmd.java b/src/java/org/apache/cassandra/tools/NodeCmd.java
index ca81857..9f524fc 100644
--- a/src/java/org/apache/cassandra/tools/NodeCmd.java
+++ b/src/java/org/apache/cassandra/tools/NodeCmd.java
@@ -665,6 +665,8 @@ public class NodeCmd
                 outs.printf("   Error retrieving file data for %s%n", host);
             }
         }
+        
+        outs.printf("Read Repair Statistics:%nAttempted: %d%nMismatch (Blocking): %d%nMismatch (Background): %d%n", probe.getReadRepairAttempted(), probe.getReadRepairRepairedBlocking(), probe.getReadRepairRepairedBackground());
 
         MessagingServiceMBean ms = probe.msProxy;
         outs.printf("%-25s", "Pool Name");

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6193fefb/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java
index 31df3b9..af94d14 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -828,6 +828,21 @@ public class NodeProbe
     {
         return failed;
     }
+    
+    public long getReadRepairAttempted()
+    {
+        return spProxy.getReadRepairAttempted();
+    }
+    
+    public long getReadRepairRepairedBlocking()
+    {
+        return spProxy.getReadRepairRepairedBlocking();
+    }
+    
+    public long getReadRepairRepairedBackground()
+    {
+        return spProxy.getReadRepairRepairedBackground();
+    }
 }
 
 class ColumnFamilyStoreMBeanIterator implements Iterator<Map.Entry<String, ColumnFamilyStoreMBean>>