You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/02/15 18:16:53 UTC
svn commit: r1070977 - in /cassandra/branches/cassandra-0.7: ./
src/java/org/apache/cassandra/concurrent/ src/java/org/apache/cassandra/db/
src/java/org/apache/cassandra/dht/ src/java/org/apache/cassandra/net/
src/java/org/apache/cassandra/service/ tes...
Author: jbellis
Date: Tue Feb 15 17:16:52 2011
New Revision: 1070977
URL: http://svn.apache.org/viewvc?rev=1070977&view=rev
Log:
no more waiting for RPC_TIMEOUT before finishing RR
patch by jbellis; reviewed by brandonwilliams for CASSANDRA-2069
Added:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/AbstractRowResolver.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/AsyncRepairCallback.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/IReadCommand.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RowDigestResolver.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RowRepairResolver.java
cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/RowResolverTest.java
Modified:
cassandra/branches/cassandra-0.7/CHANGES.txt
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/Stage.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/StageManager.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/RangeSliceCommand.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ReadCommand.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/dht/BootStrapper.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/AsyncResult.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/IMessageCallback.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterReadCallback.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadCallback.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadResponseResolver.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RepairCallback.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/TruncateResponseHandler.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/WriteResponseHandler.java
cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/ConsistencyLevelTest.java
cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/ReadResponseResolverTest.java
Modified: cassandra/branches/cassandra-0.7/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/CHANGES.txt?rev=1070977&r1=1070976&r2=1070977&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.7/CHANGES.txt Tue Feb 15 17:16:52 2011
@@ -2,6 +2,7 @@
* copy DecoratedKey.key when inserting into caches to avoid retaining
a reference to the underlying buffer (CASSANDRA-2102)
* format subcolumn names with subcomparator (CASSANDRA-2136)
+ * lower-latency read repair (CASSANDRA-2069)
0.7.1
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/Stage.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/Stage.java?rev=1070977&r1=1070976&r2=1070977&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/Stage.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/Stage.java Tue Feb 15 17:16:52 2011
@@ -31,7 +31,8 @@ public enum Stage
ANTI_ENTROPY,
MIGRATION,
MISC,
- INTERNAL_RESPONSE;
+ INTERNAL_RESPONSE,
+ READ_REPAIR;
public String getJmxType()
{
@@ -47,6 +48,7 @@ public enum Stage
case MUTATION:
case READ:
case REQUEST_RESPONSE:
+ case READ_REPAIR:
return "request";
default:
throw new AssertionError("Unknown stage " + this);
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/StageManager.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/StageManager.java?rev=1070977&r1=1070976&r2=1070977&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/StageManager.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/StageManager.java Tue Feb 15 17:16:52 2011
@@ -50,6 +50,7 @@ public class StageManager
stages.put(Stage.ANTI_ENTROPY, new JMXEnabledThreadPoolExecutor(Stage.ANTI_ENTROPY));
stages.put(Stage.MIGRATION, new JMXEnabledThreadPoolExecutor(Stage.MIGRATION));
stages.put(Stage.MISC, new JMXEnabledThreadPoolExecutor(Stage.MISC));
+ stages.put(Stage.READ_REPAIR, multiThreadedStage(Stage.READ_REPAIR, Runtime.getRuntime().availableProcessors()));
}
private static ThreadPoolExecutor multiThreadedStage(Stage stage, int numThreads)
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/RangeSliceCommand.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/RangeSliceCommand.java?rev=1070977&r1=1070976&r2=1070977&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/RangeSliceCommand.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/RangeSliceCommand.java Tue Feb 15 17:16:52 2011
@@ -47,6 +47,7 @@ import org.apache.cassandra.dht.Abstract
import org.apache.cassandra.io.ICompactSerializer;
import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.net.Message;
+import org.apache.cassandra.service.IReadCommand;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.thrift.ColumnParent;
import org.apache.cassandra.thrift.SlicePredicate;
@@ -56,7 +57,7 @@ import org.apache.thrift.TDeserializer;
import org.apache.thrift.TSerializer;
import org.apache.cassandra.thrift.TBinaryProtocol;
-public class RangeSliceCommand
+public class RangeSliceCommand implements IReadCommand
{
private static final RangeSliceCommandSerializer serializer = new RangeSliceCommandSerializer();
@@ -113,6 +114,11 @@ public class RangeSliceCommand
ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
return serializer.deserialize(new DataInputStream(bis));
}
+
+ public String getKeyspace()
+ {
+ return keyspace;
+ }
}
class RangeSliceCommandSerializer implements ICompactSerializer<RangeSliceCommand>
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ReadCommand.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ReadCommand.java?rev=1070977&r1=1070976&r2=1070977&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ReadCommand.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ReadCommand.java Tue Feb 15 17:16:52 2011
@@ -30,11 +30,12 @@ import org.apache.cassandra.db.filter.Qu
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.io.ICompactSerializer;
import org.apache.cassandra.net.Message;
+import org.apache.cassandra.service.IReadCommand;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
-public abstract class ReadCommand
+public abstract class ReadCommand implements IReadCommand
{
public static final byte CMD_TYPE_GET_SLICE_BY_NAMES = 1;
public static final byte CMD_TYPE_GET_SLICE = 2;
@@ -91,6 +92,11 @@ public abstract class ReadCommand
{
return ColumnFamily.getComparatorFor(table, getColumnFamilyName(), queryPath.superColumnName);
}
+
+ public String getKeyspace()
+ {
+ return table;
+ }
}
class ReadCommandSerializer implements ICompactSerializer<ReadCommand>
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/dht/BootStrapper.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/dht/BootStrapper.java?rev=1070977&r1=1070976&r2=1070977&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/dht/BootStrapper.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/dht/BootStrapper.java Tue Feb 15 17:16:52 2011
@@ -282,5 +282,10 @@ public class BootStrapper
token = StorageService.getPartitioner().getTokenFactory().fromString(new String(msg.getMessageBody(), Charsets.UTF_8));
condition.signalAll();
}
+
+ public boolean isLatencyForSnitch()
+ {
+ return false;
+ }
}
}
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/AsyncResult.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/AsyncResult.java?rev=1070977&r1=1070976&r2=1070977&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/AsyncResult.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/AsyncResult.java Tue Feb 15 17:16:52 2011
@@ -96,6 +96,11 @@ class AsyncResult implements IAsyncResul
}
}
+ public boolean isLatencyForSnitch()
+ {
+ return false;
+ }
+
public InetAddress getFrom()
{
return from;
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/IMessageCallback.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/IMessageCallback.java?rev=1070977&r1=1070976&r2=1070977&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/IMessageCallback.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/IMessageCallback.java Tue Feb 15 17:16:52 2011
@@ -23,4 +23,9 @@ package org.apache.cassandra.net;
public interface IMessageCallback
{
+ /**
+ * @return true if this callback is on the read path and its latency should be
+ * given as input to the dynamic snitch.
+ */
+ public boolean isLatencyForSnitch();
}
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java?rev=1070977&r1=1070976&r2=1070977&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java Tue Feb 15 17:16:52 2011
@@ -138,7 +138,7 @@ public final class MessagingService impl
*/
public void maybeAddLatency(IMessageCallback cb, InetAddress address, double latency)
{
- if (cb instanceof ReadCallback || cb instanceof AsyncResult)
+ if (cb.isLatencyForSnitch())
addLatency(address, latency);
}
Added: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/AbstractRowResolver.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/AbstractRowResolver.java?rev=1070977&view=auto
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/AbstractRowResolver.java (added)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/AbstractRowResolver.java Tue Feb 15 17:16:52 2011
@@ -0,0 +1,70 @@
+package org.apache.cassandra.service;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOError;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.commons.lang.ArrayUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.ReadResponse;
+import org.apache.cassandra.db.Row;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.utils.FBUtilities;
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
+
+public abstract class AbstractRowResolver implements IResponseResolver<Row>
+{
+ protected static Logger logger = LoggerFactory.getLogger(AbstractRowResolver.class);
+
+ private static final Message FAKE_MESSAGE = new Message(FBUtilities.getLocalAddress(), StorageService.Verb.INTERNAL_RESPONSE, ArrayUtils.EMPTY_BYTE_ARRAY);
+
+ protected final String table;
+ protected final ConcurrentMap<Message, ReadResponse> replies = new NonBlockingHashMap<Message, ReadResponse>();
+ protected final DecoratedKey key;
+
+ public AbstractRowResolver(ByteBuffer key, String table)
+ {
+ this.key = StorageService.getPartitioner().decorateKey(key);
+ this.table = table;
+ }
+
+ public void preprocess(Message message)
+ {
+ byte[] body = message.getMessageBody();
+ ByteArrayInputStream bufIn = new ByteArrayInputStream(body);
+ try
+ {
+ ReadResponse result = ReadResponse.serializer().deserialize(new DataInputStream(bufIn));
+ if (logger.isDebugEnabled())
+ logger.debug("Preprocessed {} response", result.isDigestQuery() ? "digest" : "data");
+ replies.put(message, result);
+ }
+ catch (IOException e)
+ {
+ throw new IOError(e);
+ }
+ }
+
+ /** hack so local reads don't force de/serialization of an extra real Message */
+ public void injectPreProcessed(ReadResponse result)
+ {
+ assert replies.get(FAKE_MESSAGE) == null; // should only be one local reply
+ replies.put(FAKE_MESSAGE, result);
+ }
+
+ public Iterable<Message> getMessages()
+ {
+ return replies.keySet();
+ }
+
+ public int getMessageCount()
+ {
+ return replies.size();
+ }
+}
Added: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/AsyncRepairCallback.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/AsyncRepairCallback.java?rev=1070977&view=auto
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/AsyncRepairCallback.java (added)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/AsyncRepairCallback.java Tue Feb 15 17:16:52 2011
@@ -0,0 +1,41 @@
+package org.apache.cassandra.service;
+
+import java.io.IOException;
+
+import org.apache.cassandra.concurrent.Stage;
+import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.net.IAsyncCallback;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.utils.WrappedRunnable;
+
+public class AsyncRepairCallback implements IAsyncCallback
+{
+ private final RowRepairResolver repairResolver;
+ private final int count;
+
+ public AsyncRepairCallback(RowRepairResolver repairResolver, int count)
+ {
+ this.repairResolver = repairResolver;
+ this.count = count;
+ }
+
+ public void response(Message message)
+ {
+ repairResolver.preprocess(message);
+ if (repairResolver.getMessageCount() == count)
+ {
+ StageManager.getStage(Stage.READ_REPAIR).execute(new WrappedRunnable()
+ {
+ protected void runMayThrow() throws DigestMismatchException, IOException
+ {
+ repairResolver.resolve();
+ }
+ });
+ }
+ }
+
+ public boolean isLatencyForSnitch()
+ {
+ return true;
+ }
+}
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterReadCallback.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterReadCallback.java?rev=1070977&r1=1070976&r2=1070977&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterReadCallback.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterReadCallback.java Tue Feb 15 17:16:52 2011
@@ -22,12 +22,12 @@ package org.apache.cassandra.service;
import java.net.InetAddress;
-import java.util.Collection;
+import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.Table;
import org.apache.cassandra.db.ReadResponse;
+import org.apache.cassandra.db.Table;
import org.apache.cassandra.locator.IEndpointSnitch;
import org.apache.cassandra.locator.NetworkTopologyStrategy;
import org.apache.cassandra.net.Message;
@@ -44,12 +44,12 @@ public class DatacenterReadCallback<T> e
private static final String localdc = snitch.getDatacenter(FBUtilities.getLocalAddress());
private AtomicInteger localResponses;
- public DatacenterReadCallback(IResponseResolver<T> resolver, ConsistencyLevel consistencyLevel, String table)
+ public DatacenterReadCallback(IResponseResolver resolver, ConsistencyLevel consistencyLevel, IReadCommand command, List<InetAddress> endpoints)
{
- super(resolver, consistencyLevel, table);
+ super(resolver, consistencyLevel, command, endpoints);
localResponses = new AtomicInteger(blockfor);
}
-
+
@Override
public void response(Message message)
{
@@ -68,14 +68,15 @@ public class DatacenterReadCallback<T> e
@Override
public void response(ReadResponse result)
{
- ((ReadResponseResolver) resolver).injectPreProcessed(result);
+ ((RowDigestResolver) resolver).injectPreProcessed(result);
int n = localResponses.decrementAndGet();
-
if (n == 0 && resolver.isDataPresent())
{
condition.signal();
}
+
+ maybeResolveForRepair();
}
@Override
@@ -86,7 +87,7 @@ public class DatacenterReadCallback<T> e
}
@Override
- public void assureSufficientLiveNodes(Collection<InetAddress> endpoints) throws UnavailableException
+ public void assureSufficientLiveNodes() throws UnavailableException
{
int localEndpoints = 0;
for (InetAddress endpoint : endpoints)
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java?rev=1070977&r1=1070976&r2=1070977&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java Tue Feb 15 17:16:52 2011
@@ -115,4 +115,9 @@ public class DatacenterSyncWriteResponse
throw new UnavailableException();
}
}
+
+ public boolean isLatencyForSnitch()
+ {
+ return false;
+ }
}
Added: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/IReadCommand.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/IReadCommand.java?rev=1070977&view=auto
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/IReadCommand.java (added)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/IReadCommand.java Tue Feb 15 17:16:52 2011
@@ -0,0 +1,6 @@
+package org.apache.cassandra.service;
+
+public interface IReadCommand
+{
+ public String getKeyspace();
+}
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java?rev=1070977&r1=1070976&r2=1070977&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java Tue Feb 15 17:16:52 2011
@@ -49,7 +49,6 @@ public class RangeSliceResponseResolver
public RangeSliceResponseResolver(String table, List<InetAddress> sources)
{
- assert sources.size() > 0;
this.sources = sources;
this.table = table;
}
@@ -103,8 +102,8 @@ public class RangeSliceResponseResolver
protected Row getReduced()
{
- ColumnFamily resolved = ReadResponseResolver.resolveSuperset(versions);
- ReadResponseResolver.maybeScheduleRepairs(resolved, table, key, versions, versionSources);
+ ColumnFamily resolved = RowRepairResolver.resolveSuperset(versions);
+ RowRepairResolver.maybeScheduleRepairs(resolved, table, key, versions, versionSources);
versions.clear();
versionSources.clear();
return new Row(key, resolved);
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadCallback.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadCallback.java?rev=1070977&r1=1070976&r2=1070977&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadCallback.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadCallback.java Tue Feb 15 17:16:52 2011
@@ -20,14 +20,20 @@ package org.apache.cassandra.service;
import java.io.IOException;
import java.net.InetAddress;
-import java.util.Collection;
+import java.util.List;
+import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.concurrent.Stage;
+import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ReadCommand;
import org.apache.cassandra.db.ReadResponse;
import org.apache.cassandra.db.Table;
import org.apache.cassandra.net.IAsyncCallback;
@@ -36,28 +42,61 @@ import org.apache.cassandra.net.Messagin
import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.cassandra.thrift.UnavailableException;
import org.apache.cassandra.utils.SimpleCondition;
+import org.apache.cassandra.utils.WrappedRunnable;
public class ReadCallback<T> implements IAsyncCallback
{
protected static final Logger logger = LoggerFactory.getLogger( ReadCallback.class );
+ private static final ThreadLocal<Random> random = new ThreadLocal<Random>()
+ {
+ @Override
+ protected Random initialValue()
+ {
+ return new Random();
+ }
+ };
+
public final IResponseResolver<T> resolver;
protected final SimpleCondition condition = new SimpleCondition();
private final long startTime;
protected final int blockfor;
-
+ final List<InetAddress> endpoints;
+ private final IReadCommand command;
+
/**
* Constructor when response count has to be calculated and blocked for.
*/
- public ReadCallback(IResponseResolver<T> resolver, ConsistencyLevel consistencyLevel, String table)
+ public ReadCallback(IResponseResolver<T> resolver, ConsistencyLevel consistencyLevel, IReadCommand command, List<InetAddress> endpoints)
{
- this.blockfor = determineBlockFor(consistencyLevel, table);
+ this.command = command;
+ this.blockfor = determineBlockFor(consistencyLevel, command.getKeyspace());
this.resolver = resolver;
this.startTime = System.currentTimeMillis();
-
- logger.debug("ReadCallback blocking for {} responses", blockfor);
+ boolean repair = randomlyReadRepair();
+ this.endpoints = repair || resolver instanceof RowRepairResolver
+ ? endpoints
+ : endpoints.subList(0, Math.min(endpoints.size(), blockfor)); // min so as to not throw exception until assureSufficient is called
+
+ if (logger.isDebugEnabled())
+ logger.debug(String.format("Blockfor/repair is %s/%s; setting up requests to %s",
+ blockfor, repair, StringUtils.join(this.endpoints, ",")));
}
+ private boolean randomlyReadRepair()
+ {
+ if (resolver instanceof RowDigestResolver)
+ {
+ assert command instanceof ReadCommand : command;
+ String table = ((RowDigestResolver) resolver).table;
+ String columnFamily = ((ReadCommand) command).getColumnFamilyName();
+ CFMetaData cfmd = DatabaseDescriptor.getTableMetaData(table).get(columnFamily);
+ return cfmd.getReadRepairChance() > random.get().nextDouble();
+ }
+ // we don't read repair on range scans
+ return false;
+ }
+
public T get() throws TimeoutException, DigestMismatchException, IOException
{
long timeout = DatabaseDescriptor.getRpcTimeout() - (System.currentTimeMillis() - startTime);
@@ -85,21 +124,42 @@ public class ReadCallback<T> implements
public void response(Message message)
{
resolver.preprocess(message);
+ assert resolver.getMessageCount() <= endpoints.size();
if (resolver.getMessageCount() < blockfor)
return;
if (resolver.isDataPresent())
+ {
condition.signal();
+ maybeResolveForRepair();
+ }
}
public void response(ReadResponse result)
{
- ((ReadResponseResolver) resolver).injectPreProcessed(result);
+ ((RowDigestResolver) resolver).injectPreProcessed(result);
+ assert resolver.getMessageCount() <= endpoints.size();
if (resolver.getMessageCount() < blockfor)
return;
if (resolver.isDataPresent())
+ {
condition.signal();
+ maybeResolveForRepair();
+ }
}
-
+
+ /**
+ * Check digests in the background on the Repair stage if we've received replies
+ * to all the requests we sent.
+ */
+ protected void maybeResolveForRepair()
+ {
+ if (blockfor < endpoints.size() && resolver.getMessageCount() == endpoints.size())
+ {
+ assert resolver.isDataPresent();
+ StageManager.getStage(Stage.READ_REPAIR).execute(new AsyncRepairRunner());
+ }
+ }
+
public int determineBlockFor(ConsistencyLevel consistencyLevel, String table)
{
switch (consistencyLevel)
@@ -116,9 +176,38 @@ public class ReadCallback<T> implements
}
}
- public void assureSufficientLiveNodes(Collection<InetAddress> endpoints) throws UnavailableException
+ public void assureSufficientLiveNodes() throws UnavailableException
{
if (endpoints.size() < blockfor)
throw new UnavailableException();
}
+
+ public boolean isLatencyForSnitch()
+ {
+ return true;
+ }
+
+ private class AsyncRepairRunner extends WrappedRunnable
+ {
+ protected void runMayThrow() throws IOException
+ {
+ try
+ {
+ resolver.resolve();
+ }
+ catch (DigestMismatchException e)
+ {
+ if (logger.isDebugEnabled())
+ logger.debug("Digest mismatch:", e);
+
+ ReadCommand readCommand = (ReadCommand) command;
+ final RowRepairResolver repairResolver = new RowRepairResolver(readCommand.table, readCommand.key);
+ IAsyncCallback repairHandler = new AsyncRepairCallback(repairResolver, endpoints.size());
+
+ Message messageRepair = readCommand.makeReadMessage();
+ for (InetAddress endpoint : endpoints)
+ MessagingService.instance().sendRR(messageRepair, endpoint, repairHandler);
+ }
+ }
+ }
}
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadResponseResolver.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadResponseResolver.java?rev=1070977&r1=1070976&r2=1070977&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadResponseResolver.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadResponseResolver.java Tue Feb 15 17:16:52 2011
@@ -1,257 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.service;
-
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.IOError;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentMap;
-
-import org.apache.commons.lang.ArrayUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.net.Message;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.utils.FBUtilities;
-import org.cliffc.high_scale_lib.NonBlockingHashMap;
-
-/**
- * Turns ReadResponse messages into Row objects, resolving to the most recent
- * version and setting up read repairs as necessary.
- */
-public class ReadResponseResolver implements IResponseResolver<Row>
-{
- private static Logger logger_ = LoggerFactory.getLogger(ReadResponseResolver.class);
- private final String table;
- private final ConcurrentMap<Message, ReadResponse> results = new NonBlockingHashMap<Message, ReadResponse>();
- private DecoratedKey key;
- private ByteBuffer digest;
- private static final Message FAKE_MESSAGE = new Message(FBUtilities.getLocalAddress(), StorageService.Verb.INTERNAL_RESPONSE, ArrayUtils.EMPTY_BYTE_ARRAY);;
-
- public ReadResponseResolver(String table, ByteBuffer key)
- {
- this.table = table;
- this.key = StorageService.getPartitioner().decorateKey(key);
- }
-
- public Row getData() throws IOException
- {
- for (Map.Entry<Message, ReadResponse> entry : results.entrySet())
- {
- ReadResponse result = entry.getValue();
- if (!result.isDigestQuery())
- return result.row();
- }
-
- throw new AssertionError("getData should not be invoked when no data is present");
- }
-
- /*
- * This method handles three different scenarios:
- *
- * 1a)we're handling the initial read, of data from the closest replica + digests
- * from the rest. In this case we check the digests against each other,
- * throw an exception if there is a mismatch, otherwise return the data row.
- *
- * 1b)we're checking additional digests that arrived after the minimum to handle
- * the requested ConsistencyLevel, i.e. asynchronouse read repair check
- *
- * 2) there was a mismatch on the initial read (1a or 1b), so we redid the digest requests
- * as full data reads. In this case we need to compute the most recent version
- * of each column, and send diffs to out-of-date replicas.
- */
- public Row resolve() throws DigestMismatchException, IOException
- {
- if (logger_.isDebugEnabled())
- logger_.debug("resolving " + results.size() + " responses");
-
- long startTime = System.currentTimeMillis();
- List<ColumnFamily> versions = new ArrayList<ColumnFamily>();
- List<InetAddress> endpoints = new ArrayList<InetAddress>();
-
- // case 1: validate digests against each other; throw immediately on mismatch.
- // also, collects data results into versions/endpoints lists.
- //
- // results are cleared as we process them, to avoid unnecessary duplication of work
- // when resolve() is called a second time for read repair on responses that were not
- // necessary to satisfy ConsistencyLevel.
- for (Map.Entry<Message, ReadResponse> entry : results.entrySet())
- {
- ReadResponse result = entry.getValue();
- Message message = entry.getKey();
- if (result.isDigestQuery())
- {
- if (digest == null)
- {
- digest = result.digest();
- }
- else
- {
- ByteBuffer digest2 = result.digest();
- if (!digest.equals(digest2))
- throw new DigestMismatchException(key, digest, digest2);
- }
- }
- else
- {
- versions.add(result.row().cf);
- endpoints.add(message.getFrom());
- }
-
- results.remove(message);
- }
-
- // If there was a digest query compare it with all the data digests
- // If there is a mismatch then throw an exception so that read repair can happen.
- //
- // It's important to note that we do not compare the digests of multiple data responses --
- // if we are in that situation we know there was a previous mismatch and now we're doing a repair,
- // so our job is now case 2: figure out what the most recent version is and update everyone to that version.
- if (digest != null)
- {
- for (ColumnFamily cf : versions)
- {
- ByteBuffer digest2 = ColumnFamily.digest(cf);
- if (!digest.equals(digest2))
- throw new DigestMismatchException(key, digest, digest2);
- }
- if (logger_.isDebugEnabled())
- logger_.debug("digests verified");
- }
-
- ColumnFamily resolved;
- if (versions.size() > 1)
- {
- resolved = resolveSuperset(versions);
- if (logger_.isDebugEnabled())
- logger_.debug("versions merged");
- maybeScheduleRepairs(resolved, table, key, versions, endpoints);
- }
- else
- {
- resolved = versions.get(0);
- }
-
- if (logger_.isDebugEnabled())
- logger_.debug("resolve: " + (System.currentTimeMillis() - startTime) + " ms.");
- return new Row(key, resolved);
- }
-
- /**
- * For each row version, compare with resolved (the superset of all row versions);
- * if it is missing anything, send a mutation to the endpoint it come from.
- */
- public static void maybeScheduleRepairs(ColumnFamily resolved, String table, DecoratedKey key, List<ColumnFamily> versions, List<InetAddress> endpoints)
- {
- for (int i = 0; i < versions.size(); i++)
- {
- ColumnFamily diffCf = ColumnFamily.diff(versions.get(i), resolved);
- if (diffCf == null) // no repair needs to happen
- continue;
-
- // create and send the row mutation message based on the diff
- RowMutation rowMutation = new RowMutation(table, key.key);
- rowMutation.add(diffCf);
- Message repairMessage;
- try
- {
- repairMessage = rowMutation.makeRowMutationMessage(StorageService.Verb.READ_REPAIR);
- }
- catch (IOException e)
- {
- throw new IOError(e);
- }
- MessagingService.instance().sendOneWay(repairMessage, endpoints.get(i));
- }
- }
-
- static ColumnFamily resolveSuperset(List<ColumnFamily> versions)
- {
- assert versions.size() > 0;
-
- ColumnFamily resolved = null;
- for (ColumnFamily cf : versions)
- {
- if (cf != null)
- {
- resolved = cf.cloneMe();
- break;
- }
- }
- if (resolved == null)
- return null;
-
- for (ColumnFamily cf : versions)
- resolved.resolve(cf);
-
- return resolved;
- }
-
- public void preprocess(Message message)
- {
- byte[] body = message.getMessageBody();
- ByteArrayInputStream bufIn = new ByteArrayInputStream(body);
- try
- {
- ReadResponse result = ReadResponse.serializer().deserialize(new DataInputStream(bufIn));
- if (logger_.isDebugEnabled())
- logger_.debug("Preprocessed {} response", result.isDigestQuery() ? "digest" : "data");
- results.put(message, result);
- }
- catch (IOException e)
- {
- throw new IOError(e);
- }
- }
-
- /** hack so local reads don't force de/serialization of an extra real Message */
- public void injectPreProcessed(ReadResponse result)
- {
- assert results.get(FAKE_MESSAGE) == null; // should only be one local reply
- results.put(FAKE_MESSAGE, result);
- }
-
- public boolean isDataPresent()
- {
- for (ReadResponse result : results.values())
- {
- if (!result.isDigestQuery())
- return true;
- }
- return false;
- }
-
- public Iterable<Message> getMessages()
- {
- return results.keySet();
- }
-
- public int getMessageCount()
- {
- return results.size();
- }
-}
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RepairCallback.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RepairCallback.java?rev=1070977&r1=1070976&r2=1070977&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RepairCallback.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RepairCallback.java Tue Feb 15 17:16:52 2011
@@ -39,6 +39,14 @@ public class RepairCallback<T> implement
private final SimpleCondition condition = new SimpleCondition();
private final long startTime;
+ /**
+ * The main difference between this and ReadCallback is, ReadCallback has a ConsistencyLevel
+ * it needs to achieve. Repair on the other hand is happy to repair whoever replies within the timeout.
+ *
+ * (The other main difference of course is, this is only created once we know we have a digest
+ * mismatch, and we're going to do full-data reads from everyone -- that is, this is the final
+ * stage in the read process.)
+ */
public RepairCallback(IResponseResolver<T> resolver, List<InetAddress> endpoints)
{
this.resolver = resolver;
@@ -46,10 +54,6 @@ public class RepairCallback<T> implement
this.startTime = System.currentTimeMillis();
}
- /**
- * The main difference between this and ReadCallback is, ReadCallback has a ConsistencyLevel
- * it needs to achieve. Repair on the other hand is happy to repair whoever replies within the timeout.
- */
public T get() throws TimeoutException, DigestMismatchException, IOException
{
long timeout = DatabaseDescriptor.getRpcTimeout() - (System.currentTimeMillis() - startTime);
@@ -71,4 +75,9 @@ public class RepairCallback<T> implement
if (resolver.getMessageCount() == endpoints.size())
condition.signal();
}
+
+ public boolean isLatencyForSnitch()
+ {
+ return true;
+ }
}
Added: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RowDigestResolver.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RowDigestResolver.java?rev=1070977&view=auto
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RowDigestResolver.java (added)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RowDigestResolver.java Tue Feb 15 17:16:52 2011
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.ReadResponse;
+import org.apache.cassandra.db.Row;
+import org.apache.cassandra.net.Message;
+
+public class RowDigestResolver extends AbstractRowResolver
+{
+ public RowDigestResolver(String table, ByteBuffer key)
+ {
+ super(key, table);
+ }
+
+ public Row getData() throws IOException
+ {
+ for (Map.Entry<Message, ReadResponse> entry : replies.entrySet())
+ {
+ ReadResponse result = entry.getValue();
+ if (!result.isDigestQuery())
+ return result.row();
+ }
+
+ throw new AssertionError("getData should not be invoked when no data is present");
+ }
+
+ /*
+ * This method handles two different scenarios:
+ *
+ * 1a)we're handling the initial read, of data from the closest replica + digests
+ * from the rest. In this case we check the digests against each other,
+ * throw an exception if there is a mismatch, otherwise return the data row.
+ *
+ * 1b)we're checking additional digests that arrived after the minimum to handle
+ * the requested ConsistencyLevel, i.e. asynchronous read repair check
+ */
+ public Row resolve() throws DigestMismatchException, IOException
+ {
+ if (logger.isDebugEnabled())
+ logger.debug("resolving " + replies.size() + " responses");
+
+ long startTime = System.currentTimeMillis();
+ ColumnFamily data = null;
+
+ // validate digest replies against each other; throw immediately on mismatch.
+ // also remembers the column family of the data reply in `data`.
+ //
+ // replies are not removed as they are processed (the loop only reads the map),
+ // so when resolve() is called again for read repair it re-checks every reply
+ // received so far, including those already examined to satisfy ConsistencyLevel.
+ ByteBuffer digest = null;
+ for (Map.Entry<Message, ReadResponse> entry : replies.entrySet())
+ {
+ ReadResponse response = entry.getValue();
+ if (response.isDigestQuery())
+ {
+ if (digest == null)
+ {
+ digest = response.digest();
+ }
+ else
+ {
+ ByteBuffer digest2 = response.digest();
+ if (!digest.equals(digest2))
+ throw new DigestMismatchException(key, digest, digest2);
+ }
+ }
+ else
+ {
+ data = response.row().cf;
+ }
+ }
+
+ // If there was a digest reply, compare it with the digest of the data reply.
+ // If there is a mismatch then throw an exception so that read repair can happen.
+ //
+ // It's important to note that we do not compare the digests of multiple data responses --
+ // if we are in that situation we know there was a previous mismatch and now we're doing a repair;
+ // figuring out the most recent version and updating everyone to it is RowRepairResolver's job.
+ if (digest != null)
+ {
+ ByteBuffer digest2 = ColumnFamily.digest(data);
+ if (!digest.equals(digest2))
+ throw new DigestMismatchException(key, digest, digest2);
+ if (logger.isDebugEnabled())
+ logger.debug("digests verified");
+ }
+
+ if (logger.isDebugEnabled())
+ logger.debug("resolve: " + (System.currentTimeMillis() - startTime) + " ms.");
+ return new Row(key, data);
+ }
+
+ public boolean isDataPresent()
+ {
+ for (ReadResponse result : replies.values())
+ {
+ if (!result.isDigestQuery())
+ return true;
+ }
+ return false;
+ }
+}
Added: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RowRepairResolver.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RowRepairResolver.java?rev=1070977&view=auto
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RowRepairResolver.java (added)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RowRepairResolver.java Tue Feb 15 17:16:52 2011
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.io.IOError;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+
+public class RowRepairResolver extends AbstractRowResolver
+{
+ public RowRepairResolver(String table, ByteBuffer key)
+ {
+ super(key, table);
+ }
+
+ /*
+ * This method handles the following scenario:
+ *
+ * there was a digest mismatch on the initial read (case 1a or 1b of RowDigestResolver.resolve), so we redid the digest requests
+ * as full data reads. In this case we need to compute the most recent version
+ * of each column, and send diffs to out-of-date replicas.
+ */
+ public Row resolve() throws DigestMismatchException, IOException
+ {
+ if (logger.isDebugEnabled())
+ logger.debug("resolving " + replies.size() + " responses");
+
+ long startTime = System.currentTimeMillis();
+ List<ColumnFamily> versions = new ArrayList<ColumnFamily>();
+ List<InetAddress> endpoints = new ArrayList<InetAddress>();
+
+ // every reply handled here must be a full data response (a digest reply trips the assert).
+ // collects each reply's column family and the endpoint that sent it into the
+ // parallel versions/endpoints lists.
+ //
+ // no digests are compared here and replies are not removed as they are processed;
+ // the two lists are consumed below by resolveSuperset and maybeScheduleRepairs.
+ for (Map.Entry<Message, ReadResponse> entry : replies.entrySet())
+ {
+ Message message = entry.getKey();
+ ReadResponse response = entry.getValue();
+ assert !response.isDigestQuery();
+ versions.add(response.row().cf);
+ endpoints.add(message.getFrom());
+ }
+
+ ColumnFamily resolved;
+ if (versions.size() > 1)
+ {
+ resolved = resolveSuperset(versions);
+ if (logger.isDebugEnabled())
+ logger.debug("versions merged");
+ maybeScheduleRepairs(resolved, table, key, versions, endpoints);
+ }
+ else
+ {
+ resolved = versions.get(0);
+ }
+
+ if (logger.isDebugEnabled())
+ logger.debug("resolve: " + (System.currentTimeMillis() - startTime) + " ms.");
+ return new Row(key, resolved);
+ }
+
+ /**
+ * For each row version, compare with resolved (the superset of all row versions);
+ * if it is missing anything, send a mutation to the endpoint it came from.
+ */
+ public static void maybeScheduleRepairs(ColumnFamily resolved, String table, DecoratedKey key, List<ColumnFamily> versions, List<InetAddress> endpoints)
+ {
+ for (int i = 0; i < versions.size(); i++)
+ {
+ ColumnFamily diffCf = ColumnFamily.diff(versions.get(i), resolved);
+ if (diffCf == null) // no repair needs to happen
+ continue;
+
+ // create and send the row mutation message based on the diff
+ RowMutation rowMutation = new RowMutation(table, key.key);
+ rowMutation.add(diffCf);
+ Message repairMessage;
+ try
+ {
+ repairMessage = rowMutation.makeRowMutationMessage(StorageService.Verb.READ_REPAIR);
+ }
+ catch (IOException e)
+ {
+ throw new IOError(e);
+ }
+ MessagingService.instance().sendOneWay(repairMessage, endpoints.get(i));
+ }
+ }
+
+ static ColumnFamily resolveSuperset(List<ColumnFamily> versions)
+ {
+ assert versions.size() > 0;
+
+ ColumnFamily resolved = null;
+ for (ColumnFamily cf : versions)
+ {
+ if (cf != null)
+ {
+ resolved = cf.cloneMe();
+ break;
+ }
+ }
+ if (resolved == null)
+ return null;
+
+ for (ColumnFamily cf : versions)
+ resolved.resolve(cf);
+
+ return resolved;
+ }
+
+ public Row getData() throws IOException
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public boolean isDataPresent()
+ {
+ throw new UnsupportedOperationException();
+ }
+}
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1070977&r1=1070976&r2=1070977&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java Tue Feb 15 17:16:52 2011
@@ -36,7 +36,6 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
-import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.filter.QueryFilter;
@@ -59,17 +58,6 @@ public class StorageProxy implements Sto
{
private static final Logger logger = LoggerFactory.getLogger(StorageProxy.class);
- private static ScheduledExecutorService repairExecutor = new ScheduledThreadPoolExecutor(1); // TODO JMX-enable this
-
- private static final ThreadLocal<Random> random = new ThreadLocal<Random>()
- {
- @Override
- protected Random initialValue()
- {
- return new Random();
- }
- };
-
// mbean stuff
private static final LatencyTracker readStats = new LatencyTracker();
private static final LatencyTracker rangeStats = new LatencyTracker();
@@ -78,6 +66,8 @@ public class StorageProxy implements Sto
private static int maxHintWindow = DatabaseDescriptor.getMaxHintWindow();
private static final String UNREACHABLE = "UNREACHABLE";
+ public static final StorageProxy instance = new StorageProxy();
+
private StorageProxy() {}
static
{
@@ -323,66 +313,55 @@ public class StorageProxy implements Sto
private static List<Row> fetchRows(List<ReadCommand> commands, ConsistencyLevel consistency_level) throws IOException, UnavailableException, TimeoutException
{
List<ReadCallback<Row>> readCallbacks = new ArrayList<ReadCallback<Row>>();
- List<List<InetAddress>> commandEndpoints = new ArrayList<List<InetAddress>>();
List<Row> rows = new ArrayList<Row>();
- Set<ReadCommand> repairs = new HashSet<ReadCommand>();
// send out read requests
for (ReadCommand command: commands)
{
assert !command.isDigestQuery();
+ logger.debug("Command/ConsistencyLevel is {}/{}", command, consistency_level);
List<InetAddress> endpoints = StorageService.instance.getLiveNaturalEndpoints(command.table, command.key);
DatabaseDescriptor.getEndpointSnitch().sortByProximity(FBUtilities.getLocalAddress(), endpoints);
- ReadResponseResolver resolver = new ReadResponseResolver(command.table, command.key);
- ReadCallback<Row> handler = getReadCallback(resolver, command.table, consistency_level);
- handler.assureSufficientLiveNodes(endpoints);
+ RowDigestResolver resolver = new RowDigestResolver(command.table, command.key);
+ ReadCallback<Row> handler = getReadCallback(resolver, command, consistency_level, endpoints);
+ handler.assureSufficientLiveNodes();
+ assert !handler.endpoints.isEmpty();
- // if we're not going to read repair, cut the endpoints list down to the ones required to satisfy ConsistencyLevel
- if (randomlyReadRepair(command))
- {
- if (endpoints.size() > handler.blockfor)
- repairs.add(command);
- }
- else
- {
- endpoints = endpoints.subList(0, handler.blockfor);
- }
-
// The data-request message is sent to dataPoint, the node that will actually get
// the data for us. The other replicas are only sent a digest query.
ReadCommand digestCommand = null;
- if (endpoints.size() > 1)
+ if (handler.endpoints.size() > 1)
{
digestCommand = command.copy();
digestCommand.setDigestQuery(true);
}
- InetAddress dataPoint = endpoints.get(0);
+ InetAddress dataPoint = handler.endpoints.get(0);
if (dataPoint.equals(FBUtilities.getLocalAddress()))
{
if (logger.isDebugEnabled())
- logger.debug("reading data for " + command + " locally");
+ logger.debug("reading data locally");
StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(command, handler));
}
else
{
Message message = command.makeReadMessage();
if (logger.isDebugEnabled())
- logger.debug("reading data for " + command + " from " + dataPoint);
+ logger.debug("reading data from " + dataPoint);
MessagingService.instance().sendRR(message, dataPoint, handler);
}
// We lazy-construct the digest Message object since it may not be necessary if we
// are doing a local digest read, or no digest reads at all.
Message digestMessage = null;
- for (InetAddress digestPoint : endpoints.subList(1, endpoints.size()))
+ for (InetAddress digestPoint : handler.endpoints.subList(1, handler.endpoints.size()))
{
if (digestPoint.equals(FBUtilities.getLocalAddress()))
{
if (logger.isDebugEnabled())
- logger.debug("reading digest for " + command + " locally");
+ logger.debug("reading digest locally");
StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(digestCommand, handler));
}
else
@@ -390,44 +369,45 @@ public class StorageProxy implements Sto
if (digestMessage == null)
digestMessage = digestCommand.makeReadMessage();
if (logger.isDebugEnabled())
- logger.debug("reading digest for " + command + " from " + digestPoint);
+ logger.debug("reading digest for from " + digestPoint);
MessagingService.instance().sendRR(digestMessage, digestPoint, handler);
}
}
readCallbacks.add(handler);
- commandEndpoints.add(endpoints);
}
// read results and make a second pass for any digest mismatches
List<RepairCallback<Row>> repairResponseHandlers = null;
for (int i = 0; i < commands.size(); i++)
{
- ReadCallback<Row> readCallback = readCallbacks.get(i);
+ ReadCallback<Row> handler = readCallbacks.get(i);
Row row;
ReadCommand command = commands.get(i);
- List<InetAddress> endpoints = commandEndpoints.get(i);
try
{
long startTime2 = System.currentTimeMillis();
- row = readCallback.get(); // CL.ONE is special cased here to ignore digests even if some have arrived
+ row = handler.get(); // CL.ONE is special cased here to ignore digests even if some have arrived
if (row != null)
rows.add(row);
if (logger.isDebugEnabled())
logger.debug("Read: " + (System.currentTimeMillis() - startTime2) + " ms.");
-
- if (repairs.contains(command))
- repairExecutor.schedule(new RepairRunner(readCallback.resolver, command, endpoints), DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS);
}
catch (DigestMismatchException ex)
{
if (logger.isDebugEnabled())
logger.debug("Digest mismatch:", ex);
- RepairCallback<Row> handler = repair(command, endpoints);
+
+ RowRepairResolver resolver = new RowRepairResolver(command.table, command.key);
+ RepairCallback<Row> repairHandler = new RepairCallback<Row>(resolver, handler.endpoints);
+ Message messageRepair = command.makeReadMessage();
+ for (InetAddress endpoint : handler.endpoints)
+ MessagingService.instance().sendRR(messageRepair, endpoint, repairHandler);
+
if (repairResponseHandlers == null)
repairResponseHandlers = new ArrayList<RepairCallback<Row>>();
- repairResponseHandlers.add(handler);
+ repairResponseHandlers.add(repairHandler);
}
}
@@ -476,24 +456,13 @@ public class StorageProxy implements Sto
}
}
- static <T> ReadCallback<T> getReadCallback(IResponseResolver<T> resolver, String table, ConsistencyLevel consistencyLevel)
+ static <T> ReadCallback<T> getReadCallback(IResponseResolver<T> resolver, IReadCommand command, ConsistencyLevel consistencyLevel, List<InetAddress> endpoints)
{
if (consistencyLevel.equals(ConsistencyLevel.LOCAL_QUORUM) || consistencyLevel.equals(ConsistencyLevel.EACH_QUORUM))
{
- return new DatacenterReadCallback(resolver, consistencyLevel, table);
+ return new DatacenterReadCallback(resolver, consistencyLevel, command, endpoints);
}
- return new ReadCallback(resolver, consistencyLevel, table);
- }
-
- private static RepairCallback<Row> repair(ReadCommand command, List<InetAddress> endpoints)
- throws IOException
- {
- ReadResponseResolver resolver = new ReadResponseResolver(command.table, command.key);
- RepairCallback<Row> handler = new RepairCallback<Row>(resolver, endpoints);
- Message messageRepair = command.makeReadMessage();
- for (InetAddress endpoint : endpoints)
- MessagingService.instance().sendRR(messageRepair, endpoint, handler);
- return handler;
+ return new ReadCallback(resolver, consistencyLevel, command, endpoints);
}
/*
@@ -545,16 +514,14 @@ public class StorageProxy implements Sto
// collect replies and resolve according to consistency level
RangeSliceResponseResolver resolver = new RangeSliceResponseResolver(command.keyspace, liveEndpoints);
- AbstractReplicationStrategy rs = Table.open(command.keyspace).getReplicationStrategy();
- ReadCallback<List<Row>> handler = getReadCallback(resolver, command.keyspace, consistency_level);
- // TODO bail early if live endpoints can't satisfy requested consistency level
+ ReadCallback<List<Row>> handler = getReadCallback(resolver, command, consistency_level, liveEndpoints);
+ handler.assureSufficientLiveNodes();
for (InetAddress endpoint : liveEndpoints)
{
MessagingService.instance().sendRR(message, endpoint, handler);
if (logger.isDebugEnabled())
logger.debug("reading " + c2 + " from " + endpoint);
}
- // TODO read repair on remaining replicas?
// if we're done, great, otherwise, move to the next range
try
@@ -607,6 +574,11 @@ public class StorageProxy implements Sto
versions.put(message.getFrom(), theirVersion);
latch.countDown();
}
+
+ public boolean isLatencyForSnitch()
+ {
+ return false;
+ }
};
// an empty message acts as a request to the SchemaCheckVerbHandler.
for (InetAddress endpoint : liveHosts)
@@ -699,12 +671,6 @@ public class StorageProxy implements Sto
return ranges;
}
-
- private static boolean randomlyReadRepair(ReadCommand command)
- {
- CFMetaData cfmd = DatabaseDescriptor.getTableMetaData(command.table).get(command.getColumnFamilyName());
- return cfmd.getReadRepairChance() > random.get().nextDouble();
- }
public long getReadOperations()
{
@@ -781,7 +747,7 @@ public class StorageProxy implements Sto
return writeStats.getRecentLatencyHistogramMicros();
}
- public static List<Row> scan(String keyspace, String column_family, IndexClause index_clause, SlicePredicate column_predicate, ConsistencyLevel consistency_level)
+ public static List<Row> scan(final String keyspace, String column_family, IndexClause index_clause, SlicePredicate column_predicate, ConsistencyLevel consistency_level)
throws IOException, TimeoutException, UnavailableException
{
IPartitioner p = StorageService.getPartitioner();
@@ -799,12 +765,16 @@ public class StorageProxy implements Sto
// collect replies and resolve according to consistency level
RangeSliceResponseResolver resolver = new RangeSliceResponseResolver(keyspace, liveEndpoints);
- ReadCallback<List<Row>> handler = getReadCallback(resolver, keyspace, consistency_level);
-
- // bail early if live endpoints can't satisfy requested consistency level
- if(handler.blockfor > liveEndpoints.size())
- throw new UnavailableException();
-
+ IReadCommand iCommand = new IReadCommand()
+ {
+ public String getKeyspace()
+ {
+ return keyspace;
+ }
+ };
+ ReadCallback<List<Row>> handler = getReadCallback(resolver, iCommand, consistency_level, liveEndpoints);
+ handler.assureSufficientLiveNodes();
+
IndexScanCommand command = new IndexScanCommand(keyspace, column_family, index_clause, column_predicate, range);
Message message = command.getMessage();
for (InetAddress endpoint : liveEndpoints)
@@ -912,40 +882,4 @@ public class StorageProxy implements Sto
{
return !Gossiper.instance.getUnreachableMembers().isEmpty();
}
-
- private static class RepairRunner extends WrappedRunnable
- {
- private final IResponseResolver<Row> resolver;
- private final ReadCommand command;
- private final List<InetAddress> endpoints;
-
- public RepairRunner(IResponseResolver<Row> resolver, ReadCommand command, List<InetAddress> endpoints)
- {
- this.resolver = resolver;
- this.command = command;
- this.endpoints = endpoints;
- }
-
- protected void runMayThrow() throws IOException
- {
- try
- {
- resolver.resolve();
- }
- catch (DigestMismatchException e)
- {
- if (logger.isDebugEnabled())
- logger.debug("Digest mismatch:", e);
- final RepairCallback<Row> callback = repair(command, endpoints);
- Runnable runnable = new WrappedRunnable()
- {
- public void runMayThrow() throws DigestMismatchException, IOException, TimeoutException
- {
- callback.get();
- }
- };
- repairExecutor.schedule(runnable, DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS);
- }
- }
- }
}
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/TruncateResponseHandler.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/TruncateResponseHandler.java?rev=1070977&r1=1070976&r2=1070977&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/TruncateResponseHandler.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/TruncateResponseHandler.java Tue Feb 15 17:16:52 2011
@@ -73,4 +73,9 @@ public class TruncateResponseHandler imp
if (responses.get() >= responseCount)
condition.signal();
}
+
+ public boolean isLatencyForSnitch()
+ {
+ return false;
+ }
}
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/WriteResponseHandler.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/WriteResponseHandler.java?rev=1070977&r1=1070976&r2=1070977&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/WriteResponseHandler.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/WriteResponseHandler.java Tue Feb 15 17:16:52 2011
@@ -121,4 +121,9 @@ public class WriteResponseHandler extend
throw new UnavailableException();
}
}
+
+ public boolean isLatencyForSnitch()
+ {
+ return false;
+ }
}
Modified: cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/ConsistencyLevelTest.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/ConsistencyLevelTest.java?rev=1070977&r1=1070976&r2=1070977&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/ConsistencyLevelTest.java (original)
+++ cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/ConsistencyLevelTest.java Tue Feb 15 17:16:52 2011
@@ -71,7 +71,7 @@ public class ConsistencyLevelTest extend
AbstractReplicationStrategy strategy;
- for (String table : DatabaseDescriptor.getNonSystemTables())
+ for (final String table : DatabaseDescriptor.getNonSystemTables())
{
strategy = getStrategy(table, tmd);
StorageService.calculatePendingRanges(strategy, table);
@@ -96,7 +96,15 @@ public class ConsistencyLevelTest extend
IWriteResponseHandler writeHandler = strategy.getWriteResponseHandler(hosts, hintedNodes, c);
- ReadCallback<Row> readHandler = StorageProxy.getReadCallback(new ReadResponseResolver(table, ByteBufferUtil.bytes("foo")), table, c);
+ IReadCommand command = new IReadCommand()
+ {
+ public String getKeyspace()
+ {
+ return table;
+ }
+ };
+ RowRepairResolver resolver = new RowRepairResolver(table, ByteBufferUtil.bytes("foo"));
+ ReadCallback<Row> readHandler = StorageProxy.getReadCallback(resolver, command, c, new ArrayList<InetAddress>(hintedNodes.keySet()));
boolean isWriteUnavailable = false;
boolean isReadUnavailable = false;
@@ -111,7 +119,7 @@ public class ConsistencyLevelTest extend
try
{
- readHandler.assureSufficientLiveNodes(hintedNodes.asMap().keySet());
+ readHandler.assureSufficientLiveNodes();
}
catch (UnavailableException e)
{
Modified: cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/ReadResponseResolverTest.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/ReadResponseResolverTest.java?rev=1070977&r1=1070976&r2=1070977&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/ReadResponseResolverTest.java (original)
+++ cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/ReadResponseResolverTest.java Tue Feb 15 17:16:52 2011
@@ -1,96 +0,0 @@
-package org.apache.cassandra.service;
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-
-import java.util.Arrays;
-
-import org.apache.cassandra.SchemaLoader;
-import org.junit.Test;
-
-import org.apache.cassandra.db.ColumnFamily;
-
-import static org.apache.cassandra.db.TableTest.assertColumns;
-import static org.apache.cassandra.Util.column;
-import static junit.framework.Assert.assertNull;
-
-public class ReadResponseResolverTest extends SchemaLoader
-{
- @Test
- public void testResolveSupersetNewer()
- {
- ColumnFamily cf1 = ColumnFamily.create("Keyspace1", "Standard1");
- cf1.addColumn(column("c1", "v1", 0));
-
- ColumnFamily cf2 = ColumnFamily.create("Keyspace1", "Standard1");
- cf2.addColumn(column("c1", "v2", 1));
-
- ColumnFamily resolved = ReadResponseResolver.resolveSuperset(Arrays.asList(cf1, cf2));
- assertColumns(resolved, "c1");
- assertColumns(ColumnFamily.diff(cf1, resolved), "c1");
- assertNull(ColumnFamily.diff(cf2, resolved));
- }
-
- @Test
- public void testResolveSupersetDisjoint()
- {
- ColumnFamily cf1 = ColumnFamily.create("Keyspace1", "Standard1");
- cf1.addColumn(column("c1", "v1", 0));
-
- ColumnFamily cf2 = ColumnFamily.create("Keyspace1", "Standard1");
- cf2.addColumn(column("c2", "v2", 1));
-
- ColumnFamily resolved = ReadResponseResolver.resolveSuperset(Arrays.asList(cf1, cf2));
- assertColumns(resolved, "c1", "c2");
- assertColumns(ColumnFamily.diff(cf1, resolved), "c2");
- assertColumns(ColumnFamily.diff(cf2, resolved), "c1");
- }
-
- @Test
- public void testResolveSupersetNullOne()
- {
- ColumnFamily cf2 = ColumnFamily.create("Keyspace1", "Standard1");
- cf2.addColumn(column("c2", "v2", 1));
-
- ColumnFamily resolved = ReadResponseResolver.resolveSuperset(Arrays.asList(null, cf2));
- assertColumns(resolved, "c2");
- assertColumns(ColumnFamily.diff(null, resolved), "c2");
- assertNull(ColumnFamily.diff(cf2, resolved));
- }
-
- @Test
- public void testResolveSupersetNullTwo()
- {
- ColumnFamily cf1 = ColumnFamily.create("Keyspace1", "Standard1");
- cf1.addColumn(column("c1", "v1", 0));
-
- ColumnFamily resolved = ReadResponseResolver.resolveSuperset(Arrays.asList(cf1, null));
- assertColumns(resolved, "c1");
- assertNull(ColumnFamily.diff(cf1, resolved));
- assertColumns(ColumnFamily.diff(null, resolved), "c1");
- }
-
- @Test
- public void testResolveSupersetNullBoth()
- {
- assertNull(ReadResponseResolver.resolveSuperset(Arrays.<ColumnFamily>asList(null, null)));
- }
-}
Added: cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/RowResolverTest.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/RowResolverTest.java?rev=1070977&view=auto
==============================================================================
--- cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/RowResolverTest.java (added)
+++ cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/RowResolverTest.java Tue Feb 15 17:16:52 2011
@@ -0,0 +1,96 @@
+package org.apache.cassandra.service;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+import java.util.Arrays;
+
+import org.apache.cassandra.SchemaLoader;
+import org.junit.Test;
+
+import org.apache.cassandra.db.ColumnFamily;
+
+import static org.apache.cassandra.db.TableTest.assertColumns;
+import static org.apache.cassandra.Util.column;
+import static junit.framework.Assert.assertNull;
+
+public class RowResolverTest extends SchemaLoader
+{
+ @Test
+ public void testResolveSupersetNewer()
+ {
+ ColumnFamily cf1 = ColumnFamily.create("Keyspace1", "Standard1");
+ cf1.addColumn(column("c1", "v1", 0));
+
+ ColumnFamily cf2 = ColumnFamily.create("Keyspace1", "Standard1");
+ cf2.addColumn(column("c1", "v2", 1));
+
+ ColumnFamily resolved = RowRepairResolver.resolveSuperset(Arrays.asList(cf1, cf2));
+ assertColumns(resolved, "c1");
+ assertColumns(ColumnFamily.diff(cf1, resolved), "c1");
+ assertNull(ColumnFamily.diff(cf2, resolved));
+ }
+
+ @Test
+ public void testResolveSupersetDisjoint()
+ {
+ ColumnFamily cf1 = ColumnFamily.create("Keyspace1", "Standard1");
+ cf1.addColumn(column("c1", "v1", 0));
+
+ ColumnFamily cf2 = ColumnFamily.create("Keyspace1", "Standard1");
+ cf2.addColumn(column("c2", "v2", 1));
+
+ ColumnFamily resolved = RowRepairResolver.resolveSuperset(Arrays.asList(cf1, cf2));
+ assertColumns(resolved, "c1", "c2");
+ assertColumns(ColumnFamily.diff(cf1, resolved), "c2");
+ assertColumns(ColumnFamily.diff(cf2, resolved), "c1");
+ }
+
+ @Test
+ public void testResolveSupersetNullOne()
+ {
+ ColumnFamily cf2 = ColumnFamily.create("Keyspace1", "Standard1");
+ cf2.addColumn(column("c2", "v2", 1));
+
+ ColumnFamily resolved = RowRepairResolver.resolveSuperset(Arrays.asList(null, cf2));
+ assertColumns(resolved, "c2");
+ assertColumns(ColumnFamily.diff(null, resolved), "c2");
+ assertNull(ColumnFamily.diff(cf2, resolved));
+ }
+
+ @Test
+ public void testResolveSupersetNullTwo()
+ {
+ ColumnFamily cf1 = ColumnFamily.create("Keyspace1", "Standard1");
+ cf1.addColumn(column("c1", "v1", 0));
+
+ ColumnFamily resolved = RowRepairResolver.resolveSuperset(Arrays.asList(cf1, null));
+ assertColumns(resolved, "c1");
+ assertNull(ColumnFamily.diff(cf1, resolved));
+ assertColumns(ColumnFamily.diff(null, resolved), "c1");
+ }
+
+ @Test
+ public void testResolveSupersetNullBoth()
+ {
+ assertNull(RowRepairResolver.resolveSuperset(Arrays.<ColumnFamily>asList(null, null)));
+ }
+}