You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2011/10/26 22:56:31 UTC

svn commit: r1189448 - in /cassandra/branches/cassandra-1.0: ./ conf/ src/java/org/apache/cassandra/config/ src/java/org/apache/cassandra/locator/ src/java/org/apache/cassandra/net/ src/java/org/apache/cassandra/streaming/

Author: brandonwilliams
Date: Wed Oct 26 20:56:31 2011
New Revision: 1189448

URL: http://svn.apache.org/viewvc?rev=1189448&view=rev
Log:
Allow encryption only between datacenters.
Patch by Vijay, reviewed by brandonwilliams for CASSANDRA-2802

Modified:
    cassandra/branches/cassandra-1.0/CHANGES.txt
    cassandra/branches/cassandra-1.0/conf/cassandra.yaml
    cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/config/Config.java
    cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
    cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/config/EncryptionOptions.java
    cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/locator/Ec2Snitch.java
    cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/MessagingService.java
    cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
    cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
    cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/streaming/FileStreamTask.java

Modified: cassandra/branches/cassandra-1.0/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/CHANGES.txt?rev=1189448&r1=1189447&r2=1189448&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/CHANGES.txt (original)
+++ cassandra/branches/cassandra-1.0/CHANGES.txt Wed Oct 26 20:56:31 2011
@@ -56,6 +56,7 @@ Merged from 0.8:
    (CASSANDRA-3351)
  * remove incorrect optimization from slice read path (CASSANDRA-3390)
  * Fix race in AntiEntropyService (CASSANDRA-3400)
+ * allow encryption only between datacenters (CASSANDRA-2802)
 
 
 1.0.0-final

Modified: cassandra/branches/cassandra-1.0/conf/cassandra.yaml
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/conf/cassandra.yaml?rev=1189448&r1=1189447&r2=1189448&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/conf/cassandra.yaml (original)
+++ cassandra/branches/cassandra-1.0/conf/cassandra.yaml Wed Oct 26 20:56:31 2011
@@ -164,6 +164,10 @@ sliced_buffer_size_in_kb: 64
 # TCP port, for commands and data
 storage_port: 7000
 
+# SSL port, for encrypted communication.  Unused unless enabled in
+# encryption_options
+ssl_storage_port: 7001
+
 # Address to bind to and tell other Cassandra nodes to connect to. You
 # _must_ change this if you want multiple nodes to be able to
 # communicate!
@@ -403,7 +407,10 @@ index_interval: 128
 # users generate their own keys) TLS_RSA_WITH_AES_128_CBC_SHA as the cipher
 # suite for authentication, key exchange and encryption of the actual data transfers.
 # NOTE: No custom encryption options are enabled at the moment
-# The available internode options are : all, none
+# The available internode options are : all, none, dc, rack
+#
+# If set to dc cassandra will encrypt the traffic between the DCs
+# If set to rack cassandra will encrypt the traffic between the racks
 #
 # The passwords used in these options must match the passwords used when generating
 # the keystore and truststore.  For instructions on generating these files, see:

Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/config/Config.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/config/Config.java?rev=1189448&r1=1189447&r2=1189448&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/config/Config.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/config/Config.java Wed Oct 26 20:56:31 2011
@@ -62,6 +62,7 @@ public class Config
     public Integer sliced_buffer_size_in_kb = 64;
     
     public Integer storage_port = 7000;
+    public Integer ssl_storage_port = 7001;
     public String listen_address;
     public String broadcast_address;
     

Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=1189448&r1=1189447&r2=1189448&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/config/DatabaseDescriptor.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/config/DatabaseDescriptor.java Wed Oct 26 20:56:31 2011
@@ -632,6 +632,11 @@ public class DatabaseDescriptor
         return Integer.parseInt(System.getProperty("cassandra.storage_port", conf.storage_port.toString()));
     }
 
