You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/02/16 21:12:39 UTC
svn commit: r1071380 - in /cassandra/trunk: ./ contrib/
interface/thrift/gen-java/org/apache/cassandra/thrift/
src/java/org/apache/cassandra/concurrent/ src/java/org/apache/cassandra/db/
src/java/org/apache/cassandra/dht/ src/java/org/apache/cassandra/...
Author: jbellis
Date: Wed Feb 16 20:12:37 2011
New Revision: 1071380
URL: http://svn.apache.org/viewvc?rev=1071380&view=rev
Log:
merge from 0.7
Added:
cassandra/trunk/src/java/org/apache/cassandra/service/AbstractRowResolver.java
cassandra/trunk/src/java/org/apache/cassandra/service/AsyncRepairCallback.java
cassandra/trunk/src/java/org/apache/cassandra/service/IReadCommand.java
cassandra/trunk/src/java/org/apache/cassandra/service/RowDigestResolver.java
cassandra/trunk/src/java/org/apache/cassandra/service/RowRepairResolver.java
cassandra/trunk/test/unit/org/apache/cassandra/service/RowResolverTest.java
Removed:
cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java
cassandra/trunk/test/unit/org/apache/cassandra/service/ReadResponseResolverTest.java
Modified:
cassandra/trunk/ (props changed)
cassandra/trunk/CHANGES.txt
cassandra/trunk/contrib/ (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java (props changed)
cassandra/trunk/src/java/org/apache/cassandra/concurrent/Stage.java
cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java
cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceCommand.java
cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java
cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
cassandra/trunk/src/java/org/apache/cassandra/net/AsyncResult.java
cassandra/trunk/src/java/org/apache/cassandra/net/IMessageCallback.java
cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterReadCallback.java
cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
cassandra/trunk/src/java/org/apache/cassandra/service/ReadCallback.java
cassandra/trunk/src/java/org/apache/cassandra/service/RepairCallback.java
cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
cassandra/trunk/src/java/org/apache/cassandra/service/TruncateResponseHandler.java
cassandra/trunk/src/java/org/apache/cassandra/service/WriteResponseHandler.java
cassandra/trunk/test/unit/org/apache/cassandra/service/ConsistencyLevelTest.java
Propchange: cassandra/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Feb 16 20:12:37 2011
@@ -1,5 +1,5 @@
/cassandra/branches/cassandra-0.6:922689-1052356,1052358-1053452,1053454,1053456-1068009,1068978
-/cassandra/branches/cassandra-0.7:1026516-1070530
+/cassandra/branches/cassandra-0.7:1026516-1070530,1070977
/cassandra/branches/cassandra-0.7.0:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3:774578-796573
Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1071380&r1=1071379&r2=1071380&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Wed Feb 16 20:12:37 2011
@@ -14,6 +14,7 @@
* copy DecoratedKey.key when inserting into caches to avoid retaining
a reference to the underlying buffer (CASSANDRA-2102)
* format subcolumn names with subcomparator (CASSANDRA-2136)
+ * lower-latency read repair (CASSANDRA-2069)
0.7.1
Propchange: cassandra/trunk/contrib/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Feb 16 20:12:37 2011
@@ -1,5 +1,5 @@
/cassandra/branches/cassandra-0.6/contrib:922689-1052356,1052358-1053452,1053454,1053456-1068009
-/cassandra/branches/cassandra-0.7/contrib:1026516-1070530
+/cassandra/branches/cassandra-0.7/contrib:1026516-1070530,1070977
/cassandra/branches/cassandra-0.7.0/contrib:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3/contrib:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/contrib:774578-796573
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Feb 16 20:12:37 2011
@@ -1,5 +1,5 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1052356,1052358-1053452,1053454,1053456-1068009,1068978
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1070530
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1070530,1070977
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/Cassandra.java:774578-796573
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Feb 16 20:12:37 2011
@@ -1,5 +1,5 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1052356,1052358-1053452,1053454,1053456-1068009,1068978
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1070530
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1070530,1070977
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/column_t.java:774578-792198
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Feb 16 20:12:37 2011
@@ -1,5 +1,5 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1052356,1052358-1053452,1053454,1053456-1068009,1068978
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1070530
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1070530,1070977
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:774578-796573
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Feb 16 20:12:37 2011
@@ -1,5 +1,5 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1052356,1052358-1053452,1053454,1053456-1068009,1068978
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1070530
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1070530,1070977
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:774578-796573
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Feb 16 20:12:37 2011
@@ -1,5 +1,5 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1052356,1052358-1053452,1053454,1053456-1068009,1068978
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1070530
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1070530,1070977
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/superColumn_t.java:774578-792198
Modified: cassandra/trunk/src/java/org/apache/cassandra/concurrent/Stage.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/concurrent/Stage.java?rev=1071380&r1=1071379&r2=1071380&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/concurrent/Stage.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/concurrent/Stage.java Wed Feb 16 20:12:37 2011
@@ -32,6 +32,7 @@ public enum Stage
MIGRATION,
MISC,
INTERNAL_RESPONSE,
+ READ_REPAIR,
REPLICATE_ON_WRITE;
public String getJmxType()
@@ -49,6 +50,7 @@ public enum Stage
case READ:
case REQUEST_RESPONSE:
case REPLICATE_ON_WRITE:
+ case READ_REPAIR:
return "request";
default:
throw new AssertionError("Unknown stage " + this);
Modified: cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java?rev=1071380&r1=1071379&r2=1071380&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java Wed Feb 16 20:12:37 2011
@@ -50,6 +50,7 @@ public class StageManager
stages.put(Stage.ANTI_ENTROPY, new JMXEnabledThreadPoolExecutor(Stage.ANTI_ENTROPY));
stages.put(Stage.MIGRATION, new JMXEnabledThreadPoolExecutor(Stage.MIGRATION));
stages.put(Stage.MISC, new JMXEnabledThreadPoolExecutor(Stage.MISC));
+ stages.put(Stage.READ_REPAIR, multiThreadedStage(Stage.READ_REPAIR, Runtime.getRuntime().availableProcessors()));
}
private static ThreadPoolExecutor multiThreadedStage(Stage stage, int numThreads)
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceCommand.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceCommand.java?rev=1071380&r1=1071379&r2=1071380&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceCommand.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceCommand.java Wed Feb 16 20:12:37 2011
@@ -48,6 +48,7 @@ import org.apache.cassandra.io.ICompactS
import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessageProducer;
+import org.apache.cassandra.service.IReadCommand;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.thrift.ColumnParent;
import org.apache.cassandra.thrift.SlicePredicate;
@@ -57,7 +58,7 @@ import org.apache.thrift.TDeserializer;
import org.apache.thrift.TSerializer;
import org.apache.cassandra.thrift.TBinaryProtocol;
-public class RangeSliceCommand implements MessageProducer
+public class RangeSliceCommand implements MessageProducer, IReadCommand
{
private static final RangeSliceCommandSerializer serializer = new RangeSliceCommandSerializer();
@@ -114,6 +115,11 @@ public class RangeSliceCommand implement
ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
return serializer.deserialize(new DataInputStream(bis), message.getVersion());
}
+
+ public String getKeyspace()
+ {
+ return keyspace;
+ }
}
class RangeSliceCommandSerializer implements ICompactSerializer<RangeSliceCommand>
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java?rev=1071380&r1=1071379&r2=1071380&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java Wed Feb 16 20:12:37 2011
@@ -31,11 +31,12 @@ import org.apache.cassandra.db.marshal.A
import org.apache.cassandra.io.ICompactSerializer;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessageProducer;
+import org.apache.cassandra.service.IReadCommand;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
-public abstract class ReadCommand implements MessageProducer
+public abstract class ReadCommand implements MessageProducer, IReadCommand
{
public static final byte CMD_TYPE_GET_SLICE_BY_NAMES = 1;
public static final byte CMD_TYPE_GET_SLICE = 2;
@@ -92,6 +93,11 @@ public abstract class ReadCommand implem
{
return ColumnFamily.getComparatorFor(table, getColumnFamilyName(), queryPath.superColumnName);
}
+
+ public String getKeyspace()
+ {
+ return table;
+ }
}
class ReadCommandSerializer implements ICompactSerializer<ReadCommand>
Modified: cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java?rev=1071380&r1=1071379&r2=1071380&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java Wed Feb 16 20:12:37 2011
@@ -287,5 +287,10 @@ public class BootStrapper
token = StorageService.getPartitioner().getTokenFactory().fromString(new String(msg.getMessageBody(), Charsets.UTF_8));
condition.signalAll();
}
+
+ public boolean isLatencyForSnitch()
+ {
+ return false;
+ }
}
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/net/AsyncResult.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/AsyncResult.java?rev=1071380&r1=1071379&r2=1071380&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/AsyncResult.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/AsyncResult.java Wed Feb 16 20:12:37 2011
@@ -96,6 +96,11 @@ class AsyncResult implements IAsyncResul
}
}
+ public boolean isLatencyForSnitch()
+ {
+ return false;
+ }
+
public InetAddress getFrom()
{
return from;
Modified: cassandra/trunk/src/java/org/apache/cassandra/net/IMessageCallback.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/IMessageCallback.java?rev=1071380&r1=1071379&r2=1071380&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/IMessageCallback.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/IMessageCallback.java Wed Feb 16 20:12:37 2011
@@ -23,4 +23,9 @@ package org.apache.cassandra.net;
public interface IMessageCallback
{
+ /**
+ * @return true if this callback is on the read path and its latency should be
+ * given as input to the dynamic snitch.
+ */
+ public boolean isLatencyForSnitch();
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=1071380&r1=1071379&r2=1071380&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Wed Feb 16 20:12:37 2011
@@ -143,7 +143,7 @@ public final class MessagingService impl
*/
public void maybeAddLatency(IMessageCallback cb, InetAddress address, double latency)
{
- if (cb instanceof ReadCallback || cb instanceof AsyncResult)
+ if (cb.isLatencyForSnitch())
addLatency(address, latency);
}
Added: cassandra/trunk/src/java/org/apache/cassandra/service/AbstractRowResolver.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/AbstractRowResolver.java?rev=1071380&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/AbstractRowResolver.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/AbstractRowResolver.java Wed Feb 16 20:12:37 2011
@@ -0,0 +1,70 @@
+package org.apache.cassandra.service;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOError;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.commons.lang.ArrayUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.ReadResponse;
+import org.apache.cassandra.db.Row;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.utils.FBUtilities;
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
+
+public abstract class AbstractRowResolver implements IResponseResolver<Row>
+{
+ protected static Logger logger = LoggerFactory.getLogger(AbstractRowResolver.class);
+
+ private static final Message FAKE_MESSAGE = new Message(FBUtilities.getLocalAddress(), StorageService.Verb.INTERNAL_RESPONSE, ArrayUtils.EMPTY_BYTE_ARRAY, -1);
+
+ protected final String table;
+ protected final ConcurrentMap<Message, ReadResponse> replies = new NonBlockingHashMap<Message, ReadResponse>();
+ protected final DecoratedKey key;
+
+ public AbstractRowResolver(ByteBuffer key, String table)
+ {
+ this.key = StorageService.getPartitioner().decorateKey(key);
+ this.table = table;
+ }
+
+ public void preprocess(Message message)
+ {
+ byte[] body = message.getMessageBody();
+ ByteArrayInputStream bufIn = new ByteArrayInputStream(body);
+ try
+ {
+ ReadResponse result = ReadResponse.serializer().deserialize(new DataInputStream(bufIn), message.getVersion());
+ if (logger.isDebugEnabled())
+ logger.debug("Preprocessed {} response", result.isDigestQuery() ? "digest" : "data");
+ replies.put(message, result);
+ }
+ catch (IOException e)
+ {
+ throw new IOError(e);
+ }
+ }
+
+ /** hack so local reads don't force de/serialization of an extra real Message */
+ public void injectPreProcessed(ReadResponse result)
+ {
+ assert replies.get(FAKE_MESSAGE) == null; // should only be one local reply
+ replies.put(FAKE_MESSAGE, result);
+ }
+
+ public Iterable<Message> getMessages()
+ {
+ return replies.keySet();
+ }
+
+ public int getMessageCount()
+ {
+ return replies.size();
+ }
+}
Added: cassandra/trunk/src/java/org/apache/cassandra/service/AsyncRepairCallback.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/AsyncRepairCallback.java?rev=1071380&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/AsyncRepairCallback.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/AsyncRepairCallback.java Wed Feb 16 20:12:37 2011
@@ -0,0 +1,41 @@
+package org.apache.cassandra.service;
+
+import java.io.IOException;
+
+import org.apache.cassandra.concurrent.Stage;
+import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.net.IAsyncCallback;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.utils.WrappedRunnable;
+
+public class AsyncRepairCallback implements IAsyncCallback
+{
+ private final RowRepairResolver repairResolver;
+ private final int count;
+
+ public AsyncRepairCallback(RowRepairResolver repairResolver, int count)
+ {
+ this.repairResolver = repairResolver;
+ this.count = count;
+ }
+
+ public void response(Message message)
+ {
+ repairResolver.preprocess(message);
+ if (repairResolver.getMessageCount() == count)
+ {
+ StageManager.getStage(Stage.READ_REPAIR).execute(new WrappedRunnable()
+ {
+ protected void runMayThrow() throws DigestMismatchException, IOException
+ {
+ repairResolver.resolve();
+ }
+ });
+ }
+ }
+
+ public boolean isLatencyForSnitch()
+ {
+ return true;
+ }
+}
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterReadCallback.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterReadCallback.java?rev=1071380&r1=1071379&r2=1071380&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterReadCallback.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterReadCallback.java Wed Feb 16 20:12:37 2011
@@ -22,12 +22,12 @@ package org.apache.cassandra.service;
import java.net.InetAddress;
-import java.util.Collection;
+import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.Table;
import org.apache.cassandra.db.ReadResponse;
+import org.apache.cassandra.db.Table;
import org.apache.cassandra.locator.IEndpointSnitch;
import org.apache.cassandra.locator.NetworkTopologyStrategy;
import org.apache.cassandra.net.Message;
@@ -44,12 +44,12 @@ public class DatacenterReadCallback<T> e
private static final String localdc = snitch.getDatacenter(FBUtilities.getLocalAddress());
private AtomicInteger localResponses;
- public DatacenterReadCallback(IResponseResolver<T> resolver, ConsistencyLevel consistencyLevel, String table)
+ public DatacenterReadCallback(IResponseResolver resolver, ConsistencyLevel consistencyLevel, IReadCommand command, List<InetAddress> endpoints)
{
- super(resolver, consistencyLevel, table);
+ super(resolver, consistencyLevel, command, endpoints);
localResponses = new AtomicInteger(blockfor);
}
-
+
@Override
public void response(Message message)
{
@@ -68,14 +68,15 @@ public class DatacenterReadCallback<T> e
@Override
public void response(ReadResponse result)
{
- ((ReadResponseResolver) resolver).injectPreProcessed(result);
+ ((RowDigestResolver) resolver).injectPreProcessed(result);
int n = localResponses.decrementAndGet();
-
if (n == 0 && resolver.isDataPresent())
{
condition.signal();
}
+
+ maybeResolveForRepair();
}
@Override
@@ -86,7 +87,7 @@ public class DatacenterReadCallback<T> e
}
@Override
- public void assureSufficientLiveNodes(Collection<InetAddress> endpoints) throws UnavailableException
+ public void assureSufficientLiveNodes() throws UnavailableException
{
int localEndpoints = 0;
for (InetAddress endpoint : endpoints)
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java?rev=1071380&r1=1071379&r2=1071380&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java Wed Feb 16 20:12:37 2011
@@ -115,4 +115,9 @@ public class DatacenterSyncWriteResponse
throw new UnavailableException();
}
}
+
+ public boolean isLatencyForSnitch()
+ {
+ return false;
+ }
}
Added: cassandra/trunk/src/java/org/apache/cassandra/service/IReadCommand.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/IReadCommand.java?rev=1071380&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/IReadCommand.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/IReadCommand.java Wed Feb 16 20:12:37 2011
@@ -0,0 +1,6 @@
+package org.apache.cassandra.service;
+
+public interface IReadCommand
+{
+ public String getKeyspace();
+}
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java?rev=1071380&r1=1071379&r2=1071380&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java Wed Feb 16 20:12:37 2011
@@ -49,7 +49,6 @@ public class RangeSliceResponseResolver
public RangeSliceResponseResolver(String table, List<InetAddress> sources)
{
- assert sources.size() > 0;
this.sources = sources;
this.table = table;
}
@@ -103,8 +102,8 @@ public class RangeSliceResponseResolver
protected Row getReduced()
{
- ColumnFamily resolved = ReadResponseResolver.resolveSuperset(versions);
- ReadResponseResolver.maybeScheduleRepairs(resolved, table, key, versions, versionSources);
+ ColumnFamily resolved = RowRepairResolver.resolveSuperset(versions);
+ RowRepairResolver.maybeScheduleRepairs(resolved, table, key, versions, versionSources);
versions.clear();
versionSources.clear();
return new Row(key, resolved);
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/ReadCallback.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/ReadCallback.java?rev=1071380&r1=1071379&r2=1071380&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/ReadCallback.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/ReadCallback.java Wed Feb 16 20:12:37 2011
@@ -20,14 +20,20 @@ package org.apache.cassandra.service;
import java.io.IOException;
import java.net.InetAddress;
-import java.util.Collection;
+import java.util.List;
+import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.concurrent.Stage;
+import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ReadCommand;
import org.apache.cassandra.db.ReadResponse;
import org.apache.cassandra.db.Table;
import org.apache.cassandra.net.IAsyncCallback;
@@ -36,28 +42,61 @@ import org.apache.cassandra.net.Messagin
import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.cassandra.thrift.UnavailableException;
import org.apache.cassandra.utils.SimpleCondition;
+import org.apache.cassandra.utils.WrappedRunnable;
public class ReadCallback<T> implements IAsyncCallback
{
protected static final Logger logger = LoggerFactory.getLogger( ReadCallback.class );
+ private static final ThreadLocal<Random> random = new ThreadLocal<Random>()
+ {
+ @Override
+ protected Random initialValue()
+ {
+ return new Random();
+ }
+ };
+
public final IResponseResolver<T> resolver;
protected final SimpleCondition condition = new SimpleCondition();
private final long startTime;
protected final int blockfor;
-
+ final List<InetAddress> endpoints;
+ private final IReadCommand command;
+
/**
* Constructor when response count has to be calculated and blocked for.
*/
- public ReadCallback(IResponseResolver<T> resolver, ConsistencyLevel consistencyLevel, String table)
+ public ReadCallback(IResponseResolver<T> resolver, ConsistencyLevel consistencyLevel, IReadCommand command, List<InetAddress> endpoints)
{
- this.blockfor = determineBlockFor(consistencyLevel, table);
+ this.command = command;
+ this.blockfor = determineBlockFor(consistencyLevel, command.getKeyspace());
this.resolver = resolver;
this.startTime = System.currentTimeMillis();
-
- logger.debug("ReadCallback blocking for {} responses", blockfor);
+ boolean repair = randomlyReadRepair();
+ this.endpoints = repair || resolver instanceof RowRepairResolver
+ ? endpoints
+ : endpoints.subList(0, Math.min(endpoints.size(), blockfor)); // min so as to not throw exception until assureSufficient is called
+
+ if (logger.isDebugEnabled())
+ logger.debug(String.format("Blockfor/repair is %s/%s; setting up requests to %s",
+ blockfor, repair, StringUtils.join(this.endpoints, ",")));
}
+ private boolean randomlyReadRepair()
+ {
+ if (resolver instanceof RowDigestResolver)
+ {
+ assert command instanceof ReadCommand : command;
+ String table = ((RowDigestResolver) resolver).table;
+ String columnFamily = ((ReadCommand) command).getColumnFamilyName();
+ CFMetaData cfmd = DatabaseDescriptor.getTableMetaData(table).get(columnFamily);
+ return cfmd.getReadRepairChance() > random.get().nextDouble();
+ }
+ // we don't read repair on range scans
+ return false;
+ }
+
public T get() throws TimeoutException, DigestMismatchException, IOException
{
long timeout = DatabaseDescriptor.getRpcTimeout() - (System.currentTimeMillis() - startTime);
@@ -85,21 +124,42 @@ public class ReadCallback<T> implements
public void response(Message message)
{
resolver.preprocess(message);
+ assert resolver.getMessageCount() <= endpoints.size();
if (resolver.getMessageCount() < blockfor)
return;
if (resolver.isDataPresent())
+ {
condition.signal();
+ maybeResolveForRepair();
+ }
}
public void response(ReadResponse result)
{
- ((ReadResponseResolver) resolver).injectPreProcessed(result);
+ ((RowDigestResolver) resolver).injectPreProcessed(result);
+ assert resolver.getMessageCount() <= endpoints.size();
if (resolver.getMessageCount() < blockfor)
return;
if (resolver.isDataPresent())
+ {
condition.signal();
+ maybeResolveForRepair();
+ }
}
-
+
+ /**
+ * Check digests in the background on the Repair stage if we've received replies
+ * too all the requests we sent.
+ */
+ protected void maybeResolveForRepair()
+ {
+ if (blockfor < endpoints.size() && resolver.getMessageCount() == endpoints.size())
+ {
+ assert resolver.isDataPresent();
+ StageManager.getStage(Stage.READ_REPAIR).execute(new AsyncRepairRunner());
+ }
+ }
+
public int determineBlockFor(ConsistencyLevel consistencyLevel, String table)
{
switch (consistencyLevel)
@@ -116,9 +176,37 @@ public class ReadCallback<T> implements
}
}
- public void assureSufficientLiveNodes(Collection<InetAddress> endpoints) throws UnavailableException
+ public void assureSufficientLiveNodes() throws UnavailableException
{
if (endpoints.size() < blockfor)
throw new UnavailableException();
}
+
+ public boolean isLatencyForSnitch()
+ {
+ return true;
+ }
+
+ private class AsyncRepairRunner extends WrappedRunnable
+ {
+ protected void runMayThrow() throws IOException
+ {
+ try
+ {
+ resolver.resolve();
+ }
+ catch (DigestMismatchException e)
+ {
+ if (logger.isDebugEnabled())
+ logger.debug("Digest mismatch:", e);
+
+ ReadCommand readCommand = (ReadCommand) command;
+ final RowRepairResolver repairResolver = new RowRepairResolver(readCommand.table, readCommand.key);
+ IAsyncCallback repairHandler = new AsyncRepairCallback(repairResolver, endpoints.size());
+
+ for (InetAddress endpoint : endpoints)
+ MessagingService.instance().sendRR(readCommand, endpoint, repairHandler);
+ }
+ }
+ }
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/RepairCallback.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/RepairCallback.java?rev=1071380&r1=1071379&r2=1071380&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/RepairCallback.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/RepairCallback.java Wed Feb 16 20:12:37 2011
@@ -39,6 +39,14 @@ public class RepairCallback<T> implement
private final SimpleCondition condition = new SimpleCondition();
private final long startTime;
+ /**
+ * The main difference between this and ReadCallback is, ReadCallback has a ConsistencyLevel
+ * it needs to achieve. Repair on the other hand is happy to repair whoever replies within the timeout.
+ *
+ * (The other main difference of course is, this is only created once we know we have a digest
+ * mismatch, and we're going to do full-data reads from everyone -- that is, this is the final
+ * stage in the read process.)
+ */
public RepairCallback(IResponseResolver<T> resolver, List<InetAddress> endpoints)
{
this.resolver = resolver;
@@ -46,10 +54,6 @@ public class RepairCallback<T> implement
this.startTime = System.currentTimeMillis();
}
- /**
- * The main difference between this and ReadCallback is, ReadCallback has a ConsistencyLevel
- * it needs to achieve. Repair on the other hand is happy to repair whoever replies within the timeout.
- */
public T get() throws TimeoutException, DigestMismatchException, IOException
{
long timeout = DatabaseDescriptor.getRpcTimeout() - (System.currentTimeMillis() - startTime);
@@ -71,4 +75,9 @@ public class RepairCallback<T> implement
if (resolver.getMessageCount() == endpoints.size())
condition.signal();
}
+
+ public boolean isLatencyForSnitch()
+ {
+ return true;
+ }
}
Added: cassandra/trunk/src/java/org/apache/cassandra/service/RowDigestResolver.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/RowDigestResolver.java?rev=1071380&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/RowDigestResolver.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/RowDigestResolver.java Wed Feb 16 20:12:37 2011
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.ReadResponse;
+import org.apache.cassandra.db.Row;
+import org.apache.cassandra.net.Message;
+
+public class RowDigestResolver extends AbstractRowResolver
+{
+ public RowDigestResolver(String table, ByteBuffer key)
+ {
+ super(key, table);
+ }
+
+ public Row getData() throws IOException
+ {
+ for (Map.Entry<Message, ReadResponse> entry : replies.entrySet())
+ {
+ ReadResponse result = entry.getValue();
+ if (!result.isDigestQuery())
+ return result.row();
+ }
+
+ throw new AssertionError("getData should not be invoked when no data is present");
+ }
+
+ /*
+ * This method handles two different scenarios:
+ *
+ * 1a)we're handling the initial read, of data from the closest replica + digests
+ * from the rest. In this case we check the digests against each other,
+ * throw an exception if there is a mismatch, otherwise return the data row.
+ *
+ * 1b)we're checking additional digests that arrived after the minimum to handle
+ * the requested ConsistencyLevel, i.e. asynchronouse read repair check
+ */
+ public Row resolve() throws DigestMismatchException, IOException
+ {
+ if (logger.isDebugEnabled())
+ logger.debug("resolving " + replies.size() + " responses");
+
+ long startTime = System.currentTimeMillis();
+ ColumnFamily data = null;
+
+ // case 1: validate digests against each other; throw immediately on mismatch.
+ // also, collects data results into versions/endpoints lists.
+ //
+ // results are cleared as we process them, to avoid unnecessary duplication of work
+ // when resolve() is called a second time for read repair on responses that were not
+ // necessary to satisfy ConsistencyLevel.
+ ByteBuffer digest = null;
+ for (Map.Entry<Message, ReadResponse> entry : replies.entrySet())
+ {
+ ReadResponse response = entry.getValue();
+ if (response.isDigestQuery())
+ {
+ if (digest == null)
+ {
+ digest = response.digest();
+ }
+ else
+ {
+ ByteBuffer digest2 = response.digest();
+ if (!digest.equals(digest2))
+ throw new DigestMismatchException(key, digest, digest2);
+ }
+ }
+ else
+ {
+ data = response.row().cf;
+ }
+ }
+
+ // If there was a digest query compare it with all the data digests
+ // If there is a mismatch then throw an exception so that read repair can happen.
+ //
+ // It's important to note that we do not compare the digests of multiple data responses --
+ // if we are in that situation we know there was a previous mismatch and now we're doing a repair,
+ // so our job is now case 2: figure out what the most recent version is and update everyone to that version.
+ if (digest != null)
+ {
+ ByteBuffer digest2 = ColumnFamily.digest(data);
+ if (!digest.equals(digest2))
+ throw new DigestMismatchException(key, digest, digest2);
+ if (logger.isDebugEnabled())
+ logger.debug("digests verified");
+ }
+
+ if (logger.isDebugEnabled())
+ logger.debug("resolve: " + (System.currentTimeMillis() - startTime) + " ms.");
+ return new Row(key, data);
+ }
+
+ public boolean isDataPresent()
+ {
+ for (ReadResponse result : replies.values())
+ {
+ if (!result.isDigestQuery())
+ return true;
+ }
+ return false;
+ }
+}
Added: cassandra/trunk/src/java/org/apache/cassandra/service/RowRepairResolver.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/RowRepairResolver.java?rev=1071380&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/RowRepairResolver.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/RowRepairResolver.java Wed Feb 16 20:12:37 2011
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.io.IOError;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+
+public class RowRepairResolver extends AbstractRowResolver
+{
+ public RowRepairResolver(String table, ByteBuffer key)
+ {
+ super(key, table);
+ }
+
+ /*
+ * This method handles the following scenario:
+ *
+ * there was a mismatch on the initial read (1a or 1b), so we redid the digest requests
+ * as full data reads. In this case we need to compute the most recent version
+ * of each column, and send diffs to out-of-date replicas.
+ */
+ public Row resolve() throws DigestMismatchException, IOException
+ {
+ if (logger.isDebugEnabled())
+ logger.debug("resolving " + replies.size() + " responses");
+
+ long startTime = System.currentTimeMillis();
+ List<ColumnFamily> versions = new ArrayList<ColumnFamily>();
+ List<InetAddress> endpoints = new ArrayList<InetAddress>();
+
+ // case 1: validate digests against each other; throw immediately on mismatch.
+ // also, collects data results into versions/endpoints lists.
+ //
+ // results are cleared as we process them, to avoid unnecessary duplication of work
+ // when resolve() is called a second time for read repair on responses that were not
+ // necessary to satisfy ConsistencyLevel.
+ for (Map.Entry<Message, ReadResponse> entry : replies.entrySet())
+ {
+ Message message = entry.getKey();
+ ReadResponse response = entry.getValue();
+ assert !response.isDigestQuery();
+ versions.add(response.row().cf);
+ endpoints.add(message.getFrom());
+ }
+
+ ColumnFamily resolved;
+ if (versions.size() > 1)
+ {
+ resolved = resolveSuperset(versions);
+ if (logger.isDebugEnabled())
+ logger.debug("versions merged");
+ maybeScheduleRepairs(resolved, table, key, versions, endpoints);
+ }
+ else
+ {
+ resolved = versions.get(0);
+ }
+
+ if (logger.isDebugEnabled())
+ logger.debug("resolve: " + (System.currentTimeMillis() - startTime) + " ms.");
+ return new Row(key, resolved);
+ }
+
+ /**
+ * For each row version, compare with resolved (the superset of all row versions);
+ * if it is missing anything, send a mutation to the endpoint it come from.
+ */
+ public static void maybeScheduleRepairs(ColumnFamily resolved, String table, DecoratedKey key, List<ColumnFamily> versions, List<InetAddress> endpoints)
+ {
+ for (int i = 0; i < versions.size(); i++)
+ {
+ ColumnFamily diffCf = ColumnFamily.diff(versions.get(i), resolved);
+ if (diffCf == null) // no repair needs to happen
+ continue;
+
+ // create and send the row mutation message based on the diff
+ RowMutation rowMutation = new RowMutation(table, key.key);
+ rowMutation.add(diffCf);
+ Message repairMessage;
+ try
+ {
+ repairMessage = rowMutation.getMessage(Gossiper.instance.getVersion(endpoints.get(i)));
+ }
+ catch (IOException e)
+ {
+ throw new IOError(e);
+ }
+ MessagingService.instance().sendOneWay(repairMessage, endpoints.get(i));
+ }
+ }
+
+ static ColumnFamily resolveSuperset(List<ColumnFamily> versions)
+ {
+ assert versions.size() > 0;
+
+ ColumnFamily resolved = null;
+ for (ColumnFamily cf : versions)
+ {
+ if (cf != null)
+ {
+ resolved = cf.cloneMe();
+ break;
+ }
+ }
+ if (resolved == null)
+ return null;
+
+ for (ColumnFamily cf : versions)
+ resolved.resolve(cf);
+
+ return resolved;
+ }
+
+ public Row getData() throws IOException
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public boolean isDataPresent()
+ {
+ throw new UnsupportedOperationException();
+ }
+}
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1071380&r1=1071379&r2=1071380&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Wed Feb 16 20:12:37 2011
@@ -39,7 +39,6 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
-import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.filter.QueryFilter;
@@ -66,17 +65,6 @@ public class StorageProxy implements Sto
{
private static final Logger logger = LoggerFactory.getLogger(StorageProxy.class);
- private static ScheduledExecutorService repairExecutor = new ScheduledThreadPoolExecutor(1); // TODO JMX-enable this
-
- private static final ThreadLocal<Random> random = new ThreadLocal<Random>()
- {
- @Override
- protected Random initialValue()
- {
- return new Random();
- }
- };
-
// mbean stuff
private static final LatencyTracker readStats = new LatencyTracker();
private static final LatencyTracker rangeStats = new LatencyTracker();
@@ -91,7 +79,10 @@ public class StorageProxy implements Sto
private static final WritePerformer standardWritePerformer;
private static final WritePerformer counterWritePerformer;
+ public static final StorageProxy instance = new StorageProxy();
+
private StorageProxy() {}
+
static
{
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
@@ -208,7 +199,7 @@ public class StorageProxy implements Sto
{
// Multimap that holds onto all the messages and addresses meant for a specific datacenter
Map<String, Multimap<Message, InetAddress>> dcMessages = new HashMap<String, Multimap<Message, InetAddress>>(hintedEndpoints.size());
- MessageProducer prod = new CachingMessageProducer(rm);
+ MessageProducer producer = new CachingMessageProducer(rm);
for (Map.Entry<InetAddress, Collection<InetAddress>> entry : hintedEndpoints.asMap().entrySet())
{
@@ -238,7 +229,7 @@ public class StorageProxy implements Sto
dcMessages.put(dc, messages);
}
- messages.put(prod.getMessage(Gossiper.instance.getVersion(destination)), destination);
+ messages.put(producer.getMessage(Gossiper.instance.getVersion(destination)), destination);
}
}
else
@@ -506,109 +497,97 @@ public class StorageProxy implements Sto
private static List<Row> fetchRows(List<ReadCommand> commands, ConsistencyLevel consistency_level) throws IOException, UnavailableException, TimeoutException
{
List<ReadCallback<Row>> readCallbacks = new ArrayList<ReadCallback<Row>>();
- List<List<InetAddress>> commandEndpoints = new ArrayList<List<InetAddress>>();
List<Row> rows = new ArrayList<Row>();
- Set<ReadCommand> repairs = new HashSet<ReadCommand>();
// send out read requests
for (ReadCommand command: commands)
{
assert !command.isDigestQuery();
+ logger.debug("Command/ConsistencyLevel is {}/{}", command, consistency_level);
List<InetAddress> endpoints = StorageService.instance.getLiveNaturalEndpoints(command.table, command.key);
DatabaseDescriptor.getEndpointSnitch().sortByProximity(FBUtilities.getLocalAddress(), endpoints);
- ReadResponseResolver resolver = new ReadResponseResolver(command.table, command.key);
- ReadCallback<Row> handler = getReadCallback(resolver, command.table, consistency_level);
- handler.assureSufficientLiveNodes(endpoints);
-
- // if we're not going to read repair, cut the endpoints list down to the ones required to satisfy ConsistencyLevel
- if (randomlyReadRepair(command))
- {
- if (endpoints.size() > handler.blockfor)
- repairs.add(command);
- }
- else
- {
- endpoints = endpoints.subList(0, handler.blockfor);
- }
+ RowDigestResolver resolver = new RowDigestResolver(command.table, command.key);
+ ReadCallback<Row> handler = getReadCallback(resolver, command, consistency_level, endpoints);
+ handler.assureSufficientLiveNodes();
+ assert !handler.endpoints.isEmpty();
// The data-request message is sent to dataPoint, the node that will actually get
// the data for us. The other replicas are only sent a digest query.
ReadCommand digestCommand = null;
- if (endpoints.size() > 1)
+ if (handler.endpoints.size() > 1)
{
digestCommand = command.copy();
digestCommand.setDigestQuery(true);
}
- InetAddress dataPoint = endpoints.get(0);
+ InetAddress dataPoint = handler.endpoints.get(0);
if (dataPoint.equals(FBUtilities.getLocalAddress()))
{
if (logger.isDebugEnabled())
- logger.debug("reading data for " + command + " locally");
+ logger.debug("reading data locally");
StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(command, handler));
}
else
{
- Message message = command.getMessage(Gossiper.instance.getVersion(dataPoint));
if (logger.isDebugEnabled())
- logger.debug("reading data for " + command + " from " + dataPoint);
- MessagingService.instance().sendRR(message, dataPoint, handler);
+ logger.debug("reading data from " + dataPoint);
+ MessagingService.instance().sendRR(command, dataPoint, handler);
}
// We lazy-construct the digest Message object since it may not be necessary if we
// are doing a local digest read, or no digest reads at all.
- MessageProducer prod = new CachingMessageProducer(digestCommand);
- for (InetAddress digestPoint : endpoints.subList(1, endpoints.size()))
+ MessageProducer producer = new CachingMessageProducer(digestCommand);
+ for (InetAddress digestPoint : handler.endpoints.subList(1, handler.endpoints.size()))
{
if (digestPoint.equals(FBUtilities.getLocalAddress()))
{
if (logger.isDebugEnabled())
- logger.debug("reading digest for " + command + " locally");
+ logger.debug("reading digest locally");
StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(digestCommand, handler));
}
else
{
if (logger.isDebugEnabled())
- logger.debug("reading digest for " + command + " from " + digestPoint);
- MessagingService.instance().sendRR(prod, digestPoint, handler);
+ logger.debug("reading digest for from " + digestPoint);
+ MessagingService.instance().sendRR(producer, digestPoint, handler);
}
}
readCallbacks.add(handler);
- commandEndpoints.add(endpoints);
}
// read results and make a second pass for any digest mismatches
List<RepairCallback<Row>> repairResponseHandlers = null;
for (int i = 0; i < commands.size(); i++)
{
- ReadCallback<Row> readCallback = readCallbacks.get(i);
+ ReadCallback<Row> handler = readCallbacks.get(i);
Row row;
ReadCommand command = commands.get(i);
- List<InetAddress> endpoints = commandEndpoints.get(i);
try
{
long startTime2 = System.currentTimeMillis();
- row = readCallback.get(); // CL.ONE is special cased here to ignore digests even if some have arrived
+ row = handler.get(); // CL.ONE is special cased here to ignore digests even if some have arrived
if (row != null)
rows.add(row);
if (logger.isDebugEnabled())
logger.debug("Read: " + (System.currentTimeMillis() - startTime2) + " ms.");
-
- if (repairs.contains(command))
- repairExecutor.schedule(new RepairRunner(readCallback.resolver, command, endpoints), DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS);
}
catch (DigestMismatchException ex)
{
if (logger.isDebugEnabled())
logger.debug("Digest mismatch:", ex);
- RepairCallback<Row> handler = repair(command, endpoints);
+
+ RowRepairResolver resolver = new RowRepairResolver(command.table, command.key);
+ RepairCallback<Row> repairHandler = new RepairCallback<Row>(resolver, handler.endpoints);
+ for (InetAddress endpoint : handler.endpoints)
+ MessagingService.instance().sendRR(command, endpoint, repairHandler);
+
if (repairResponseHandlers == null)
repairResponseHandlers = new ArrayList<RepairCallback<Row>>();
- repairResponseHandlers.add(handler);
+ repairResponseHandlers.add(repairHandler);
}
}
@@ -657,24 +636,13 @@ public class StorageProxy implements Sto
}
}
- static <T> ReadCallback<T> getReadCallback(IResponseResolver<T> resolver, String table, ConsistencyLevel consistencyLevel)
+ static <T> ReadCallback<T> getReadCallback(IResponseResolver<T> resolver, IReadCommand command, ConsistencyLevel consistencyLevel, List<InetAddress> endpoints)
{
if (consistencyLevel.equals(ConsistencyLevel.LOCAL_QUORUM) || consistencyLevel.equals(ConsistencyLevel.EACH_QUORUM))
{
- return new DatacenterReadCallback(resolver, consistencyLevel, table);
+ return new DatacenterReadCallback(resolver, consistencyLevel, command, endpoints);
}
- return new ReadCallback(resolver, consistencyLevel, table);
- }
-
- private static RepairCallback<Row> repair(ReadCommand command, List<InetAddress> endpoints)
- throws IOException
- {
- ReadResponseResolver resolver = new ReadResponseResolver(command.table, command.key);
- RepairCallback<Row> handler = new RepairCallback<Row>(resolver, endpoints);
- MessageProducer prod = new CachingMessageProducer(command);
- for (InetAddress endpoint : endpoints)
- MessagingService.instance().sendRR(prod, endpoint, handler);
- return handler;
+ return new ReadCallback(resolver, consistencyLevel, command, endpoints);
}
/*
@@ -725,16 +693,14 @@ public class StorageProxy implements Sto
// collect replies and resolve according to consistency level
RangeSliceResponseResolver resolver = new RangeSliceResponseResolver(command.keyspace, liveEndpoints);
- ReadCallback<List<Row>> handler = getReadCallback(resolver, command.keyspace, consistency_level);
- MessageProducer prod = new CachingMessageProducer(c2);
- // TODO bail early if live endpoints can't satisfy requested consistency level
+ ReadCallback<List<Row>> handler = getReadCallback(resolver, command, consistency_level, liveEndpoints);
+ handler.assureSufficientLiveNodes();
for (InetAddress endpoint : liveEndpoints)
{
- MessagingService.instance().sendRR(prod, endpoint, handler);
+ MessagingService.instance().sendRR(c2, endpoint, handler);
if (logger.isDebugEnabled())
logger.debug("reading " + c2 + " from " + endpoint);
}
- // TODO read repair on remaining replicas?
// if we're done, great, otherwise, move to the next range
try
@@ -787,6 +753,11 @@ public class StorageProxy implements Sto
versions.put(message.getFrom(), theirVersion);
latch.countDown();
}
+
+ public boolean isLatencyForSnitch()
+ {
+ return false;
+ }
};
// an empty message acts as a request to the SchemaCheckVerbHandler.
for (InetAddress endpoint : liveHosts)
@@ -881,12 +852,6 @@ public class StorageProxy implements Sto
return ranges;
}
- private static boolean randomlyReadRepair(ReadCommand command)
- {
- CFMetaData cfmd = DatabaseDescriptor.getTableMetaData(command.table).get(command.getColumnFamilyName());
- return cfmd.getReadRepairChance() > random.get().nextDouble();
- }
-
public long getReadOperations()
{
return readStats.getOpCount();
@@ -987,7 +952,7 @@ public class StorageProxy implements Sto
return counterWriteStats.getRecentLatencyHistogramMicros();
}
- public static List<Row> scan(String keyspace, String column_family, IndexClause index_clause, SlicePredicate column_predicate, ConsistencyLevel consistency_level)
+ public static List<Row> scan(final String keyspace, String column_family, IndexClause index_clause, SlicePredicate column_predicate, ConsistencyLevel consistency_level)
throws IOException, TimeoutException, UnavailableException
{
IPartitioner p = StorageService.getPartitioner();
@@ -1005,17 +970,21 @@ public class StorageProxy implements Sto
// collect replies and resolve according to consistency level
RangeSliceResponseResolver resolver = new RangeSliceResponseResolver(keyspace, liveEndpoints);
- ReadCallback<List<Row>> handler = getReadCallback(resolver, keyspace, consistency_level);
-
- // bail early if live endpoints can't satisfy requested consistency level
- if(handler.blockfor > liveEndpoints.size())
- throw new UnavailableException();
+ IReadCommand iCommand = new IReadCommand()
+ {
+ public String getKeyspace()
+ {
+ return keyspace;
+ }
+ };
+ ReadCallback<List<Row>> handler = getReadCallback(resolver, iCommand, consistency_level, liveEndpoints);
+ handler.assureSufficientLiveNodes();
IndexScanCommand command = new IndexScanCommand(keyspace, column_family, index_clause, column_predicate, range);
- MessageProducer prod = new CachingMessageProducer(command);
+ MessageProducer producer = new CachingMessageProducer(command);
for (InetAddress endpoint : liveEndpoints)
{
- MessagingService.instance().sendRR(prod, endpoint, handler);
+ MessagingService.instance().sendRR(producer, endpoint, handler);
if (logger.isDebugEnabled())
logger.debug("reading " + command + " from " + endpoint);
}
@@ -1100,11 +1069,9 @@ public class StorageProxy implements Sto
// Send out the truncate calls and track the responses with the callbacks.
logger.debug("Starting to send truncate messages to hosts {}", allEndpoints);
final Truncation truncation = new Truncation(keyspace, cfname);
- MessageProducer prod = new CachingMessageProducer(truncation);
+ MessageProducer producer = new CachingMessageProducer(truncation);
for (InetAddress endpoint : allEndpoints)
- {
- MessagingService.instance().sendRR(prod, endpoint, responseHandler);
- }
+ MessagingService.instance().sendRR(producer, endpoint, responseHandler);
// Wait for all
logger.debug("Sent all truncate messages, now waiting for {} responses", blockFor);
@@ -1121,42 +1088,6 @@ public class StorageProxy implements Sto
return !Gossiper.instance.getUnreachableMembers().isEmpty();
}
- private static class RepairRunner extends WrappedRunnable
- {
- private final IResponseResolver<Row> resolver;
- private final ReadCommand command;
- private final List<InetAddress> endpoints;
-
- public RepairRunner(IResponseResolver<Row> resolver, ReadCommand command, List<InetAddress> endpoints)
- {
- this.resolver = resolver;
- this.command = command;
- this.endpoints = endpoints;
- }
-
- protected void runMayThrow() throws IOException
- {
- try
- {
- resolver.resolve();
- }
- catch (DigestMismatchException e)
- {
- if (logger.isDebugEnabled())
- logger.debug("Digest mismatch:", e);
- final RepairCallback<Row> callback = repair(command, endpoints);
- Runnable runnable = new WrappedRunnable()
- {
- public void runMayThrow() throws DigestMismatchException, IOException, TimeoutException
- {
- callback.get();
- }
- };
- repairExecutor.schedule(runnable, DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS);
- }
- }
- }
-
private interface WritePerformer
{
public void apply(IMutation mutation, Multimap<InetAddress, InetAddress> hintedEndpoints, IWriteResponseHandler responseHandler, String localDataCenter, ConsistencyLevel consistency_level) throws IOException;
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/TruncateResponseHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/TruncateResponseHandler.java?rev=1071380&r1=1071379&r2=1071380&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/TruncateResponseHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/TruncateResponseHandler.java Wed Feb 16 20:12:37 2011
@@ -73,4 +73,9 @@ public class TruncateResponseHandler imp
if (responses.get() >= responseCount)
condition.signal();
}
+
+ public boolean isLatencyForSnitch()
+ {
+ return false;
+ }
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/WriteResponseHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/WriteResponseHandler.java?rev=1071380&r1=1071379&r2=1071380&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/WriteResponseHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/WriteResponseHandler.java Wed Feb 16 20:12:37 2011
@@ -121,4 +121,9 @@ public class WriteResponseHandler extend
throw new UnavailableException();
}
}
+
+ public boolean isLatencyForSnitch()
+ {
+ return false;
+ }
}
Modified: cassandra/trunk/test/unit/org/apache/cassandra/service/ConsistencyLevelTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/service/ConsistencyLevelTest.java?rev=1071380&r1=1071379&r2=1071380&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/service/ConsistencyLevelTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/service/ConsistencyLevelTest.java Wed Feb 16 20:12:37 2011
@@ -71,7 +71,7 @@ public class ConsistencyLevelTest extend
AbstractReplicationStrategy strategy;
- for (String table : DatabaseDescriptor.getNonSystemTables())
+ for (final String table : DatabaseDescriptor.getNonSystemTables())
{
strategy = getStrategy(table, tmd);
StorageService.calculatePendingRanges(strategy, table);
@@ -96,7 +96,15 @@ public class ConsistencyLevelTest extend
IWriteResponseHandler writeHandler = strategy.getWriteResponseHandler(hosts, hintedNodes, c);
- ReadCallback<Row> readHandler = StorageProxy.getReadCallback(new ReadResponseResolver(table, ByteBufferUtil.bytes("foo")), table, c);
+ IReadCommand command = new IReadCommand()
+ {
+ public String getKeyspace()
+ {
+ return table;
+ }
+ };
+ RowRepairResolver resolver = new RowRepairResolver(table, ByteBufferUtil.bytes("foo"));
+ ReadCallback<Row> readHandler = StorageProxy.getReadCallback(resolver, command, c, new ArrayList<InetAddress>(hintedNodes.keySet()));
boolean isWriteUnavailable = false;
boolean isReadUnavailable = false;
@@ -111,7 +119,7 @@ public class ConsistencyLevelTest extend
try
{
- readHandler.assureSufficientLiveNodes(hintedNodes.asMap().keySet());
+ readHandler.assureSufficientLiveNodes();
}
catch (UnavailableException e)
{
Added: cassandra/trunk/test/unit/org/apache/cassandra/service/RowResolverTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/service/RowResolverTest.java?rev=1071380&view=auto
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/service/RowResolverTest.java (added)
+++ cassandra/trunk/test/unit/org/apache/cassandra/service/RowResolverTest.java Wed Feb 16 20:12:37 2011
@@ -0,0 +1,96 @@
+package org.apache.cassandra.service;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+import java.util.Arrays;
+
+import org.apache.cassandra.SchemaLoader;
+import org.junit.Test;
+
+import org.apache.cassandra.db.ColumnFamily;
+
+import static org.apache.cassandra.db.TableTest.assertColumns;
+import static org.apache.cassandra.Util.column;
+import static junit.framework.Assert.assertNull;
+
+public class RowResolverTest extends SchemaLoader
+{
+ @Test
+ public void testResolveSupersetNewer()
+ {
+ ColumnFamily cf1 = ColumnFamily.create("Keyspace1", "Standard1");
+ cf1.addColumn(column("c1", "v1", 0));
+
+ ColumnFamily cf2 = ColumnFamily.create("Keyspace1", "Standard1");
+ cf2.addColumn(column("c1", "v2", 1));
+
+ ColumnFamily resolved = RowRepairResolver.resolveSuperset(Arrays.asList(cf1, cf2));
+ assertColumns(resolved, "c1");
+ assertColumns(ColumnFamily.diff(cf1, resolved), "c1");
+ assertNull(ColumnFamily.diff(cf2, resolved));
+ }
+
+ @Test
+ public void testResolveSupersetDisjoint()
+ {
+ ColumnFamily cf1 = ColumnFamily.create("Keyspace1", "Standard1");
+ cf1.addColumn(column("c1", "v1", 0));
+
+ ColumnFamily cf2 = ColumnFamily.create("Keyspace1", "Standard1");
+ cf2.addColumn(column("c2", "v2", 1));
+
+ ColumnFamily resolved = RowRepairResolver.resolveSuperset(Arrays.asList(cf1, cf2));
+ assertColumns(resolved, "c1", "c2");
+ assertColumns(ColumnFamily.diff(cf1, resolved), "c2");
+ assertColumns(ColumnFamily.diff(cf2, resolved), "c1");
+ }
+
+ @Test
+ public void testResolveSupersetNullOne()
+ {
+ ColumnFamily cf2 = ColumnFamily.create("Keyspace1", "Standard1");
+ cf2.addColumn(column("c2", "v2", 1));
+
+ ColumnFamily resolved = RowRepairResolver.resolveSuperset(Arrays.asList(null, cf2));
+ assertColumns(resolved, "c2");
+ assertColumns(ColumnFamily.diff(null, resolved), "c2");
+ assertNull(ColumnFamily.diff(cf2, resolved));
+ }
+
+ @Test
+ public void testResolveSupersetNullTwo()
+ {
+ ColumnFamily cf1 = ColumnFamily.create("Keyspace1", "Standard1");
+ cf1.addColumn(column("c1", "v1", 0));
+
+ ColumnFamily resolved = RowRepairResolver.resolveSuperset(Arrays.asList(cf1, null));
+ assertColumns(resolved, "c1");
+ assertNull(ColumnFamily.diff(cf1, resolved));
+ assertColumns(ColumnFamily.diff(null, resolved), "c1");
+ }
+
+ @Test
+ public void testResolveSupersetNullBoth()
+ {
+ assertNull(RowRepairResolver.resolveSuperset(Arrays.<ColumnFamily>asList(null, null)));
+ }
+}