You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/10/12 21:57:59 UTC
svn commit: r824477 - in /incubator/cassandra/trunk/src/java/org/apache/cassandra: net/ResponseVerbHandler.java service/ConsistencyManager.java service/QuorumResponseHandler.java utils/SimpleCondition.java
Author: jbellis
Date: Mon Oct 12 19:57:58 2009
New Revision: 824477
URL: http://svn.apache.org/viewvc?rev=824477&view=rev
Log:
move callback synchronization into ResponseVerbHandler where it's less likely to be a maintenance problem
patch by jbellis for CASSANDRA-478
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/SimpleCondition.java
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/ResponseVerbHandler.java?rev=824477&r1=824476&r2=824477&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/ResponseVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/ResponseVerbHandler.java Mon Oct 12 19:57:58 2009
@@ -28,19 +28,22 @@
{
String messageId = message.getMessageId();
IAsyncCallback cb = MessagingService.getRegisteredCallback(messageId);
- if ( cb != null )
+ if (cb != null)
{
if (logger_.isDebugEnabled())
- logger_.debug("Processing response on a callback from " + message.getMessageId() + "@" + message.getFrom());
- cb.response(message);
+ logger_.debug("Processing response on a callback from " + message.getMessageId() + "@" + message.getFrom());
+ synchronized (cb)
+ {
+ cb.response(message);
+ }
}
else
- {
+ {
IAsyncResult ar = MessagingService.getAsyncResult(messageId);
- if ( ar != null )
+ if (ar != null)
{
if (logger_.isDebugEnabled())
- logger_.debug("Processing response on an async result from " + message.getMessageId() + "@" + message.getFrom());
+ logger_.debug("Processing response on an async result from " + message.getMessageId() + "@" + message.getFrom());
ar.result(message);
}
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java?rev=824477&r1=824476&r2=824477&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java Mon Oct 12 19:57:58 2009
@@ -47,7 +47,7 @@
{
List<Message> responses_ = new ArrayList<Message>();
- public synchronized void response(Message msg)
+ public void response(Message msg)
{
responses_.add(msg);
if (responses_.size() == ConsistencyManager.this.replicas_.size())
@@ -104,7 +104,7 @@
majority_ = (responseCount >> 1) + 1;
}
- public synchronized void response(Message message)
+ public void response(Message message)
{
if (logger_.isDebugEnabled())
logger_.debug("Received responses in DataRepairHandler : " + message.toString());
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java?rev=824477&r1=824476&r2=824477&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java Mon Oct 12 19:57:58 2009
@@ -31,17 +31,17 @@
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.utils.LogUtil;
+import org.apache.cassandra.utils.SimpleCondition;
+
import org.apache.log4j.Logger;
public class QuorumResponseHandler<T> implements IAsyncCallback
{
private static Logger logger_ = Logger.getLogger( QuorumResponseHandler.class );
- private Lock lock_ = new ReentrantLock();
- private Condition condition_;
+ private SimpleCondition condition_ = new SimpleCondition();
private int responseCount_;
private List<Message> responses_ = new ArrayList<Message>();
private IResponseResolver<T> responseResolver_;
- private AtomicBoolean done_ = new AtomicBoolean(false);
private long startTime_;
public QuorumResponseHandler(int responseCount, IResponseResolver<T> responseResolver) throws InvalidRequestException
@@ -50,7 +50,6 @@
throw new InvalidRequestException("Cannot block for more than the replication factor of " + DatabaseDescriptor.getReplicationFactor());
if (responseCount < 1)
throw new InvalidRequestException("Cannot block for less than one replica");
- condition_ = lock_.newCondition();
responseCount_ = responseCount;
responseResolver_ = responseResolver;
startTime_ = System.currentTimeMillis();
@@ -58,31 +57,20 @@
public T get() throws TimeoutException, DigestMismatchException, IOException
{
- lock_.lock();
try
{
- boolean bVal = true;
+ long timeout = System.currentTimeMillis() - startTime_ + DatabaseDescriptor.getRpcTimeout();
+ boolean success;
try
{
- if (!done_.get())
- {
- long timeout = System.currentTimeMillis() - startTime_ + DatabaseDescriptor.getRpcTimeout();
- if (timeout > 0)
- {
- bVal = condition_.await(timeout, TimeUnit.MILLISECONDS);
- }
- else
- {
- bVal = false;
- }
- }
+ success = condition_.await(timeout, TimeUnit.MILLISECONDS);
}
catch (InterruptedException ex)
{
throw new AssertionError(ex);
}
- if (!bVal && !done_.get())
+ if (!success)
{
StringBuilder sb = new StringBuilder("");
for (Message message : responses_)
@@ -94,7 +82,6 @@
}
finally
{
- lock_.unlock();
for (Message response : responses_)
{
MessagingService.removeRegisteredCallback(response.getMessageId());
@@ -106,22 +93,13 @@
public void response(Message message)
{
- lock_.lock();
- try
- {
- if (!done_.get())
- {
- responses_.add(message);
- if (responses_.size() >= responseCount_ && responseResolver_.isDataPresent(responses_))
- {
- done_.set(true);
- condition_.signal();
- }
- }
- }
- finally
+ if (condition_.isSignaled())
+ return;
+
+ responses_.add(message);
+ if (responses_.size() >= responseCount_ && responseResolver_.isDataPresent(responses_))
{
- lock_.unlock();
+ condition_.signal();
}
}
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/SimpleCondition.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/SimpleCondition.java?rev=824477&r1=824476&r2=824477&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/SimpleCondition.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/SimpleCondition.java Mon Oct 12 19:57:58 2009
@@ -9,7 +9,7 @@
// _after_ signal(), it will work as desired.)
public class SimpleCondition implements Condition
{
- volatile boolean set;
+ boolean set;
public synchronized void await() throws InterruptedException
{
@@ -17,6 +17,19 @@
wait();
}
+ public synchronized boolean await(long time, TimeUnit unit) throws InterruptedException
+ {
+ // micro/nanoseconds not supported
+ assert unit == TimeUnit.DAYS || unit == TimeUnit.HOURS || unit == TimeUnit.MINUTES || unit == TimeUnit.SECONDS || unit == TimeUnit.MILLISECONDS;
+
+ long end = System.currentTimeMillis() + unit.convert(time, TimeUnit.MILLISECONDS);
+ while (!set && end > System.currentTimeMillis())
+ {
+ TimeUnit.MILLISECONDS.timedWait(this, end - System.currentTimeMillis());
+ }
+ return set;
+ }
+
public synchronized void signal()
{
set = true;
@@ -29,17 +42,17 @@
notifyAll();
}
- public void awaitUninterruptibly()
+ public synchronized boolean isSignaled()
{
- throw new UnsupportedOperationException();
+ return set;
}
- public long awaitNanos(long nanosTimeout) throws InterruptedException
+ public void awaitUninterruptibly()
{
throw new UnsupportedOperationException();
}
- public boolean await(long time, TimeUnit unit) throws InterruptedException
+ public long awaitNanos(long nanosTimeout) throws InterruptedException
{
throw new UnsupportedOperationException();
}