+    public static int getSSLStoragePort()
+    {
+        return Integer.parseInt(System.getProperty("cassandra.ssl_storage_port", conf.ssl_storage_port.toString()));
+    }
+
     public static int getRpcPort()
     {
         return Integer.parseInt(System.getProperty("cassandra.rpc_port", conf.rpc_port.toString()));

Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/config/EncryptionOptions.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/config/EncryptionOptions.java?rev=1189448&r1=1189447&r2=1189448&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/config/EncryptionOptions.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/config/EncryptionOptions.java Wed Oct 26 20:56:31 2011
@@ -33,6 +33,8 @@ public class EncryptionOptions
     public static enum InternodeEncryption
     {
         all,
-        none
+        none,
+        dc,
+        rack
     }
 }

Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/locator/Ec2Snitch.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/locator/Ec2Snitch.java?rev=1189448&r1=1189447&r2=1189448&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/locator/Ec2Snitch.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/locator/Ec2Snitch.java Wed Oct 26 20:56:31 2011
@@ -44,6 +44,8 @@ public class Ec2Snitch extends AbstractN
 {
     protected static Logger logger = LoggerFactory.getLogger(Ec2Snitch.class);
     protected static final String ZONE_NAME_QUERY_URL = "http://169.254.169.254/latest/meta-data/placement/availability-zone";
+    private static final String DEFAULT_DC = "UNKNOWN-DC";
+    private static final String DEFAULT_RACK = "UNKNOWN-RACK";
     protected String ec2zone;
     protected String ec2region;
 
@@ -83,14 +85,20 @@ public class Ec2Snitch extends AbstractN
     {
         if (endpoint.equals(FBUtilities.getBroadcastAddress()))
             return ec2zone;
-        return Gossiper.instance.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.RACK).value;
+        EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(endpoint);
+        if (null == state || null == state.getApplicationState(ApplicationState.RACK))
+            return DEFAULT_RACK;
+        return state.getApplicationState(ApplicationState.RACK).value;
     }
 
     public String getDatacenter(InetAddress endpoint)
     {
         if (endpoint.equals(FBUtilities.getBroadcastAddress()))
             return ec2region;
-        return Gossiper.instance.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.DC).value;
+        EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(endpoint);
+        if (null == state || null == state.getApplicationState(ApplicationState.DC))
+            return DEFAULT_DC;
+        return state.getApplicationState(ApplicationState.DC).value;
     }
 
     @Override

Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/MessagingService.java?rev=1189448&r1=1189447&r2=1189448&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/MessagingService.java Wed Oct 26 20:56:31 2011
@@ -35,6 +35,8 @@ import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
 import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -88,7 +90,7 @@ public final class MessagingService impl
     private static final Logger logger_ = LoggerFactory.getLogger(MessagingService.class);
     private static final int LOG_DROPPED_INTERVAL_IN_MS = 5000;
 
-    private SocketThread socketThread;
+    private List<SocketThread> socketThreads = Lists.newArrayList();
     private final SimpleCondition listenGate;
 
     /**
@@ -236,41 +238,45 @@ public final class MessagingService impl
      */
     public void listen(InetAddress localEp) throws IOException, ConfigurationException
     {
-        socketThread = new SocketThread(getServerSocket(localEp), "ACCEPT-" + localEp);
-        socketThread.start();
+        for (ServerSocket ss: getServerSocket(localEp))
+        {
+            SocketThread th = new SocketThread(ss, "ACCEPT-" + localEp);
+            th.start();
+            socketThreads.add(th);
+        }
         listenGate.signalAll();
     }
 
