You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/01/05 07:58:17 UTC
svn commit: r1055319 -
/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/
Author: jbellis
Date: Wed Jan 5 06:58:16 2011
New Revision: 1055319
URL: http://svn.apache.org/viewvc?rev=1055319&view=rev
Log:
Remove SP.weakRead, rename strongRead to fetchRows. Implement read repair as a second resolve after the initial one for the data.
patch by jbellis; reviewed by Daniel Doubleday for CASSANDRA-982
Removed:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ConsistencyChecker.java
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/IResponseResolver.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/QuorumResponseHandler.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadResponseResolver.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/IResponseResolver.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/IResponseResolver.java?rev=1055319&r1=1055318&r2=1055319&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/IResponseResolver.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/IResponseResolver.java Wed Jan 5 06:58:16 2011
@@ -24,7 +24,7 @@ import org.apache.cassandra.net.Message;
public interface IResponseResolver<T> {
- /*
+ /**
* This Method resolves the responses that are passed in . for example : if
* its write response then all we get is true or false return values which
* implies if the writes were successful but for reads its more complicated
@@ -33,8 +33,14 @@ public interface IResponseResolver<T> {
* needs from this interface.
*/
public T resolve() throws DigestMismatchException, IOException;
+
public boolean isDataPresent();
+ /**
+ * returns the data response without comparing with any digests
+ */
+ public T getData() throws IOException;
+
public void preprocess(Message message);
public Iterable<Message> getMessages();
public int getMessageCount();
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/QuorumResponseHandler.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/QuorumResponseHandler.java?rev=1055319&r1=1055318&r2=1055319&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/QuorumResponseHandler.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/QuorumResponseHandler.java Wed Jan 5 06:58:16 2011
@@ -39,8 +39,9 @@ import org.apache.cassandra.utils.Simple
public class QuorumResponseHandler<T> implements IAsyncCallback
{
protected static final Logger logger = LoggerFactory.getLogger( QuorumResponseHandler.class );
+
+ public final IResponseResolver<T> resolver;
protected final SimpleCondition condition = new SimpleCondition();
- protected final IResponseResolver<T> resolver;
private final long startTime;
protected final int blockfor;
@@ -58,38 +59,34 @@ public class QuorumResponseHandler<T> im
public T get() throws TimeoutException, DigestMismatchException, IOException
{
+ long timeout = DatabaseDescriptor.getRpcTimeout() - (System.currentTimeMillis() - startTime);
+ boolean success;
try
{
- long timeout = DatabaseDescriptor.getRpcTimeout() - (System.currentTimeMillis() - startTime);
- boolean success;
- try
- {
- success = condition.await(timeout, TimeUnit.MILLISECONDS);
- }
- catch (InterruptedException ex)
- {
- throw new AssertionError(ex);
- }
-
- if (!success)
- {
- StringBuilder sb = new StringBuilder("");
- for (Message message : resolver.getMessages())
- {
- sb.append(message.getFrom());
- }
- throw new TimeoutException("Operation timed out - received only " + resolver.getMessageCount() + " responses from " + sb.toString() + " .");
- }
+ success = condition.await(timeout, TimeUnit.MILLISECONDS);
}
- finally
+ catch (InterruptedException ex)
{
- for (Message response : resolver.getMessages())
- {
- MessagingService.removeRegisteredCallback(response.getMessageId());
- }
+ throw new AssertionError(ex);
}
- return resolver.resolve();
+ if (!success)
+ {
+ StringBuilder sb = new StringBuilder("");
+ for (Message message : resolver.getMessages())
+ sb.append(message.getFrom()).append(", ");
+ throw new TimeoutException("Operation timed out - received only " + resolver.getMessageCount() + " responses from " + sb.toString() + " .");
+ }
+
+ return blockfor == 1 ? resolver.getData() : resolver.resolve();
+ }
+
+ public void close()
+ {
+ for (Message response : resolver.getMessages())
+ {
+ MessagingService.removeRegisteredCallback(response.getMessageId());
+ }
}
public void response(Message message)
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java?rev=1055319&r1=1055318&r2=1055319&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java Wed Jan 5 06:58:16 2011
@@ -54,6 +54,15 @@ public class RangeSliceResponseResolver
this.table = table;
}
+ public List<Row> getData() throws IOException
+ {
+ Message response = responses.iterator().next();
+ RangeSliceReply reply = RangeSliceReply.read(response.getMessageBody());
+ return reply.rows;
+ }
+
+ // Note: this deserializes the response a 2nd time if getData was called first
+ // (this is not currently an issue since we don't do read repair for range queries.)
public List<Row> resolve() throws IOException
{
CollatingIterator collator = new CollatingIterator(new Comparator<Pair<Row,InetAddress>>()
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadResponseResolver.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadResponseResolver.java?rev=1055319&r1=1055318&r2=1055319&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadResponseResolver.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadResponseResolver.java Wed Jan 5 06:58:16 2011
@@ -46,6 +46,7 @@ public class ReadResponseResolver implem
private final String table;
private final Map<Message, ReadResponse> results = new NonBlockingHashMap<Message, ReadResponse>();
private DecoratedKey key;
+ private ByteBuffer digest;
public ReadResponseResolver(String table, ByteBuffer key)
{
@@ -53,14 +54,29 @@ public class ReadResponseResolver implem
this.key = StorageService.getPartitioner().decorateKey(key);
}
+ public Row getData() throws IOException
+ {
+ for (Map.Entry<Message, ReadResponse> entry : results.entrySet())
+ {
+ ReadResponse result = entry.getValue();
+ if (!result.isDigestQuery())
+ return result.row();
+ }
+
+ throw new AssertionError("getData should not be invoked when no data is present");
+ }
+
/*
- * This method handles two different scenarios:
+ * This method handles three different scenarios:
*
- * 1) we're handling the initial read, of data from the closest replica + digests
+ * 1a)we're handling the initial read, of data from the closest replica + digests
* from the rest. In this case we check the digests against each other,
* throw an exception if there is a mismatch, otherwise return the data row.
*
- * 2) there was a mismatch on the initial read, so we redid the digest requests
+ * 1b)we're checking additional digests that arrived after the minimum to handle
+ * the requested ConsistencyLevel, i.e. asynchronous read repair check
+ *
+ * 2) there was a mismatch on the initial read (1a or 1b), so we redid the digest requests
* as full data reads. In this case we need to compute the most recent version
* of each column, and send diffs to out-of-date replicas.
*/
@@ -72,7 +88,6 @@ public class ReadResponseResolver implem
long startTime = System.currentTimeMillis();
List<ColumnFamily> versions = new ArrayList<ColumnFamily>();
List<InetAddress> endpoints = new ArrayList<InetAddress>();
- ByteBuffer digest = null;
// validate digests against each other; throw immediately on mismatch.
// also, collects data results into versions/endpoints lists.
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1055319&r1=1055318&r2=1055319&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java Wed Jan 5 06:58:16 2011
@@ -48,7 +48,6 @@ import org.apache.cassandra.gms.Gossiper
import org.apache.cassandra.locator.AbstractReplicationStrategy;
import org.apache.cassandra.locator.TokenMetadata;
import org.apache.cassandra.net.IAsyncCallback;
-import org.apache.cassandra.net.IAsyncResult;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.thrift.*;
@@ -63,6 +62,8 @@ public class StorageProxy implements Sto
{
private static final Logger logger = LoggerFactory.getLogger(StorageProxy.class);
+ private static ScheduledExecutorService repairExecutor = new ScheduledThreadPoolExecutor(1); // TODO JMX-enable this
+
private static final Random random = new Random();
// mbean stuff
private static final LatencyTracker readStats = new LatencyTracker();
@@ -305,15 +306,7 @@ public class StorageProxy implements Sto
List<Row> rows;
try
{
- if (consistency_level == ConsistencyLevel.ONE)
- {
- rows = weakRead(commands);
- }
- else
- {
- assert consistency_level.getValue() >= ConsistencyLevel.QUORUM.getValue();
- rows = strongRead(commands, consistency_level);
- }
+ rows = fetchRows(commands, consistency_level);
}
finally
{
@@ -322,91 +315,23 @@ public class StorageProxy implements Sto
return rows;
}
- private static List<Row> weakRead(List<ReadCommand> commands) throws IOException, UnavailableException, TimeoutException
- {
- List<Row> rows = new ArrayList<Row>();
-
- // send off all the commands asynchronously
- List<Future<Object>> localFutures = null;
- HashMap<ReadCommand, IAsyncResult> remoteResults = null;
- for (ReadCommand command: commands)
- {
- InetAddress endPoint = StorageService.instance.findSuitableEndpoint(command.table, command.key);
- if (endPoint.equals(FBUtilities.getLocalAddress()))
- {
- if (logger.isDebugEnabled())
- logger.debug("weakread reading " + command + " locally");
-
- if (localFutures == null)
- localFutures = new ArrayList<Future<Object>>();
- Callable<Object> callable = new weakReadLocalCallable(command);
- localFutures.add(StageManager.getStage(Stage.READ).submit(callable));
- }
- else
- {
- if (remoteResults == null)
- remoteResults = new HashMap<ReadCommand, IAsyncResult>();
- Message message = command.makeReadMessage();
- if (logger.isDebugEnabled())
- logger.debug("weakread reading " + command + " from " + message.getMessageId() + "@" + endPoint);
- remoteResults.put(command, MessagingService.instance().sendRR(message, endPoint));
- }
- }
-
- // wait for results
- if (localFutures != null)
- {
- for (Future<Object> future : localFutures)
- {
- Row row;
- try
- {
- row = (Row) future.get();
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
- rows.add(row);
- }
- }
- if (remoteResults != null)
- {
- for (Map.Entry<ReadCommand, IAsyncResult> entry : remoteResults.entrySet())
- {
- ReadCommand command = entry.getKey();
- IAsyncResult iar = entry.getValue();
- byte[] body;
- body = iar.get(DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS);
- ByteArrayInputStream bufIn = new ByteArrayInputStream(body);
- ReadResponse response = ReadResponse.serializer().deserialize(new DataInputStream(bufIn));
- assert response.row() != null;
- rows.add(response.row());
- if (randomlyReadRepair(command))
- StorageService.instance.doConsistencyCheck(response.row(), command, iar.getFrom());
- }
- }
-
- return rows;
- }
-
- /*
- * This function executes the read protocol.
- // 1. Get the N nodes from storage service where the data needs to be
- // replicated
- // 2. Construct a message for read\write
- * 3. Set one of the messages to get the data and the rest to get the digest
- // 4. SendRR ( to all the nodes above )
- // 5. Wait for a response from at least X nodes where X <= N and the data node
- * 6. If the digest matches return the data.
- * 7. else carry out read repair by getting data from all the nodes.
- // 5. return success
+ /**
+ * This function executes local and remote reads, and blocks for the results:
+ *
+ * 1. Get the replica locations, sorted by response time according to the snitch
+ * 2. Send a data request to the closest replica, and digest requests to either
+ * a) all the replicas, if read repair is enabled
+ * b) the closest R-1 replicas, where R is the number required to satisfy the ConsistencyLevel
+ * 3. Wait for a response from R replicas
+ * 4. If the digests (if any) match the data return the data
+ * 5. else carry out read repair by getting data from all the nodes.
*/
- private static List<Row> strongRead(List<ReadCommand> commands, ConsistencyLevel consistency_level) throws IOException, UnavailableException, TimeoutException
+ private static List<Row> fetchRows(List<ReadCommand> commands, ConsistencyLevel consistency_level) throws IOException, UnavailableException, TimeoutException
{
List<QuorumResponseHandler<Row>> quorumResponseHandlers = new ArrayList<QuorumResponseHandler<Row>>();
List<List<InetAddress>> commandEndpoints = new ArrayList<List<InetAddress>>();
List<Row> rows = new ArrayList<Row>();
+ Set<ReadCommand> repairs = new HashSet<ReadCommand>();
// send out read requests
for (ReadCommand command: commands)
@@ -425,16 +350,28 @@ public class StorageProxy implements Sto
QuorumResponseHandler<Row> handler = rs.getQuorumResponseHandler(resolver, consistency_level);
handler.assureSufficientLiveNodes(endpoints);
- Message messages[] = new Message[endpoints.size()];
+ int targets;
+ if (randomlyReadRepair(command))
+ {
+ targets = endpoints.size();
+ if (targets > handler.blockfor)
+ repairs.add(command);
+ }
+ else
+ {
+ targets = handler.blockfor;
+ }
+ Message[] messages = new Message[targets];
+
// data-request message is sent to dataPoint, the node that will actually get
// the data for us. The other replicas are only sent a digest query.
- int n = 0;
- for (InetAddress endpoint : endpoints)
+ for (int i = 0; i < messages.length; i++)
{
+ InetAddress endpoint = endpoints.get(i);
Message m = endpoint.equals(dataPoint) ? message : messageDigestOnly;
- messages[n++] = m;
+ messages[i] = m;
if (logger.isDebugEnabled())
- logger.debug("strongread reading " + (m == message ? "data" : "digest") + " for " + command + " from " + m.getMessageId() + "@" + endpoint);
+ logger.debug("reading " + (m == message ? "data" : "digest") + " for " + command + " from " + m.getMessageId() + "@" + endpoint);
}
MessagingService.instance().sendRR(messages, endpoints, handler);
quorumResponseHandlers.add(handler);
@@ -442,12 +379,13 @@ public class StorageProxy implements Sto
}
// read results and make a second pass for any digest mismatches
- List<QuorumResponseHandler<Row>> repairResponseHandlers = null;
+ List<RepairCallback<Row>> repairResponseHandlers = null;
for (int i = 0; i < commands.size(); i++)
{
QuorumResponseHandler<Row> quorumResponseHandler = quorumResponseHandlers.get(i);
Row row;
ReadCommand command = commands.get(i);
+ List<InetAddress> endpoints = commandEndpoints.get(i);
try
{
long startTime2 = System.currentTimeMillis();
@@ -457,18 +395,17 @@ public class StorageProxy implements Sto
if (logger.isDebugEnabled())
logger.debug("quorumResponseHandler: " + (System.currentTimeMillis() - startTime2) + " ms.");
+
+ if (repairs.contains(command))
+ repairExecutor.schedule(new RepairRunner(quorumResponseHandler.resolver, command, endpoints), DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS);
}
catch (DigestMismatchException ex)
{
- AbstractReplicationStrategy rs = Table.open(command.table).getReplicationStrategy();
- ReadResponseResolver resolver = new ReadResponseResolver(command.table, command.key);
- QuorumResponseHandler<Row> handler = rs.getQuorumResponseHandler(resolver, consistency_level);
if (logger.isDebugEnabled())
logger.debug("Digest mismatch:", ex);
- Message messageRepair = command.makeReadMessage();
- MessagingService.instance().sendRR(messageRepair, commandEndpoints.get(i), handler);
+ RepairCallback<Row> handler = repair(command, endpoints);
if (repairResponseHandlers == null)
- repairResponseHandlers = new ArrayList<QuorumResponseHandler<Row>>();
+ repairResponseHandlers = new ArrayList<RepairCallback<Row>>();
repairResponseHandlers.add(handler);
}
}
@@ -476,7 +413,7 @@ public class StorageProxy implements Sto
// read the results for the digest mismatch retries
if (repairResponseHandlers != null)
{
- for (QuorumResponseHandler<Row> handler : repairResponseHandlers)
+ for (RepairCallback<Row> handler : repairResponseHandlers)
{
try
{
@@ -494,6 +431,17 @@ public class StorageProxy implements Sto
return rows;
}
+ // TODO repair resolver shouldn't take consistencylevel (it should repair exactly as many as it receives replies for)
+ private static RepairCallback<Row> repair(ReadCommand command, List<InetAddress> endpoints)
+ throws IOException
+ {
+ ReadResponseResolver resolver = new ReadResponseResolver(command.table, command.key);
+ RepairCallback<Row> handler = new RepairCallback<Row>(resolver, endpoints);
+ Message messageRepair = command.makeReadMessage();
+ MessagingService.instance().sendRR(messageRepair, endpoints, handler);
+ return handler;
+ }
+
/*
* This function executes the read protocol locally. Consistency checks are performed in the background.
*/
@@ -845,31 +793,6 @@ public class StorageProxy implements Sto
return hintedHandoffEnabled;
}
- static class weakReadLocalCallable implements Callable<Object>
- {
- private ReadCommand command;
-
- weakReadLocalCallable(ReadCommand command)
- {
- this.command = command;
- }
-
- public Object call() throws IOException
- {
- if (logger.isDebugEnabled())
- logger.debug("weakreadlocal reading " + command);
-
- Table table = Table.open(command.table);
- Row row = command.getRow(table);
-
- // Do the consistency checks in the background
- if (randomlyReadRepair(command))
- StorageService.instance.doConsistencyCheck(row, command, FBUtilities.getLocalAddress());
-
- return row;
- }
- }
-
/**
* Performs the truncate operatoin, which effectively deletes all data from
* the column family cfname
@@ -915,4 +838,32 @@ public class StorageProxy implements Sto
{
return !Gossiper.instance.getUnreachableMembers().isEmpty();
}
+
+ private static class RepairRunner extends WrappedRunnable
+ {
+ private final IResponseResolver<Row> resolver;
+ private final ReadCommand command;
+ private final List<InetAddress> endpoints;
+
+ public RepairRunner(IResponseResolver<Row> resolver, ReadCommand command, List<InetAddress> endpoints)
+ {
+ this.resolver = resolver;
+ this.command = command;
+ this.endpoints = endpoints;
+ }
+
+ protected void runMayThrow() throws IOException
+ {
+ try
+ {
+ resolver.resolve();
+ }
+ catch (DigestMismatchException e)
+ {
+ if (logger.isDebugEnabled())
+ logger.debug("Digest mismatch:", e);
+ repair(command, endpoints);
+ }
+ }
+ }
}
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java?rev=1055319&r1=1055318&r2=1055319&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java Wed Jan 5 06:58:16 2011
@@ -449,18 +449,6 @@ public class StorageService implements I
}
/**
- * This method performs the requisite operations to make
- * sure that the N replicas are in sync. We do this in the
- * background when we do not care much about consistency.
- */
- public void doConsistencyCheck(Row row, ReadCommand command, InetAddress dataSource)
- {
- List<InetAddress> endpoints = StorageService.instance.getLiveNaturalEndpoints(command.table, command.key);
- if (endpoints.size() > 1)
- consistencyManager_.submit(new ConsistencyChecker(command, row, endpoints, dataSource));
- }
-
- /**
* for a keyspace, return the ranges and corresponding hosts for a given keyspace.
* @param keyspace
* @return