You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/01/27 17:53:44 UTC
svn commit: r1064193 - in /cassandra/branches/cassandra-0.6: ./
src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/net/
src/java/org/apache/cassandra/service/ src/java/org/apache/cassandra/utils/
Author: jbellis
Date: Thu Jan 27 16:53:44 2011
New Revision: 1064193
URL: http://svn.apache.org/viewvc?rev=1064193&view=rev
Log:
reduce garbage generated by MessagingServiceto prevent loadspikes
patch by jbellis; reviewed by brandonwilliams and tjake for CASSANDRA-2058
Modified:
cassandra/branches/cassandra-0.6/CHANGES.txt
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/HintedHandOffManager.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/AsyncResult.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/Header.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/Message.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ConsistencyChecker.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/QuorumResponseHandler.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/WriteResponseHandler.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/utils/ExpiringMap.java
Modified: cassandra/branches/cassandra-0.6/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/CHANGES.txt?rev=1064193&r1=1064192&r2=1064193&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.6/CHANGES.txt Thu Jan 27 16:53:44 2011
@@ -1,3 +1,8 @@
+0.6.11
+ * reduce garbage generated by MessagingService to prevent load spikes
+ (CASSANDRA-2058)
+
+
0.6.10
* buffer network stack to avoid inefficient small TCP messages while avoiding
the nagle/delayed ack problem (CASSANDRA-1896)
Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/HintedHandOffManager.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/HintedHandOffManager.java?rev=1064193&r1=1064192&r2=1064193&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/HintedHandOffManager.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/HintedHandOffManager.java Thu Jan 27 16:53:44 2011
@@ -130,7 +130,7 @@ public class HintedHandOffManager
rm.add(cf);
Message message = rm.makeRowMutationMessage();
WriteResponseHandler responseHandler = new WriteResponseHandler(1, tableName);
- MessagingService.instance.sendRR(message, new InetAddress[] { endPoint }, responseHandler);
+ MessagingService.instance.sendRR(message, endPoint, responseHandler);
try
{
responseHandler.get();
Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/AsyncResult.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/AsyncResult.java?rev=1064193&r1=1064192&r2=1064193&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/AsyncResult.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/AsyncResult.java Thu Jan 27 16:53:44 2011
@@ -121,8 +121,6 @@ class AsyncResult implements IAsyncResul
{
lock_.unlock();
}
-
- MessagingService.removeRegisteredCallback(response.getMessageId());
}
public InetAddress getFrom()
Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/Header.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/Header.java?rev=1064193&r1=1064192&r2=1064193&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/Header.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/Header.java Thu Jan 27 16:53:44 2011
@@ -95,11 +95,6 @@ public class Header
return messageId_;
}
- void setMessageId(String id)
- {
- messageId_ = id;
- }
-
byte[] getDetail(Object key)
{
return details_.get(key);
Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/Message.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/Message.java?rev=1064193&r1=1064192&r2=1064193&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/Message.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/Message.java Thu Jan 27 16:53:44 2011
@@ -93,11 +93,6 @@ public class Message
return header_.getMessageId();
}
- void setMessageId(String id)
- {
- header_.setMessageId(id);
- }
-
// TODO should take byte[] + length so we don't have to copy to a byte[] of exactly the right len
public Message getReply(InetAddress from, byte[] args)
{
Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java?rev=1064193&r1=1064192&r2=1064193&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java Thu Jan 27 16:53:44 2011
@@ -29,7 +29,6 @@ import java.nio.channels.AsynchronousClo
import java.nio.channels.ServerSocketChannel;
import java.security.MessageDigest;
import java.util.*;
-import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -45,12 +44,12 @@ import org.apache.cassandra.locator.ILat
import org.apache.cassandra.net.io.SerializerType;
import org.apache.cassandra.net.sink.SinkManager;
import org.apache.cassandra.service.GCInspector;
+import org.apache.cassandra.service.QuorumResponseHandler;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.ExpiringMap;
-import org.apache.cassandra.utils.GuidGenerator;
+import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.SimpleCondition;
import org.cliffc.high_scale_lib.NonBlockingHashMap;
-import org.cliffc.high_scale_lib.NonBlockingHashSet;
public class MessagingService
{
@@ -62,8 +61,7 @@ public class MessagingService
public static final int PROTOCOL_MAGIC = 0xCA552DFA;
/* This records all the results mapped by message Id */
- private static ExpiringMap<String, IMessageCallback> callbacks;
- private static ConcurrentMap<String, Collection<InetAddress>> targets = new NonBlockingHashMap<String, Collection<InetAddress>>();
+ private static ExpiringMap<String, Pair<InetAddress, IMessageCallback>> callbacks;
/* Lookup table for registering message handlers based on the verb. */
private static Map<StorageService.Verb, IVerbHandler> verbHandlers_;
@@ -103,21 +101,16 @@ public class MessagingService
listenGate = new SimpleCondition();
verbHandlers_ = new HashMap<StorageService.Verb, IVerbHandler>();
- Function<String, ?> timeoutReporter = new Function<String, Object>()
+ Function<Pair<String, Pair<InetAddress, IMessageCallback>>, ?> timeoutReporter = new Function<Pair<String, Pair<InetAddress, IMessageCallback>>, Object>()
{
- public Object apply(String messageId)
+ public Object apply(Pair<String, Pair<InetAddress, IMessageCallback>> pair)
{
- Collection<InetAddress> addresses = targets.remove(messageId);
- if (addresses == null)
- return null;
-
- for (InetAddress address : addresses)
- addLatency(address, (double) DatabaseDescriptor.getRpcTimeout());
-
+ Pair<InetAddress, IMessageCallback> expiredValue = pair.right;
+ maybeAddLatency(expiredValue.right, expiredValue.left, (double) DatabaseDescriptor.getRpcTimeout());
return null;
}
};
- callbacks = new ExpiringMap<String, IMessageCallback>((long) (1.1 * DatabaseDescriptor.getRpcTimeout()), timeoutReporter);
+ callbacks = new ExpiringMap<String, Pair<InetAddress, IMessageCallback>>((long) (1.1 * DatabaseDescriptor.getRpcTimeout()), timeoutReporter);
defaultExecutor_ = new JMXEnabledThreadPoolExecutor("MISCELLANEOUS-POOL");
streamExecutor_ = new JMXEnabledThreadPoolExecutor("MESSAGE-STREAMING-POOL");
@@ -133,6 +126,18 @@ public class MessagingService
timer.schedule(logDropped, LOG_DROPPED_INTERVAL_IN_MS, LOG_DROPPED_INTERVAL_IN_MS);
}
+ /**
+ * Track latency information for the dynamic snitch
+ * @param cb: the callback associated with this message -- this lets us know if it's a message type we're interested in
+ * @param address: the host that replied to the message
+ * @param latency
+ */
+ public void maybeAddLatency(IMessageCallback cb, InetAddress address, double latency)
+ {
+ if (cb instanceof QuorumResponseHandler || cb instanceof AsyncResult)
+ addLatency(address, latency);
+ }
+
public void addLatency(InetAddress address, double latency)
{
for (ILatencySubscriber subscriber : subscribers)
@@ -227,49 +232,9 @@ public class MessagingService
return verbHandlers_.get(type);
}
- /**
- * Send a message to a given endpoint.
- * @param message message to be sent.
- * @param to endpoint to which the message needs to be sent
- * @return an reference to an IAsyncResult which can be queried for the
- * response
- */
- public String sendRR(Message message, InetAddress[] to, IAsyncCallback cb)
- {
- String messageId = message.getMessageId();
- addCallback(cb, messageId);
- for (InetAddress endpoint : to)
- {
- putTarget(messageId, endpoint);
- sendOneWay(message, endpoint);
- }
- return messageId;
- }
-
- private static void putTarget(String messageId, InetAddress endpoint)
- {
- Collection<InetAddress> addresses = targets.get(messageId);
- if (addresses == null)
- {
- addresses = new NonBlockingHashSet<InetAddress>();
- Collection<InetAddress> oldAddresses = targets.putIfAbsent(messageId, addresses);
- if (oldAddresses != null)
- addresses = oldAddresses;
- }
- addresses.add(endpoint);
- }
-
- private static void removeTarget(String messageId, InetAddress from)
- {
- Collection<InetAddress> addresses = targets.get(messageId);
- // null is expected if we removed the callback or we got a reply after its timeout expired
- if (addresses != null)
- addresses.remove(from);
- }
-
- public void addCallback(IAsyncCallback cb, String messageId)
+ private void addCallback(IMessageCallback cb, String messageId, InetAddress to)
{
- callbacks.put(messageId, cb);
+ callbacks.put(messageId, new Pair<InetAddress, IMessageCallback>(to, cb));
}
/**
@@ -285,43 +250,12 @@ public class MessagingService
public String sendRR(Message message, InetAddress to, IAsyncCallback cb)
{
String messageId = message.getMessageId();
- addCallback(cb, messageId);
- putTarget(messageId, to);
+ addCallback(cb, messageId, to);
sendOneWay(message, to);
return messageId;
}
/**
- * Send a message to a given endpoint. The ith element in the <code>messages</code>
- * array is sent to the ith element in the <code>to</code> array.This method assumes
- * there is a one-one mapping between the <code>messages</code> array and
- * the <code>to</code> array. Otherwise an IllegalArgumentException will be thrown.
- * This method also informs the MessagingService to wait for at least
- * <code>howManyResults</code> responses to determine success of failure.
- * @param messages messages to be sent.
- * @param to endpoints to which the message needs to be sent
- * @param cb callback interface which is used to pass the responses or
- * suggest that a timeout occured to the invoker of the send().
- * @return an reference to message id used to match with the result
- */
- public String sendRR(Message[] messages, InetAddress[] to, IAsyncCallback cb)
- {
- if ( messages.length != to.length )
- {
- throw new IllegalArgumentException("Number of messages and the number of endpoints need to be same.");
- }
- String groupId = GuidGenerator.guid();
- addCallback(cb, groupId);
- for ( int i = 0; i < messages.length; ++i )
- {
- messages[i].setMessageId(groupId);
- putTarget(groupId, to[i]);
- sendOneWay(messages[i], to[i]);
- }
- return groupId;
- }
-
- /**
* Send a message to a given endpoint. This method adheres to the fire and forget
* style messaging.
* @param message messages to be sent.
@@ -368,8 +302,7 @@ public class MessagingService
public IAsyncResult sendRR(Message message, InetAddress to)
{
IAsyncResult iar = new AsyncResult();
- callbacks.put(message.getMessageId(), iar);
- putTarget(message.getMessageId(), to);
+ addCallback(iar, message.getMessageId(), to);
sendOneWay(message, to);
return iar;
}
@@ -443,14 +376,8 @@ public class MessagingService
}
}
- public static IMessageCallback getRegisteredCallback(String messageId)
- {
- return callbacks.get(messageId);
- }
-
- public static IMessageCallback removeRegisteredCallback(String messageId)
+ public static Pair<InetAddress, IMessageCallback> removeRegisteredCallback(String messageId)
{
- targets.remove(messageId); // TODO fix this when we clean up quorum reads to do proper RR
return callbacks.remove(messageId);
}
@@ -459,11 +386,6 @@ public class MessagingService
return callbacks.getAge(messageId);
}
- public static void responseReceivedFrom(String messageId, InetAddress from)
- {
- removeTarget(messageId, from);
- }
-
public static void validateMagic(int magic) throws IOException
{
if (magic != PROTOCOL_MAGIC)
Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/ResponseVerbHandler.java?rev=1064193&r1=1064192&r2=1064193&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/ResponseVerbHandler.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/ResponseVerbHandler.java Thu Jan 27 16:53:44 2011
@@ -18,8 +18,12 @@
package org.apache.cassandra.net;
+import java.net.InetAddress;
+
import org.apache.log4j.Logger;
+import org.apache.cassandra.utils.Pair;
+
public class ResponseVerbHandler implements IVerbHandler
{
private static final Logger logger_ = Logger.getLogger( ResponseVerbHandler.class );
@@ -27,14 +31,13 @@ public class ResponseVerbHandler impleme
public void doVerb(Message message)
{
String messageId = message.getMessageId();
- MessagingService.responseReceivedFrom(messageId, message.getFrom());
double age = System.currentTimeMillis() - MessagingService.getRegisteredCallbackAge(messageId);
- IMessageCallback cb = MessagingService.getRegisteredCallback(messageId);
- if (cb == null)
+ Pair<InetAddress, IMessageCallback> pair = MessagingService.removeRegisteredCallback(messageId);
+ if (pair == null)
return;
- // if cb is not null, then age will be valid
- MessagingService.instance.addLatency(message.getFrom(), age);
+ IMessageCallback cb = pair.right;
+ MessagingService.instance.maybeAddLatency(cb, message.getFrom(), age);
if (cb instanceof IAsyncCallback)
{
@@ -49,5 +52,4 @@ public class ResponseVerbHandler impleme
((IAsyncResult) cb).result(message);
}
}
-
}
Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ConsistencyChecker.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ConsistencyChecker.java?rev=1064193&r1=1064192&r2=1064193&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ConsistencyChecker.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ConsistencyChecker.java Thu Jan 27 16:53:44 2011
@@ -84,15 +84,16 @@ class ConsistencyChecker implements Runn
ReadCommand readCommandDigestOnly = constructReadMessage(true);
try
{
- Message message = readCommandDigestOnly.makeReadMessage();
- if (logger_.isDebugEnabled())
- logger_.debug("Reading consistency digest for " + readCommand_.key + " from " + message.getMessageId() + "@[" + StringUtils.join(replicas_, ", ") + "]");
-
- MessagingService.instance.addCallback(new DigestResponseHandler(), message.getMessageId());
+ DigestResponseHandler handler = new DigestResponseHandler();
for (InetAddress endpoint : replicas_)
{
if (!endpoint.equals(dataSource))
- MessagingService.instance.sendOneWay(message, endpoint);
+ {
+ Message message = readCommandDigestOnly.makeReadMessage();
+ if (logger_.isDebugEnabled())
+ logger_.debug("Reading consistency digest for " + readCommand_.key + " from " + message.getMessageId() + "@" + endpoint);
+ MessagingService.instance.sendRR(message, endpoint, handler);
+ }
}
}
catch (IOException ex)
@@ -128,14 +129,16 @@ class ConsistencyChecker implements Runn
if (!Arrays.equals(dataDigest, digest))
{
ReadCommand readCommand = constructReadMessage(false);
- Message message = readCommand.makeReadMessage();
- if (logger_.isDebugEnabled())
- logger_.debug("Digest mismatch; re-reading " + readCommand_.key + " from " + message.getMessageId() + "@[" + StringUtils.join(replicas_, ", ") + "]");
- MessagingService.instance.addCallback(new DataRepairHandler(), message.getMessageId());
+ DataRepairHandler handler = new DataRepairHandler();
for (InetAddress endpoint : replicas_)
{
if (!endpoint.equals(dataSource))
- MessagingService.instance.sendOneWay(message, endpoint);
+ {
+ Message message = readCommand.makeReadMessage();
+ if (logger_.isDebugEnabled())
+ logger_.debug("Digest mismatch; re-reading " + readCommand_.key + " from " + message.getMessageId() + "@" + endpoint);
+ MessagingService.instance.sendRR(message, endpoint, handler);
+ }
}
repairInvoked = true;
Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/QuorumResponseHandler.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/QuorumResponseHandler.java?rev=1064193&r1=1064192&r2=1064193&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/QuorumResponseHandler.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/QuorumResponseHandler.java Thu Jan 27 16:53:44 2011
@@ -49,35 +49,25 @@ public class QuorumResponseHandler<T> im
public T get() throws TimeoutException, DigestMismatchException, IOException
{
+ long timeout = DatabaseDescriptor.getRpcTimeout() - (System.currentTimeMillis() - startTime);
+ boolean success;
try
{
- long timeout = DatabaseDescriptor.getRpcTimeout() - (System.currentTimeMillis() - startTime);
- boolean success;
- try
- {
- success = condition.await(timeout, TimeUnit.MILLISECONDS);
- }
- catch (InterruptedException ex)
- {
- throw new AssertionError(ex);
- }
-
- if (!success)
- {
- StringBuilder sb = new StringBuilder("");
- for (Message message : responses)
- {
- sb.append(message.getFrom());
- }
- throw new TimeoutException("Operation timed out - received only " + responses.size() + " responses from " + sb.toString() + " .");
- }
+ success = condition.await(timeout, TimeUnit.MILLISECONDS);
}
- finally
+ catch (InterruptedException ex)
+ {
+ throw new AssertionError(ex);
+ }
+
+ if (!success)
{
- for (Message response : responses)
+ StringBuilder sb = new StringBuilder("");
+ for (Message message : responses)
{
- MessagingService.removeRegisteredCallback(response.getMessageId());
+ sb.append(message.getFrom());
}
+ throw new TimeoutException("Operation timed out - received only " + responses.size() + " responses from " + sb.toString() + " .");
}
return responseResolver.resolve(responses);
Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1064193&r1=1064192&r2=1064193&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java Thu Jan 27 16:53:44 2011
@@ -210,7 +210,6 @@ public class StorageProxy implements Sto
// send out the writes, as in mutate() above, but this time with a callback that tracks responses
final WriteResponseHandler responseHandler = ss.getWriteResponseHandler(blockFor, consistency_level, table);
responseHandlers.add(responseHandler);
- Message unhintedMessage = null;
for (Map.Entry<InetAddress, Collection<InetAddress>> entry : hintedEndpoints.asMap().entrySet())
{
InetAddress destination = entry.getKey();
@@ -226,14 +225,10 @@ public class StorageProxy implements Sto
else
{
// belongs on a different server. send it there.
- if (unhintedMessage == null)
- {
- unhintedMessage = rm.makeRowMutationMessage();
- MessagingService.instance.addCallback(responseHandler, unhintedMessage.getMessageId());
- }
+ Message unhintedMessage = rm.makeRowMutationMessage();
if (logger.isDebugEnabled())
logger.debug("insert writing key " + rm.key() + " to " + unhintedMessage.getMessageId() + "@" + destination);
- MessagingService.instance.sendOneWay(unhintedMessage, destination);
+ MessagingService.instance.sendRR(unhintedMessage, destination, responseHandler);
}
}
else
@@ -251,8 +246,9 @@ public class StorageProxy implements Sto
}
// (non-destination hints are part of the callback and count towards consistency only under CL.ANY)
if (writeEndpoints.contains(destination) || consistency_level == ConsistencyLevel.ANY)
- MessagingService.instance.addCallback(responseHandler, hintedMessage.getMessageId());
- MessagingService.instance.sendOneWay(hintedMessage, destination);
+ MessagingService.instance.sendRR(hintedMessage, destination, responseHandler);
+ else
+ MessagingService.instance.sendOneWay(hintedMessage, destination);
}
}
}
@@ -441,17 +437,15 @@ public class StorageProxy implements Sto
private static List<Row> strongRead(List<ReadCommand> commands, ConsistencyLevel consistency_level) throws IOException, UnavailableException, TimeoutException
{
List<QuorumResponseHandler<Row>> quorumResponseHandlers = new ArrayList<QuorumResponseHandler<Row>>();
- List<InetAddress[]> commandEndPoints = new ArrayList<InetAddress[]>();
+ List<List<InetAddress>> commandEndPoints = new ArrayList<List<InetAddress>>();
List<Row> rows = new ArrayList<Row>();
// send out read requests
for (ReadCommand command: commands)
{
assert !command.isDigestQuery();
- ReadCommand readMessageDigestOnly = command.copy();
- readMessageDigestOnly.setDigestQuery(true);
- Message message = command.makeReadMessage();
- Message messageDigestOnly = readMessageDigestOnly.makeReadMessage();
+ ReadCommand digestCommand = command.copy();
+ digestCommand.setDigestQuery(true);
InetAddress dataPoint = StorageService.instance.findSuitableEndPoint(command.table, command.key);
List<InetAddress> endpointList = StorageService.instance.getLiveNaturalEndpoints(command.table, command.key);
@@ -460,24 +454,19 @@ public class StorageProxy implements Sto
if (endpointList.size() < responseCount)
throw new UnavailableException();
- InetAddress[] endPoints = new InetAddress[endpointList.size()];
- Message messages[] = new Message[endpointList.size()];
+ ReadResponseResolver resolver = new ReadResponseResolver(command.table, command.key, responseCount);
+ QuorumResponseHandler<Row> quorumResponseHandler = new QuorumResponseHandler<Row>(responseCount, resolver);
// data-request message is sent to dataPoint, the node that will actually get
// the data for us. The other replicas are only sent a digest query.
- int n = 0;
for (InetAddress endpoint : endpointList)
{
- Message m = endpoint.equals(dataPoint) ? message : messageDigestOnly;
- endPoints[n] = endpoint;
- messages[n++] = m;
+ Message m = endpoint.equals(dataPoint) ? command.makeReadMessage() : digestCommand.makeReadMessage();
if (logger.isDebugEnabled())
- logger.debug("strongread reading " + (m == message ? "data" : "digest") + " for " + command + " from " + m.getMessageId() + "@" + endpoint);
+ logger.debug("strongread reading " + (endpoint.equals(dataPoint) ? "data" : "digest") + " for " + command + " from " + m.getMessageId() + "@" + endpoint);
+ MessagingService.instance.sendRR(m, endpoint, quorumResponseHandler);
}
- ReadResponseResolver resolver = new ReadResponseResolver(command.table, command.key, responseCount);
- QuorumResponseHandler<Row> quorumResponseHandler = new QuorumResponseHandler<Row>(responseCount, resolver);
- MessagingService.instance.sendRR(messages, endPoints, quorumResponseHandler);
quorumResponseHandlers.add(quorumResponseHandler);
- commandEndPoints.add(endPoints);
+ commandEndPoints.add(endpointList);
}
// read results and make a second pass for any digest mismatches
@@ -506,8 +495,11 @@ public class StorageProxy implements Sto
int responseCount = determineBlockFor(DatabaseDescriptor.getReplicationFactor(command.table), consistency_level);
ReadResponseResolver resolver = new ReadResponseResolver(command.table, command.key, responseCount);
QuorumResponseHandler<Row> qrhRepair = new QuorumResponseHandler<Row>(responseCount, resolver);
- Message messageRepair = command.makeReadMessage();
- MessagingService.instance.sendRR(messageRepair, commandEndPoints.get(i), qrhRepair);
+ for (InetAddress endPoint : commandEndPoints.get(i))
+ {
+ Message messageRepair = command.makeReadMessage();
+ MessagingService.instance.sendRR(messageRepair, endPoint, qrhRepair);
+ }
if (repairResponseHandlers == null)
repairResponseHandlers = new ArrayList<QuorumResponseHandler<Row>>();
repairResponseHandlers.add(qrhRepair);
Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/WriteResponseHandler.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/WriteResponseHandler.java?rev=1064193&r1=1064192&r2=1064193&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/WriteResponseHandler.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/WriteResponseHandler.java Thu Jan 27 16:53:44 2011
@@ -55,30 +55,20 @@ public class WriteResponseHandler implem
public void get() throws TimeoutException
{
+ long timeout = DatabaseDescriptor.getRpcTimeout() - (System.currentTimeMillis() - startTime);
+ boolean success;
try
{
- long timeout = DatabaseDescriptor.getRpcTimeout() - (System.currentTimeMillis() - startTime);
- boolean success;
- try
- {
- success = condition.await(timeout, TimeUnit.MILLISECONDS);
- }
- catch (InterruptedException ex)
- {
- throw new AssertionError(ex);
- }
-
- if (!success)
- {
- throw new TimeoutException("Operation timed out - received only " + responses.size() + localResponses + " responses");
- }
+ success = condition.await(timeout, TimeUnit.MILLISECONDS);
+ }
+ catch (InterruptedException ex)
+ {
+ throw new AssertionError(ex);
}
- finally
+
+ if (!success)
{
- for (Message response : responses)
- {
- MessagingService.removeRegisteredCallback(response.getMessageId());
- }
+ throw new TimeoutException("Operation timed out - received only " + responses.size() + localResponses + " responses");
}
}
Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/utils/ExpiringMap.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/utils/ExpiringMap.java?rev=1064193&r1=1064192&r2=1064193&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/utils/ExpiringMap.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/utils/ExpiringMap.java Thu Jan 27 16:53:44 2011
@@ -18,10 +18,7 @@
package org.apache.cassandra.utils;
-import java.util.Enumeration;
-import java.util.Set;
-import java.util.Timer;
-import java.util.TimerTask;
+import java.util.*;
import java.util.concurrent.Callable;
import com.google.common.base.Function;
@@ -33,7 +30,7 @@ import org.cliffc.high_scale_lib.NonBloc
public class ExpiringMap<K, V>
{
private static final Logger logger = LoggerFactory.getLogger(ExpiringMap.class);
- private final Function<K, ?> postExpireHook;
+ private final Function<Pair<K,V>, ?> postExpireHook;
private static class CacheableObject<T>
{
@@ -69,18 +66,12 @@ public class ExpiringMap<K, V>
@Override
public void run()
{
- synchronized (cache)
+ for (Map.Entry<K, CacheableObject> entry : cache.entrySet())
{
- Enumeration<K> e = cache.keys();
- while (e.hasMoreElements())
+ if (entry.getValue().isReadyToDie(expiration))
{
- K key = e.nextElement();
- CacheableObject co = cache.get(key);
- if (co != null && co.isReadyToDie(expiration))
- {
- cache.remove(key);
- postExpireHook.apply(key);
- }
+ cache.remove(entry.getKey());
+ postExpireHook.apply(new Pair(entry.getKey(), entry.getValue().getValue()));
}
}
}
@@ -99,7 +90,7 @@ public class ExpiringMap<K, V>
*
* @param expiration the TTL for objects in the cache in milliseconds
*/
- public ExpiringMap(long expiration, Function<K, ?> postExpireHook)
+ public ExpiringMap(long expiration, Function<Pair<K,V>, ?> postExpireHook)
{
this.postExpireHook = postExpireHook;
if (expiration <= 0)