-    private ServerSocket getServerSocket(InetAddress localEp) throws IOException, ConfigurationException
+    private List<ServerSocket> getServerSocket(InetAddress localEp) throws IOException, ConfigurationException
     {
-        final ServerSocket ss;
-        if (DatabaseDescriptor.getEncryptionOptions() != null && DatabaseDescriptor.getEncryptionOptions().internode_encryption == EncryptionOptions.InternodeEncryption.all)
+       final List<ServerSocket> ss = new ArrayList<ServerSocket>();
+        if (DatabaseDescriptor.getEncryptionOptions() != null && DatabaseDescriptor.getEncryptionOptions().internode_encryption != EncryptionOptions.InternodeEncryption.none)
         {
-            ss = SSLFactory.getServerSocket(DatabaseDescriptor.getEncryptionOptions(), localEp, DatabaseDescriptor.getStoragePort());
+            ss.add(SSLFactory.getServerSocket(DatabaseDescriptor.getEncryptionOptions(), localEp, DatabaseDescriptor.getSSLStoragePort()));
             // setReuseAddress happens in the factory.
-            logger_.info("Starting Encrypted Messaging Service on port {}", DatabaseDescriptor.getStoragePort());
+            logger_.info("Starting Encrypted Messaging Service on SSL port {}", DatabaseDescriptor.getSSLStoragePort());
         }
-        else
+        
+        ServerSocketChannel serverChannel = ServerSocketChannel.open();
+        ServerSocket socket = serverChannel.socket();
+        socket.setReuseAddress(true);
+        InetSocketAddress address = new InetSocketAddress(localEp, DatabaseDescriptor.getStoragePort());
+        try
         {
-            ServerSocketChannel serverChannel = ServerSocketChannel.open();
-            ss = serverChannel.socket();
-            ss.setReuseAddress(true);
-            InetSocketAddress address = new InetSocketAddress(localEp, DatabaseDescriptor.getStoragePort());
-            try
-            {
-                ss.bind(address);
-            }
-            catch (BindException e)
-            {
-                if (e.getMessage().contains("in use"))
-                    throw new ConfigurationException(address + " is in use by another process.  Change listen_address:storage_port in cassandra.yaml to values that do not conflict with other services");
-                else if (e.getMessage().contains("Cannot assign requested address"))
-                    throw new ConfigurationException("Unable to bind to address " + address + ". Set listen_address in cassandra.yaml to an interface you can bind to, e.g., your private IP address on EC2");
-                else
-                    throw e;
-            }
-            logger_.info("Starting Messaging Service on {}", address);
+            socket.bind(address);
+        }
+        catch (BindException e)
+        {
+            if (e.getMessage().contains("in use"))
+                throw new ConfigurationException(address + " is in use by another process.  Change listen_address:storage_port in cassandra.yaml to values that do not conflict with other services");
+            else if (e.getMessage().contains("Cannot assign requested address"))
+                throw new ConfigurationException("Unable to bind to address " + address
+                        + ". Set listen_address in cassandra.yaml to an interface you can bind to, e.g., your private IP address on EC2");
+            else
+                throw e;
         }
+        logger_.info("Starting Messaging Service on port {}", DatabaseDescriptor.getStoragePort());
+        ss.add(socket);
         return ss;
     }
 
@@ -453,7 +459,7 @@ public final class MessagingService impl
     public void stream(StreamHeader header, InetAddress to)
     {
         /* Streaming asynchronously on streamExector_ threads. */
-        streamExecutor_.execute(new FileStreamTask(header, to, DatabaseDescriptor.getEncryptionOptions()));
+        streamExecutor_.execute(new FileStreamTask(header, to));
     }
 
     /** The count of active outbound stream tasks. */
@@ -485,7 +491,8 @@ public final class MessagingService impl
 
         try
         {
-            socketThread.close();
+            for (SocketThread th : socketThreads)
+                th.close();
         }
         catch (IOException e)
         {

Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/OutboundTcpConnection.java?rev=1189448&r1=1189447&r2=1189448&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/OutboundTcpConnection.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/OutboundTcpConnection.java Wed Oct 26 20:56:31 2011
@@ -24,7 +24,6 @@ package org.apache.cassandra.net;
 import java.io.BufferedOutputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
-import java.net.InetAddress;
 import java.net.Socket;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -34,9 +33,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.EncryptionOptions;
-import org.apache.cassandra.gms.Gossiper;
-import org.apache.cassandra.security.SSLFactory;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
@@ -51,23 +47,17 @@ public class OutboundTcpConnection exten
                                                               MessagingService.version_);
 
     private static final int OPEN_RETRY_DELAY = 100; // ms between retries
-
-    private InetAddress endpoint;
     private final BlockingQueue<Pair<Message, String>> queue = new LinkedBlockingQueue<Pair<Message, String>>();
-    private DataOutputStream out;
+    private final OutboundTcpConnectionPool poolReference;    
 
+    private DataOutputStream out;
     private Socket socket;
     private long completedCount;
 
