You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/10/12 21:57:59 UTC

svn commit: r824477 - in /incubator/cassandra/trunk/src/java/org/apache/cassandra: net/ResponseVerbHandler.java service/ConsistencyManager.java service/QuorumResponseHandler.java utils/SimpleCondition.java

Author: jbellis
Date: Mon Oct 12 19:57:58 2009
New Revision: 824477

URL: http://svn.apache.org/viewvc?rev=824477&view=rev
Log:
move callback synchronization into ResponseVerbHandler where it's less likely to be a maintenance problem
patch by jbellis for CASSANDRA-478

Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/SimpleCondition.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/ResponseVerbHandler.java?rev=824477&r1=824476&r2=824477&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/ResponseVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/ResponseVerbHandler.java Mon Oct 12 19:57:58 2009
@@ -28,19 +28,22 @@
     {     
         String messageId = message.getMessageId();        
         IAsyncCallback cb = MessagingService.getRegisteredCallback(messageId);
-        if ( cb != null )
+        if (cb != null)
         {
             if (logger_.isDebugEnabled())
-              logger_.debug("Processing response on a callback from " + message.getMessageId() + "@" + message.getFrom());
-            cb.response(message);
+                logger_.debug("Processing response on a callback from " + message.getMessageId() + "@" + message.getFrom());
+            synchronized (cb)
+            {
+                cb.response(message);
+            }
         }
         else
-        {            
+        {
             IAsyncResult ar = MessagingService.getAsyncResult(messageId);
-            if ( ar != null )
+            if (ar != null)
             {
                 if (logger_.isDebugEnabled())
-                  logger_.debug("Processing response on an async result from " + message.getMessageId() + "@" + message.getFrom());
+                    logger_.debug("Processing response on an async result from " + message.getMessageId() + "@" + message.getFrom());
                 ar.result(message);
             }
         }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java?rev=824477&r1=824476&r2=824477&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java Mon Oct 12 19:57:58 2009
@@ -47,7 +47,7 @@
 	{
 		List<Message> responses_ = new ArrayList<Message>();
 
-		public synchronized void response(Message msg)
+		public void response(Message msg)
 		{
 			responses_.add(msg);
             if (responses_.size() == ConsistencyManager.this.replicas_.size())
@@ -104,7 +104,7 @@
 			majority_ = (responseCount >> 1) + 1;  
 		}
 		
-		public synchronized void response(Message message)
+		public void response(Message message)
 		{
 			if (logger_.isDebugEnabled())
 			  logger_.debug("Received responses in DataRepairHandler : " + message.toString());

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java?rev=824477&r1=824476&r2=824477&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java Mon Oct 12 19:57:58 2009
@@ -31,17 +31,17 @@
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.LogUtil;
+import org.apache.cassandra.utils.SimpleCondition;
+
 import org.apache.log4j.Logger;
 
 public class QuorumResponseHandler<T> implements IAsyncCallback
 {
     private static Logger logger_ = Logger.getLogger( QuorumResponseHandler.class );
-    private Lock lock_ = new ReentrantLock();
-    private Condition condition_;
+    private SimpleCondition condition_ = new SimpleCondition();
     private int responseCount_;
     private List<Message> responses_ = new ArrayList<Message>();
     private IResponseResolver<T> responseResolver_;
-    private AtomicBoolean done_ = new AtomicBoolean(false);
     private long startTime_;
 
     public QuorumResponseHandler(int responseCount, IResponseResolver<T> responseResolver) throws InvalidRequestException
@@ -50,7 +50,6 @@
             throw new InvalidRequestException("Cannot block for more than the replication factor of " + DatabaseDescriptor.getReplicationFactor());
         if (responseCount < 1)
             throw new InvalidRequestException("Cannot block for less than one replica");
-        condition_ = lock_.newCondition();
         responseCount_ = responseCount;
         responseResolver_ =  responseResolver;
         startTime_ = System.currentTimeMillis();
@@ -58,31 +57,20 @@
     
     public T get() throws TimeoutException, DigestMismatchException, IOException
     {
-        lock_.lock();
         try
         {
-            boolean bVal = true;
+            long timeout = System.currentTimeMillis() - startTime_ + DatabaseDescriptor.getRpcTimeout();
+            boolean success;
             try
             {
-                if (!done_.get())
-                {
-                    long timeout = System.currentTimeMillis() - startTime_ + DatabaseDescriptor.getRpcTimeout();
-                    if (timeout > 0)
-                    {
-                        bVal = condition_.await(timeout, TimeUnit.MILLISECONDS);
-                    }
-                    else
-                    {
-                        bVal = false;
-                    }
-                }
+                success = condition_.await(timeout, TimeUnit.MILLISECONDS);
             }
             catch (InterruptedException ex)
             {
                 throw new AssertionError(ex);
             }
 
-            if (!bVal && !done_.get())
+            if (!success)
             {
                 StringBuilder sb = new StringBuilder("");
                 for (Message message : responses_)
@@ -94,7 +82,6 @@
         }
         finally
         {
-            lock_.unlock();
             for (Message response : responses_)
             {
                 MessagingService.removeRegisteredCallback(response.getMessageId());
@@ -106,22 +93,13 @@
     
     public void response(Message message)
     {
-        lock_.lock();
-        try
-        {
-            if (!done_.get())
-            {
-                responses_.add(message);
-                if (responses_.size() >= responseCount_ && responseResolver_.isDataPresent(responses_))
-                {
-                    done_.set(true);
-                    condition_.signal();
-                }
-            }
-        }
-        finally
+        if (condition_.isSignaled())
+            return;
+
+        responses_.add(message);
+        if (responses_.size() >= responseCount_ && responseResolver_.isDataPresent(responses_))
         {
-            lock_.unlock();
+            condition_.signal();
         }
     }
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/SimpleCondition.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/SimpleCondition.java?rev=824477&r1=824476&r2=824477&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/SimpleCondition.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/SimpleCondition.java Mon Oct 12 19:57:58 2009
@@ -9,7 +9,7 @@
 // _after_ signal(), it will work as desired.)
 public class SimpleCondition implements Condition
 {
-    volatile boolean set;
+    boolean set;
 
     public synchronized void await() throws InterruptedException
     {
@@ -17,6 +17,19 @@
             wait();
     }
 
+    public synchronized boolean await(long time, TimeUnit unit) throws InterruptedException
+    {
+        // waits until set or until the timeout elapses, then returns set; micro/nanosecond units are not supported
+        assert unit == TimeUnit.DAYS || unit == TimeUnit.HOURS || unit == TimeUnit.MINUTES || unit == TimeUnit.SECONDS || unit == TimeUnit.MILLISECONDS;
+        // toMillis converts FROM unit; unit.convert(time, MILLISECONDS) would treat time as already being in millis
+        long end = System.currentTimeMillis() + unit.toMillis(time);
+        while (!set && end > System.currentTimeMillis())
+        {
+            TimeUnit.MILLISECONDS.timedWait(this, end - System.currentTimeMillis());
+        }
+        return set;
+    }
+
     public synchronized void signal()
     {
         set = true;
@@ -29,17 +42,17 @@
         notifyAll();
     }
 
-    public void awaitUninterruptibly()
+    public synchronized boolean isSignaled()
     {
-        throw new UnsupportedOperationException();
+        return set;
     }
 
-    public long awaitNanos(long nanosTimeout) throws InterruptedException
+    public void awaitUninterruptibly()
     {
         throw new UnsupportedOperationException();
     }
 
-    public boolean await(long time, TimeUnit unit) throws InterruptedException
+    public long awaitNanos(long nanosTimeout) throws InterruptedException
     {
         throw new UnsupportedOperationException();
     }