You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by gd...@apache.org on 2010/01/25 22:26:29 UTC

svn commit: r902981 - in /incubator/cassandra/trunk/src/java/org/apache/cassandra/net: FileStreamTask.java OutboundTcpConnection.java

Author: gdusbabek
Date: Mon Jan 25 21:26:28 2010
New Revision: 902981

URL: http://svn.apache.org/viewvc?rev=902981&view=rev
Log:
bind outgoing sockets to the locally specified cassandra interface (avoids using the result of InetAddress.anyLocalAddress(), which may not be the right cassandra interface). Patch by Gary Dusbabek, reviewed by Jonathan Ellis.

Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/FileStreamTask.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/FileStreamTask.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/FileStreamTask.java?rev=902981&r1=902980&r2=902981&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/FileStreamTask.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/FileStreamTask.java Mon Jan 25 21:26:28 2010
@@ -25,6 +25,7 @@
 import java.nio.channels.FileChannel;
 import java.nio.channels.SocketChannel;
 
+import org.apache.cassandra.utils.FBUtilities;
 import org.apache.log4j.Logger;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
@@ -51,7 +52,11 @@
     
     public void runMayThrow() throws IOException
     {
-        SocketChannel channel = SocketChannel.open(new InetSocketAddress(to, DatabaseDescriptor.getStoragePort()));
+        SocketChannel channel = SocketChannel.open();
+        // force local binding on correctly specified interface.
+        channel.socket().bind(new InetSocketAddress(FBUtilities.getLocalAddress(), 0));
+        // obey the unwritten law that all nodes on a cluster must use the same storage port.
+        channel.connect(new InetSocketAddress(to, DatabaseDescriptor.getStoragePort()));
         try
         {
             stream(channel);

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java?rev=902981&r1=902980&r2=902981&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java Mon Jan 25 21:26:28 2010
@@ -1,148 +1,150 @@
-package org.apache.cassandra.net;
-
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.Socket;
-import java.nio.ByteBuffer;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import org.apache.log4j.Logger;
-
-import org.apache.cassandra.config.DatabaseDescriptor;
-
-public class OutboundTcpConnection extends Thread
-{
-    private static final Logger logger = Logger.getLogger(OutboundTcpConnection.class);
-
-    private static final ByteBuffer CLOSE_SENTINEL = ByteBuffer.allocate(0);
-    private static final int OPEN_RETRY_DELAY = 100; // ms between retries
-
-    private final OutboundTcpConnectionPool pool;
-    private final InetAddress endpoint;
-    private final BlockingQueue<ByteBuffer> queue = new LinkedBlockingQueue<ByteBuffer>();
-    private DataOutputStream output;
-    private Socket socket;
-
-    public OutboundTcpConnection(final OutboundTcpConnectionPool pool, final InetAddress remoteEp)
-    {
-        super("WRITE-" + remoteEp);
-        this.pool = pool;
-        this.endpoint = remoteEp;
-    }
-
-    public void write(ByteBuffer buffer)
-    {
-        try
-        {
-            queue.put(buffer);
-        }
-        catch (InterruptedException e)
-        {
-            throw new AssertionError(e);
-        }
-    }
-
-    public void closeSocket()
-    {
-        queue.clear();
-        write(CLOSE_SENTINEL);
-    }
-
-    public void run()
-    {
-        while (true)
-        {
-            ByteBuffer bb = take();
-            if (bb == CLOSE_SENTINEL)
-            {
-                disconnect();
-                continue;
-            }
-            if (socket != null || connect())
-                writeConnected(bb);
-        }
-    }
-
-    private void writeConnected(ByteBuffer bb)
-    {
-        try
-        {
-            output.write(bb.array(), 0, bb.limit());
-            if (queue.peek() == null)
-            {
-                output.flush();
-            }
-        }
-        catch (IOException e)
-        {
-            logger.info("error writing to " + endpoint);
-            disconnect();
-        }
-    }
-
-    private void disconnect()
-    {
-        if (socket != null)
-        {
-            try
-            {
-                socket.close();
-            }
-            catch (IOException e)
-            {
-                if (logger.isDebugEnabled())
-                    logger.debug("exception closing connection to " + endpoint, e);
-            }
-            output = null;
-            socket = null;
-        }
-    }
-
-    private ByteBuffer take()
-    {
-        ByteBuffer bb;
-        try
-        {
-            bb = queue.take();
-        }
-        catch (InterruptedException e)
-        {
-            throw new AssertionError(e);
-        }
-        return bb;
-    }
-
-    private boolean connect()
-    {
-        if (logger.isDebugEnabled())
-            logger.debug("attempting to connect to " + endpoint);
-        long start = System.currentTimeMillis();
-        while (System.currentTimeMillis() < start + DatabaseDescriptor.getRpcTimeout())
-        {
-            try
-            {
-                socket = new Socket(endpoint, DatabaseDescriptor.getStoragePort());
-                socket.setTcpNoDelay(true);
-                output = new DataOutputStream(socket.getOutputStream());
-                return true;
-            }
-            catch (IOException e)
-            {
-                socket = null;
-                if (logger.isTraceEnabled())
-                    logger.trace("unable to connect to " + endpoint, e);
-                try
-                {
-                    Thread.sleep(OPEN_RETRY_DELAY);
-                }
-                catch (InterruptedException e1)
-                {
-                    throw new AssertionError(e1);
-                }
-            }
-        }
-        return false;
-    }
-}
+package org.apache.cassandra.net;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.log4j.Logger;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+
+public class OutboundTcpConnection extends Thread
+{
+    private static final Logger logger = Logger.getLogger(OutboundTcpConnection.class);
+
+    private static final ByteBuffer CLOSE_SENTINEL = ByteBuffer.allocate(0);
+    private static final int OPEN_RETRY_DELAY = 100; // ms between retries
+
+    private final OutboundTcpConnectionPool pool;
+    private final InetAddress endpoint;
+    private final BlockingQueue<ByteBuffer> queue = new LinkedBlockingQueue<ByteBuffer>();
+    private DataOutputStream output;
+    private Socket socket;
+
+    public OutboundTcpConnection(final OutboundTcpConnectionPool pool, final InetAddress remoteEp)
+    {
+        super("WRITE-" + remoteEp);
+        this.pool = pool;
+        this.endpoint = remoteEp;
+    }
+
+    public void write(ByteBuffer buffer)
+    {
+        try
+        {
+            queue.put(buffer);
+        }
+        catch (InterruptedException e)
+        {
+            throw new AssertionError(e);
+        }
+    }
+
+    public void closeSocket()
+    {
+        queue.clear();
+        write(CLOSE_SENTINEL);
+    }
+
+    public void run()
+    {
+        while (true)
+        {
+            ByteBuffer bb = take();
+            if (bb == CLOSE_SENTINEL)
+            {
+                disconnect();
+                continue;
+            }
+            if (socket != null || connect())
+                writeConnected(bb);
+        }
+    }
+
+    private void writeConnected(ByteBuffer bb)
+    {
+        try
+        {
+            output.write(bb.array(), 0, bb.limit());
+            if (queue.peek() == null)
+            {
+                output.flush();
+            }
+        }
+        catch (IOException e)
+        {
+            logger.info("error writing to " + endpoint);
+            disconnect();
+        }
+    }
+
+    private void disconnect()
+    {
+        if (socket != null)
+        {
+            try
+            {
+                socket.close();
+            }
+            catch (IOException e)
+            {
+                if (logger.isDebugEnabled())
+                    logger.debug("exception closing connection to " + endpoint, e);
+            }
+            output = null;
+            socket = null;
+        }
+    }
+
+    private ByteBuffer take()
+    {
+        ByteBuffer bb;
+        try
+        {
+            bb = queue.take();
+        }
+        catch (InterruptedException e)
+        {
+            throw new AssertionError(e);
+        }
+        return bb;
+    }
+
+    private boolean connect()
+    {
+        if (logger.isDebugEnabled())
+            logger.debug("attempting to connect to " + endpoint);
+        long start = System.currentTimeMillis();
+        while (System.currentTimeMillis() < start + DatabaseDescriptor.getRpcTimeout())
+        {
+            try
+            {
+                // zero means 'bind on any available port.'
+                socket = new Socket(endpoint, DatabaseDescriptor.getStoragePort(), FBUtilities.getLocalAddress(), 0);
+                socket.setTcpNoDelay(true);
+                output = new DataOutputStream(socket.getOutputStream());
+                return true;
+            }
+            catch (IOException e)
+            {
+                socket = null;
+                if (logger.isTraceEnabled())
+                    logger.trace("unable to connect to " + endpoint, e);
+                try
+                {
+                    Thread.sleep(OPEN_RETRY_DELAY);
+                }
+                catch (InterruptedException e1)
+                {
+                    throw new AssertionError(e1);
+                }
+            }
+        }
+        return false;
+    }
+}