-    public OutboundTcpConnection(InetAddress remoteEp)
-    {
-        super("WRITE-" + remoteEp);
-        setEndPoint(remoteEp);        
-    }
-    
-    public void setEndPoint(InetAddress remoteEndPoint)
+    public OutboundTcpConnection(OutboundTcpConnectionPool pool)
     {
-        this.endpoint = remoteEndPoint;
+        super("WRITE-" + pool.endPoint());
+        this.poolReference = pool;
     }
 
     public void enqueue(Message message, String id)
@@ -131,7 +121,7 @@ public class OutboundTcpConnection exten
         catch (IOException e)
         {
             if (logger.isDebugEnabled())
-                logger.debug("error writing to " + endpoint, e);
+                logger.debug("error writing to " + poolReference.endPoint(), e);
             disconnect();
         }
     }
@@ -185,7 +175,7 @@ public class OutboundTcpConnection exten
             catch (IOException e)
             {
                 if (logger.isDebugEnabled())
-                    logger.debug("exception closing connection to " + endpoint, e);
+                    logger.debug("exception closing connection to " + poolReference.endPoint(), e);
             }
             out = null;
             socket = null;
@@ -210,22 +200,13 @@ public class OutboundTcpConnection exten
     private boolean connect()
     {
         if (logger.isDebugEnabled())
-            logger.debug("attempting to connect to " + endpoint);
+            logger.debug("attempting to connect to " + poolReference.endPoint());
         long start = System.currentTimeMillis();
         while (System.currentTimeMillis() < start + DatabaseDescriptor.getRpcTimeout())
         {
             try
             {
-                // zero means 'bind on any available port.'
-                EncryptionOptions options = DatabaseDescriptor.getEncryptionOptions();
-                if (options != null && options.internode_encryption == EncryptionOptions.InternodeEncryption.all)
-                {
-                    socket = SSLFactory.getSocket(options, endpoint, DatabaseDescriptor.getStoragePort(), FBUtilities.getLocalAddress(), 0);
-                }
-                else {
-                    socket = new Socket(endpoint, DatabaseDescriptor.getStoragePort(), FBUtilities.getLocalAddress(), 0);
-                }
-
+                socket = poolReference.newSocket();
                 socket.setKeepAlive(true);
                 socket.setTcpNoDelay(true);
                 out = new DataOutputStream(new BufferedOutputStream(socket.getOutputStream(), 4096));
@@ -235,7 +216,7 @@ public class OutboundTcpConnection exten
             {
                 socket = null;
                 if (logger.isTraceEnabled())
-                    logger.trace("unable to connect to " + endpoint, e);
+                    logger.trace("unable to connect to " + poolReference.endPoint(), e);
                 try
                 {
                     Thread.sleep(OPEN_RETRY_DELAY);

Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java?rev=1189448&r1=1189447&r2=1189448&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java Wed Oct 26 20:56:31 2011
@@ -18,20 +18,32 @@
 
 package org.apache.cassandra.net;
 
+import java.io.IOException;
 import java.net.InetAddress;
+import java.net.Socket;
 
 import org.apache.cassandra.concurrent.Stage;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.locator.IEndpointSnitch;
+import org.apache.cassandra.security.SSLFactory;
+import org.apache.cassandra.utils.FBUtilities;
 
 public class OutboundTcpConnectionPool
 {
+    private IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
+    // pointer for the real Address.
+    private final InetAddress id;
     public final OutboundTcpConnection cmdCon;
     public final OutboundTcpConnection ackCon;
+    // pointer to the reseted Address.
+    private InetAddress resetedEndpoint;
 
     OutboundTcpConnectionPool(InetAddress remoteEp)
     {
-        cmdCon = new OutboundTcpConnection(remoteEp);
+        id = remoteEp;
+        cmdCon = new OutboundTcpConnection(this);
         cmdCon.start();
-        ackCon = new OutboundTcpConnection(remoteEp);
+        ackCon = new OutboundTcpConnection(this);
         ackCon.start();
     }
 
@@ -55,9 +67,46 @@ public class OutboundTcpConnectionPool
     
     public void reset(InetAddress remoteEP)
     {
-        ackCon.setEndPoint(remoteEP);
-        ackCon.closeSocket();
-        cmdCon.setEndPoint(remoteEP);
-        cmdCon.closeSocket();
+        resetedEndpoint = remoteEP;
+        reset();
+    }
+    
+    public Socket newSocket() throws IOException
+    {
+        // zero means 'bind on any available port.'
+        if (isEncryptedChannel())
+        {
+            return SSLFactory.getSocket(DatabaseDescriptor.getEncryptionOptions(), endPoint(), DatabaseDescriptor.getSSLStoragePort(), FBUtilities.getLocalAddress(), 0);
+        }
+        else {
+            return new Socket(endPoint(), DatabaseDescriptor.getStoragePort(), FBUtilities.getLocalAddress(), 0);
+        }
+    }
+    
+    InetAddress endPoint()
+    {
+        return resetedEndpoint == null ? id : resetedEndpoint;
+    }
+    
+    boolean isEncryptedChannel()
+    {
+        switch (DatabaseDescriptor.getEncryptionOptions().internode_encryption)
+        {
+            case none:
+                return false; // if nothing needs to be encrypted then return immediately.
+            case all:
+                break;
+            case dc:
+                if (snitch.getDatacenter(id).equals(snitch.getDatacenter(FBUtilities.getBroadcastAddress())))
+                    return false;
+                break;
+            case rack:
+                // for rack then check if the DC's are the same.
+                if (snitch.getRack(id).equals(snitch.getRack(FBUtilities.getBroadcastAddress()))
+                        && snitch.getDatacenter(id).equals(snitch.getDatacenter(FBUtilities.getBroadcastAddress())))
+                    return false;
+                break;
+        }
+        return true;
     }
 }

Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/streaming/FileStreamTask.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/streaming/FileStreamTask.java?rev=1189448&r1=1189447&r2=1189448&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/streaming/FileStreamTask.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/streaming/FileStreamTask.java Wed Oct 26 20:56:31 2011
@@ -22,20 +22,16 @@ import java.io.File;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.net.InetAddress;
-import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.nio.ByteBuffer;
 
-import org.apache.cassandra.config.EncryptionOptions;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.io.compress.CompressedRandomAccessReader;
 import org.apache.cassandra.io.util.RandomAccessReader;
-import org.apache.cassandra.security.SSLFactory;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.Throttle;
 import org.apache.cassandra.utils.WrappedRunnable;
@@ -60,18 +56,15 @@ public class FileStreamTask extends Wrap
     private Socket socket;
     // socket's output stream
     private OutputStream output;
-    // system encryption options if any
-    private final EncryptionOptions encryptionOptions;
     // allocate buffer to use for transfers only once
     private final byte[] transferBuffer = new byte[CHUNK_SIZE];
     // outbound global throughput limiter
     private final Throttle throttle;
 
-    public FileStreamTask(StreamHeader header, InetAddress to, EncryptionOptions encryptionOptions)
+    public FileStreamTask(StreamHeader header, InetAddress to)
     {
         this.header = header;
         this.to = to;
-        this.encryptionOptions = encryptionOptions;
         this.throttle = new Throttle(toString(), new Throttle.ThroughputFunction()
         {
             /** @return Instantaneous throughput target in bytes per millisecond. */
@@ -198,13 +191,13 @@ public class FileStreamTask extends Wrap
      */
     private void connectAttempt() throws IOException
     {
-        bind();
         int attempts = 0;
         while (true)
         {
             try
             {
-                connect();
+                socket = MessagingService.instance().getConnectionPool(to).newSocket();
+                output = socket.getOutputStream();
                 break;
             }
             catch (IOException e)
@@ -226,22 +219,6 @@ public class FileStreamTask extends Wrap
         }
     }
 
-    protected void bind() throws IOException
-    {
-        socket = (encryptionOptions != null && encryptionOptions.internode_encryption == EncryptionOptions.InternodeEncryption.all)
-                    ? SSLFactory.getSocket(encryptionOptions)
-                    : new Socket();
-
-        // force local binding on correctly specified interface.
-        socket.bind(new InetSocketAddress(FBUtilities.getLocalAddress(), 0));
-    }
-
-    protected void connect() throws IOException
-    {
-        socket.connect(new InetSocketAddress(to, DatabaseDescriptor.getStoragePort()));
-        output = socket.getOutputStream();
-    }
-
     protected void close() throws IOException
     {
         output.close();