You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/01/21 00:38:40 UTC

svn commit: r901438 - in /incubator/cassandra/trunk/src/java/org/apache/cassandra/net: MessagingService.java OutboundTcpConnection.java OutboundTcpConnectionPool.java

Author: jbellis
Date: Wed Jan 20 23:38:39 2010
New Revision: 901438

URL: http://svn.apache.org/viewvc?rev=901438&view=rev
Log:
handle node failure
patch by jbellis; reviewed by Brandon Williams for CASSANDRA-705

Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=901438&r1=901437&r2=901438&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Wed Jan 20 23:38:39 2010
@@ -68,7 +68,7 @@
     /* Thread pool to handle messaging write activities */
     private static ExecutorService streamExecutor_;
     
-    private static NonBlockingHashMap<String, OutboundTcpConnectionPool> connectionManagers_ = new NonBlockingHashMap<String, OutboundTcpConnectionPool>();
+    private static NonBlockingHashMap<InetAddress, OutboundTcpConnectionPool> connectionManagers_ = new NonBlockingHashMap<InetAddress, OutboundTcpConnectionPool>();
     
     private static Logger logger_ = Logger.getLogger(MessagingService.class);
     
@@ -132,8 +132,8 @@
     /** called by failure detection code to notify that housekeeping should be performed on downed sockets. */
     public void convict(InetAddress ep)
     {
-        logger_.debug("Canceling pool for " + ep);
-        getConnectionPool(FBUtilities.getLocalAddress(), ep).reset();
+        logger_.debug("Resetting pool for " + ep);
+        getConnectionPool(ep).reset();
     }
 
     /**
@@ -167,21 +167,20 @@
         }, "ACCEPT-" + localEp).start();
     }
 
-    public static OutboundTcpConnectionPool getConnectionPool(InetAddress from, InetAddress to)
+    public static OutboundTcpConnectionPool getConnectionPool(InetAddress to)
     {
-        String key = from + ":" + to;
-        OutboundTcpConnectionPool cp = connectionManagers_.get(key);
+        OutboundTcpConnectionPool cp = connectionManagers_.get(to);
         if (cp == null)
         {
-            connectionManagers_.putIfAbsent(key, new OutboundTcpConnectionPool(from, to));
-            cp = connectionManagers_.get(key);
+            connectionManagers_.putIfAbsent(to, new OutboundTcpConnectionPool(to));
+            cp = connectionManagers_.get(to);
         }
         return cp;
     }
 
-    public static OutboundTcpConnection getConnection(InetAddress from, InetAddress to, Message msg)
+    public static OutboundTcpConnection getConnection(InetAddress to, Message msg)
     {
-        return getConnectionPool(from, to).getConnection(msg);
+        return getConnectionPool(to).getConnection(msg);
     }
         
     /**
@@ -292,12 +291,20 @@
             return;
         }
 
+        // message sinks are a testing hook
         Message processedMessage = SinkManager.processClientMessageSink(message);
         if (processedMessage == null)
         {
             return;
         }
 
+        // get pooled connection (really, connection queue)
+        OutboundTcpConnection connection = null;
+        connection = getConnection(to, message);
+        if (connection == null)
+            return;
+
+        // pack message with header in a bytebuffer
         byte[] data;
         try
         {
@@ -312,8 +319,7 @@
         assert data.length > 0;
         ByteBuffer buffer = packIt(data , false, false);
 
-        OutboundTcpConnection connection = null;
-        connection = getConnection(processedMessage.getFrom(), to, message);
+        // write it
         connection.write(buffer);
     }
     

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java?rev=901438&r1=901437&r2=901438&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java Wed Jan 20 23:38:39 2010
@@ -5,6 +5,7 @@
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.Socket;
+import java.net.SocketException;
 import java.nio.ByteBuffer;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -22,19 +23,16 @@
     public DataOutputStream output;
     public Socket socket;
 
-    // TODO localEp is ignored, get rid of it
-    public OutboundTcpConnection(final OutboundTcpConnectionPool pool, InetAddress localEp, final InetAddress remoteEp)
+    public OutboundTcpConnection(final OutboundTcpConnectionPool pool, final InetAddress remoteEp)
+    throws IOException
     {
-        try
-        {
-            socket = new Socket(remoteEp, DatabaseDescriptor.getStoragePort());
-            socket.setTcpNoDelay(true);
-            output = new DataOutputStream(socket.getOutputStream());
-        }
-        catch (IOException e)
-        {
-            throw new IOError(e);
-        }
+        if (logger.isDebugEnabled())
+            logger.debug("attempting to connect to " + remoteEp);
+
+        socket = new Socket(remoteEp, DatabaseDescriptor.getStoragePort());
+        socket.setTcpNoDelay(true);
+        output = new DataOutputStream(socket.getOutputStream());
+
         new Thread(new Runnable()
         {
             public void run()

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java?rev=901438&r1=901437&r2=901438&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java Wed Jan 20 23:38:39 2010
@@ -21,29 +21,29 @@
 import java.io.IOException;
 import java.net.InetAddress;
 
+import org.apache.log4j.Logger;
+
 import org.apache.cassandra.concurrent.StageManager;
 
 class OutboundTcpConnectionPool
 {
-    private InetAddress localEp_;
+    private static Logger logger = Logger.getLogger(OutboundTcpConnectionPool.class);
+
+    private final int OPEN_RETRY_DELAY = 100; // ms between retries
+
     private InetAddress remoteEp_;
     private OutboundTcpConnection cmdCon;
     private OutboundTcpConnection ackCon;
+    private long lastFailedAttempt = Long.MIN_VALUE;
 
-    // TODO localEp is ignored, get rid of it
-    OutboundTcpConnectionPool(InetAddress localEp, InetAddress remoteEp)
+    OutboundTcpConnectionPool(InetAddress remoteEp)
     {
-        localEp_ = localEp;
         remoteEp_ = remoteEp;
     }
 
-    private OutboundTcpConnection newCon()
-    {
-        return new OutboundTcpConnection(this, localEp_, remoteEp_);
-    }
-
     /**
      * returns the appropriate connection based on message type.
+     * returns null if a connection could not be established.
      */
     synchronized OutboundTcpConnection getConnection(Message msg)
     {
@@ -51,13 +51,39 @@
             || StageManager.GOSSIP_STAGE.equals(msg.getMessageType()))
         {
             if (ackCon == null)
-                ackCon = newCon();
+            {
+                if (System.currentTimeMillis() < lastFailedAttempt + OPEN_RETRY_DELAY)
+                    return null;
+                try
+                {
+                    ackCon = new OutboundTcpConnection(this, remoteEp_);
+                }
+                catch (IOException e)
+                {
+                    lastFailedAttempt = System.currentTimeMillis();
+                    if (logger.isDebugEnabled())
+                        logger.debug("unable to connect to " + remoteEp_, e);
+                }
+            }
             return ackCon;
         }
         else
         {
             if (cmdCon == null)
-                cmdCon = newCon();
+            {
+                if (System.currentTimeMillis() < lastFailedAttempt + OPEN_RETRY_DELAY)
+                    return null;
+                try
+                {
+                    cmdCon = new OutboundTcpConnection(this, remoteEp_);
+                }
+                catch (IOException e)
+                {
+                    lastFailedAttempt = System.currentTimeMillis();
+                    if (logger.isDebugEnabled())
+                        logger.debug("unable to connect to " + remoteEp_, e);
+                }
+            }
             return cmdCon;
         }
     }