You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/05/03 05:26:10 UTC
svn commit: r771019 -
/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
Author: jbellis
Date: Sun May 3 03:26:07 2009
New Revision: 771019
URL: http://svn.apache.org/viewvc?rev=771019&view=rev
Log:
Make sendMessage return true only after the recipient acknowledges the message.
patch by Jun Rao; reviewed by jbellis for CASSANDRA-34
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java?rev=771019&r1=771018&r2=771019&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java Sun May 3 03:26:07 2009
@@ -21,8 +21,10 @@
import java.util.Collection;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import java.io.IOException;
import org.apache.log4j.Logger;
@@ -33,8 +35,7 @@
import org.apache.cassandra.net.EndPoint;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.service.IComponentShutdown;
-import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.service.*;
/**
@@ -83,26 +84,22 @@
endPoint_ = endPoint;
}
- private boolean sendMessage(String endpointAddress, String key) throws Exception
+ private boolean sendMessage(String endpointAddress, String key) throws DigestMismatchException, TimeoutException, IOException
{
- boolean success = false; // TODO : fix the hack we need to make sure the data is written on the other end.
- if(FailureDetector.instance().isAlive(new EndPoint(endpointAddress, DatabaseDescriptor.getControlPort())))
- {
- success = true;
- }
- else
- {
- return success;
- }
- Table table = Table.open(DatabaseDescriptor.getTables().get(0));
- Row row = null;
- row = table.get(key);
- RowMutation rm = new RowMutation(DatabaseDescriptor.getTables().get(0), row);
- RowMutationMessage rmMsg = new RowMutationMessage(rm);
- Message message = RowMutationMessage.makeRowMutationMessage( rmMsg );
- EndPoint endPoint = new EndPoint(endpointAddress, DatabaseDescriptor.getStoragePort());
- MessagingService.getMessagingInstance().sendOneWay(message, endPoint);
- return success;
+ EndPoint endPoint = new EndPoint(endpointAddress, DatabaseDescriptor.getStoragePort());
+ if (!FailureDetector.instance().isAlive(endPoint))
+ {
+ return false;
+ }
+
+ Table table = Table.open(DatabaseDescriptor.getTables().get(0));
+ Row row = table.get(key);
+ RowMutation rm = new RowMutation(DatabaseDescriptor.getTables().get(0), row);
+ Message message = rm.makeRowMutationMessage();
+
+ QuorumResponseHandler<Boolean> quorumResponseHandler = new QuorumResponseHandler<Boolean>(1, new WriteResponseResolver());
+ MessagingService.getMessagingInstance().sendRR(message, new EndPoint[]{ endPoint }, quorumResponseHandler);
+ return quorumResponseHandler.get();
}
private void deleteEndPoint(String endpointAddress, String key) throws Exception