You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/05/03 05:26:10 UTC

svn commit: r771019 - /incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java

Author: jbellis
Date: Sun May  3 03:26:07 2009
New Revision: 771019

URL: http://svn.apache.org/viewvc?rev=771019&view=rev
Log:
make sendMessage only return true after ack by recipient.
patch by Jun Rao; reviewed by jbellis for CASSANDRA-34

Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java?rev=771019&r1=771018&r2=771019&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java Sun May  3 03:26:07 2009
@@ -21,8 +21,10 @@
 import java.util.Collection;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
+import java.io.IOException;
 
 import org.apache.log4j.Logger;
 
@@ -33,8 +35,7 @@
 import org.apache.cassandra.net.EndPoint;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.service.IComponentShutdown;
-import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.service.*;
 
 
 /**
@@ -83,26 +84,22 @@
         	endPoint_ = endPoint;
         }
 
-        private boolean sendMessage(String endpointAddress, String key) throws Exception
+        private boolean sendMessage(String endpointAddress, String key) throws DigestMismatchException, TimeoutException, IOException
         {
-        	boolean success = false; // TODO : fix the hack we need to make sure the data is written on the other end.
-        	if(FailureDetector.instance().isAlive(new EndPoint(endpointAddress, DatabaseDescriptor.getControlPort())))
-        	{
-        		success = true;
-        	}
-        	else
-        	{
-        		return success;
-        	}
-        	Table table = Table.open(DatabaseDescriptor.getTables().get(0));
-        	Row row = null;
-        	row = table.get(key);
-        	RowMutation rm = new RowMutation(DatabaseDescriptor.getTables().get(0), row);
-			RowMutationMessage rmMsg = new RowMutationMessage(rm);
-			Message message = RowMutationMessage.makeRowMutationMessage( rmMsg );
-			EndPoint endPoint = new EndPoint(endpointAddress, DatabaseDescriptor.getStoragePort());
-			MessagingService.getMessagingInstance().sendOneWay(message, endPoint);
-			return success;
+            EndPoint endPoint = new EndPoint(endpointAddress, DatabaseDescriptor.getStoragePort());
+            if (!FailureDetector.instance().isAlive(endPoint))
+            {
+                return false;
+            }
+
+            Table table = Table.open(DatabaseDescriptor.getTables().get(0));
+            Row row = table.get(key);
+            RowMutation rm = new RowMutation(DatabaseDescriptor.getTables().get(0), row);
+            Message message = rm.makeRowMutationMessage();
+
+            QuorumResponseHandler<Boolean> quorumResponseHandler = new QuorumResponseHandler<Boolean>(1, new WriteResponseResolver());
+            MessagingService.getMessagingInstance().sendRR(message, new EndPoint[]{ endPoint }, quorumResponseHandler);
+            return quorumResponseHandler.get();
         }
 
         private void deleteEndPoint(String endpointAddress, String key) throws Exception