You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by gd...@apache.org on 2009/12/30 23:20:32 UTC

svn commit: r894721 - in /incubator/cassandra/trunk/src/java/org/apache/cassandra/net: MessagingService.java TcpConnection.java TcpConnectionManager.java

Author: gdusbabek
Date: Wed Dec 30 22:20:32 2009
New Revision: 894721

URL: http://svn.apache.org/viewvc?rev=894721&view=rev
Log:
CASSANDRA-651: TcpConnectionManager was holding on to disconnected connections, giving the false indication that they were still in use.

Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnection.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnectionManager.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=894721&r1=894720&r2=894721&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Wed Dec 30 22:20:32 2009
@@ -20,6 +20,8 @@
 
 import org.apache.cassandra.concurrent.*;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.gms.IFailureDetectionEventListener;
 import org.apache.cassandra.net.io.SerializerType;
 import org.apache.cassandra.net.sink.SinkManager;
 import org.apache.cassandra.utils.*;
@@ -27,7 +29,6 @@
 
 import java.io.IOException;
 import java.net.ServerSocket;
-import java.net.SocketException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
@@ -40,7 +41,7 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantLock;
 
-public class MessagingService
+public class MessagingService implements IFailureDetectionEventListener
 {
     private static int version_ = 1;
     //TODO: make this parameter dynamic somehow.  Not sure if config is appropriate.
@@ -182,7 +183,14 @@
         }
         return result;
     }
-    
+
+    /** called by failure detection code to notify that housekeeping should be performed on downed sockets. */
+    public void convict(InetAddress ep)
+    {
+        logger_.debug("Canceling pool for " + ep);
+        getConnectionPool(FBUtilities.getLocalAddress(), ep).reset();
+    }
+
     /**
      * Listen on the specified port.
      * @param localEp InetAddress whose port to listen on.
@@ -199,7 +207,8 @@
 
         SelectionKey key = SelectorManager.getSelectorManager().register(serverChannel, handler, SelectionKey.OP_ACCEPT);          
         endPoints_.add(localEp);            
-        listenSockets_.put(localEp, key);             
+        listenSockets_.put(localEp, key);
+        FailureDetector.instance().registerFailureDetectionEventListener(this);
     }
     
     /**
@@ -410,12 +419,6 @@
             connection = MessagingService.getConnection(processedMessage.getFrom(), to, message);
             connection.write(message);
         }
-        catch (SocketException se)
-        {
-            // Shutting down the entire pool. May be too conservative an approach.
-            MessagingService.getConnectionPool(message.getFrom(), to).shutdown();
-            logger_.error("socket error writing to " + to, se);
-        }
         catch (IOException e)
         {
             if (connection != null)
@@ -492,6 +495,7 @@
         logger_.info("Shutting down ...");
         synchronized (MessagingService.class)
         {
+            FailureDetector.instance().unregisterFailureDetectionEventListener(MessagingService.instance());
             /* Stop listening on any TCP socket */
             for (SelectionKey skey : listenSockets_.values())
             {

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnection.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnection.java?rev=894721&r1=894720&r2=894721&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnection.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnection.java Wed Dec 30 22:20:32 2009
@@ -316,7 +316,7 @@
         cancel(key_);
         pendingWrites_.clear();
         if (pool_ != null)
-            pool_.destroy(this);
+            pool_.reset();
     }
     
     private void cancel(SelectionKey key)

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnectionManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnectionManager.java?rev=894721&r1=894720&r2=894721&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnectionManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnectionManager.java Wed Dec 30 22:20:32 2009
@@ -64,24 +64,12 @@
         }
     }
 
-    synchronized void shutdown()
+    synchronized void reset()
     {
         for (TcpConnection con : new TcpConnection[] { cmdCon, ackCon })
             if (con != null)
                 con.closeSocket();
-    }
-
-    synchronized void destroy(TcpConnection con)
-    {
-        assert con != null;
-        if (cmdCon == con)
-        {
-            cmdCon = null;
-        }
-        else
-        {
-            assert ackCon == con;
-            ackCon = null;
-        }
+        cmdCon = null;
+        ackCon = null;
     }
 }