You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by gd...@apache.org on 2010/07/20 18:55:52 UTC

svn commit: r965906 - in /cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra: net/MessagingService.java net/OutboundTcpConnection.java service/StorageService.java

Author: gdusbabek
Date: Tue Jul 20 16:55:51 2010
New Revision: 965906

URL: http://svn.apache.org/viewvc?rev=965906&view=rev
Log:
Outbound TCP connections were never being shut down properly, causing occasional dropped messages. Patch by gdusbabek, reviewed by jbellis. CASSANDRA-1221

Modified:
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageService.java

Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java?rev=965906&r1=965905&r2=965906&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java Tue Jul 20 16:55:51 2010
@@ -50,7 +50,7 @@ import java.util.concurrent.ExecutorServ
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
-public class MessagingService implements IFailureDetectionEventListener
+public class MessagingService
 {
     private static int version_ = 1;
     //TODO: make this parameter dynamic somehow.  Not sure if config is appropriate.
@@ -129,7 +129,7 @@ public class MessagingService implements
     /** called by failure detection code to notify that housekeeping should be performed on downed sockets. */
     public void convict(InetAddress ep)
     {
-        logger_.trace("Resetting pool for " + ep);
+        logger_.debug("Resetting pool for " + ep);
         getConnectionPool(ep).reset();
     }
 

Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/OutboundTcpConnection.java?rev=965906&r1=965905&r2=965906&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/OutboundTcpConnection.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/OutboundTcpConnection.java Tue Jul 20 16:55:51 2010
@@ -84,6 +84,9 @@ public class OutboundTcpConnection exten
             }
             if (socket != null || connect())
                 writeConnected(bb);
+            else
+                // clear out the queue, else gossip messages back up.
+                queue.clear();            
         }
     }
 

Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageService.java?rev=965906&r1=965905&r2=965906&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageService.java Tue Jul 20 16:55:51 2010
@@ -874,7 +874,10 @@ public class StorageService implements I
             deliverHints(endpoint);
     }
 
-    public void onDead(InetAddress endpoint, EndPointState state) {}
+    public void onDead(InetAddress endpoint, EndPointState state) 
+    {
+        MessagingService.instance.convict(endpoint);
+    }
 
     /** raw load value */
     public double getLoad()