You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2011/10/26 22:56:31 UTC
svn commit: r1189448 - in /cassandra/branches/cassandra-1.0: ./ conf/
src/java/org/apache/cassandra/config/
src/java/org/apache/cassandra/locator/ src/java/org/apache/cassandra/net/
src/java/org/apache/cassandra/streaming/
Author: brandonwilliams
Date: Wed Oct 26 20:56:31 2011
New Revision: 1189448
URL: http://svn.apache.org/viewvc?rev=1189448&view=rev
Log:
Allow encryption only between datacenters.
Patch by Vijay, reviewed by brandonwilliams for CASSANDRA-2802
Modified:
cassandra/branches/cassandra-1.0/CHANGES.txt
cassandra/branches/cassandra-1.0/conf/cassandra.yaml
cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/config/Config.java
cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/config/EncryptionOptions.java
cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/locator/Ec2Snitch.java
cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/MessagingService.java
cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/streaming/FileStreamTask.java
Modified: cassandra/branches/cassandra-1.0/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/CHANGES.txt?rev=1189448&r1=1189447&r2=1189448&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/CHANGES.txt (original)
+++ cassandra/branches/cassandra-1.0/CHANGES.txt Wed Oct 26 20:56:31 2011
@@ -56,6 +56,7 @@ Merged from 0.8:
(CASSANDRA-3351)
* remove incorrect optimization from slice read path (CASSANDRA-3390)
* Fix race in AntiEntropyService (CASSANDRA-3400)
+ * allow encryption only between datacenters (CASSANDRA-2802)
1.0.0-final
Modified: cassandra/branches/cassandra-1.0/conf/cassandra.yaml
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/conf/cassandra.yaml?rev=1189448&r1=1189447&r2=1189448&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/conf/cassandra.yaml (original)
+++ cassandra/branches/cassandra-1.0/conf/cassandra.yaml Wed Oct 26 20:56:31 2011
@@ -164,6 +164,10 @@ sliced_buffer_size_in_kb: 64
# TCP port, for commands and data
storage_port: 7000
+# SSL port, for encrypted communication. Unused unless enabled in
+# encryption_options
+ssl_storage_port: 7001
+
# Address to bind to and tell other Cassandra nodes to connect to. You
# _must_ change this if you want multiple nodes to be able to
# communicate!
@@ -403,7 +407,10 @@ index_interval: 128
# users generate their own keys) TLS_RSA_WITH_AES_128_CBC_SHA as the cipher
# suite for authentication, key exchange and encryption of the actual data transfers.
# NOTE: No custom encryption options are enabled at the moment
-# The available internode options are : all, none
+# The available internode options are : all, none, dc, rack
+#
+# If set to dc cassandra will encrypt the traffic between the DCs
+# If set to rack cassandra will encrypt the traffic between the racks
#
# The passwords used in these options must match the passwords used when generating
# the keystore and truststore. For instructions on generating these files, see:
Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/config/Config.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/config/Config.java?rev=1189448&r1=1189447&r2=1189448&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/config/Config.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/config/Config.java Wed Oct 26 20:56:31 2011
@@ -62,6 +62,7 @@ public class Config
public Integer sliced_buffer_size_in_kb = 64;
public Integer storage_port = 7000;
+ public Integer ssl_storage_port = 7001;
public String listen_address;
public String broadcast_address;
Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=1189448&r1=1189447&r2=1189448&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/config/DatabaseDescriptor.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/config/DatabaseDescriptor.java Wed Oct 26 20:56:31 2011
@@ -632,6 +632,11 @@ public class DatabaseDescriptor
return Integer.parseInt(System.getProperty("cassandra.storage_port", conf.storage_port.toString()));
}
+ public static int getSSLStoragePort()
+ {
+ return Integer.parseInt(System.getProperty("cassandra.ssl_storage_port", conf.ssl_storage_port.toString()));
+ }
+
public static int getRpcPort()
{
return Integer.parseInt(System.getProperty("cassandra.rpc_port", conf.rpc_port.toString()));
Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/config/EncryptionOptions.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/config/EncryptionOptions.java?rev=1189448&r1=1189447&r2=1189448&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/config/EncryptionOptions.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/config/EncryptionOptions.java Wed Oct 26 20:56:31 2011
@@ -33,6 +33,8 @@ public class EncryptionOptions
public static enum InternodeEncryption
{
all,
- none
+ none,
+ dc,
+ rack
}
}
Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/locator/Ec2Snitch.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/locator/Ec2Snitch.java?rev=1189448&r1=1189447&r2=1189448&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/locator/Ec2Snitch.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/locator/Ec2Snitch.java Wed Oct 26 20:56:31 2011
@@ -44,6 +44,8 @@ public class Ec2Snitch extends AbstractN
{
protected static Logger logger = LoggerFactory.getLogger(Ec2Snitch.class);
protected static final String ZONE_NAME_QUERY_URL = "http://169.254.169.254/latest/meta-data/placement/availability-zone";
+ private static final String DEFAULT_DC = "UNKNOWN-DC";
+ private static final String DEFAULT_RACK = "UNKNOWN-RACK";
protected String ec2zone;
protected String ec2region;
@@ -83,14 +85,20 @@ public class Ec2Snitch extends AbstractN
{
if (endpoint.equals(FBUtilities.getBroadcastAddress()))
return ec2zone;
- return Gossiper.instance.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.RACK).value;
+ EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(endpoint);
+ if (null == state || null == state.getApplicationState(ApplicationState.RACK))
+ return DEFAULT_RACK;
+ return state.getApplicationState(ApplicationState.RACK).value;
}
public String getDatacenter(InetAddress endpoint)
{
if (endpoint.equals(FBUtilities.getBroadcastAddress()))
return ec2region;
- return Gossiper.instance.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.DC).value;
+ EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(endpoint);
+ if (null == state || null == state.getApplicationState(ApplicationState.DC))
+ return DEFAULT_DC;
+ return state.getApplicationState(ApplicationState.DC).value;
}
@Override
Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/MessagingService.java?rev=1189448&r1=1189447&r2=1189448&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/MessagingService.java Wed Oct 26 20:56:31 2011
@@ -35,6 +35,8 @@ import javax.management.MBeanServer;
import javax.management.ObjectName;
import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -88,7 +90,7 @@ public final class MessagingService impl
private static final Logger logger_ = LoggerFactory.getLogger(MessagingService.class);
private static final int LOG_DROPPED_INTERVAL_IN_MS = 5000;
- private SocketThread socketThread;
+ private List<SocketThread> socketThreads = Lists.newArrayList();
private final SimpleCondition listenGate;
/**
@@ -236,41 +238,45 @@ public final class MessagingService impl
*/
public void listen(InetAddress localEp) throws IOException, ConfigurationException
{
- socketThread = new SocketThread(getServerSocket(localEp), "ACCEPT-" + localEp);
- socketThread.start();
+ for (ServerSocket ss: getServerSocket(localEp))
+ {
+ SocketThread th = new SocketThread(ss, "ACCEPT-" + localEp);
+ th.start();
+ socketThreads.add(th);
+ }
listenGate.signalAll();
}
- private ServerSocket getServerSocket(InetAddress localEp) throws IOException, ConfigurationException
+ private List<ServerSocket> getServerSocket(InetAddress localEp) throws IOException, ConfigurationException
{
- final ServerSocket ss;
- if (DatabaseDescriptor.getEncryptionOptions() != null && DatabaseDescriptor.getEncryptionOptions().internode_encryption == EncryptionOptions.InternodeEncryption.all)
+ final List<ServerSocket> ss = new ArrayList<ServerSocket>();
+ if (DatabaseDescriptor.getEncryptionOptions() != null && DatabaseDescriptor.getEncryptionOptions().internode_encryption != EncryptionOptions.InternodeEncryption.none)
{
- ss = SSLFactory.getServerSocket(DatabaseDescriptor.getEncryptionOptions(), localEp, DatabaseDescriptor.getStoragePort());
+ ss.add(SSLFactory.getServerSocket(DatabaseDescriptor.getEncryptionOptions(), localEp, DatabaseDescriptor.getSSLStoragePort()));
// setReuseAddress happens in the factory.
- logger_.info("Starting Encrypted Messaging Service on port {}", DatabaseDescriptor.getStoragePort());
+ logger_.info("Starting Encrypted Messaging Service on SSL port {}", DatabaseDescriptor.getSSLStoragePort());
}
- else
+
+ ServerSocketChannel serverChannel = ServerSocketChannel.open();
+ ServerSocket socket = serverChannel.socket();
+ socket.setReuseAddress(true);
+ InetSocketAddress address = new InetSocketAddress(localEp, DatabaseDescriptor.getStoragePort());
+ try
{
- ServerSocketChannel serverChannel = ServerSocketChannel.open();
- ss = serverChannel.socket();
- ss.setReuseAddress(true);
- InetSocketAddress address = new InetSocketAddress(localEp, DatabaseDescriptor.getStoragePort());
- try
- {
- ss.bind(address);
- }
- catch (BindException e)
- {
- if (e.getMessage().contains("in use"))
- throw new ConfigurationException(address + " is in use by another process. Change listen_address:storage_port in cassandra.yaml to values that do not conflict with other services");
- else if (e.getMessage().contains("Cannot assign requested address"))
- throw new ConfigurationException("Unable to bind to address " + address + ". Set listen_address in cassandra.yaml to an interface you can bind to, e.g., your private IP address on EC2");
- else
- throw e;
- }
- logger_.info("Starting Messaging Service on {}", address);
+ socket.bind(address);
+ }
+ catch (BindException e)
+ {
+ if (e.getMessage().contains("in use"))
+ throw new ConfigurationException(address + " is in use by another process. Change listen_address:storage_port in cassandra.yaml to values that do not conflict with other services");
+ else if (e.getMessage().contains("Cannot assign requested address"))
+ throw new ConfigurationException("Unable to bind to address " + address
+ + ". Set listen_address in cassandra.yaml to an interface you can bind to, e.g., your private IP address on EC2");
+ else
+ throw e;
}
+ logger_.info("Starting Messaging Service on port {}", DatabaseDescriptor.getStoragePort());
+ ss.add(socket);
return ss;
}
@@ -453,7 +459,7 @@ public final class MessagingService impl
public void stream(StreamHeader header, InetAddress to)
{
/* Streaming asynchronously on streamExector_ threads. */
- streamExecutor_.execute(new FileStreamTask(header, to, DatabaseDescriptor.getEncryptionOptions()));
+ streamExecutor_.execute(new FileStreamTask(header, to));
}
/** The count of active outbound stream tasks. */
@@ -485,7 +491,8 @@ public final class MessagingService impl
try
{
- socketThread.close();
+ for (SocketThread th : socketThreads)
+ th.close();
}
catch (IOException e)
{
Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/OutboundTcpConnection.java?rev=1189448&r1=1189447&r2=1189448&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/OutboundTcpConnection.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/OutboundTcpConnection.java Wed Oct 26 20:56:31 2011
@@ -24,7 +24,6 @@ package org.apache.cassandra.net;
import java.io.BufferedOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
-import java.net.InetAddress;
import java.net.Socket;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
@@ -34,9 +33,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.EncryptionOptions;
-import org.apache.cassandra.gms.Gossiper;
-import org.apache.cassandra.security.SSLFactory;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Pair;
@@ -51,23 +47,17 @@ public class OutboundTcpConnection exten
MessagingService.version_);
private static final int OPEN_RETRY_DELAY = 100; // ms between retries
-
- private InetAddress endpoint;
private final BlockingQueue<Pair<Message, String>> queue = new LinkedBlockingQueue<Pair<Message, String>>();
- private DataOutputStream out;
+ private final OutboundTcpConnectionPool poolReference;
+ private DataOutputStream out;
private Socket socket;
private long completedCount;
- public OutboundTcpConnection(InetAddress remoteEp)
- {
- super("WRITE-" + remoteEp);
- setEndPoint(remoteEp);
- }
-
- public void setEndPoint(InetAddress remoteEndPoint)
+ public OutboundTcpConnection(OutboundTcpConnectionPool pool)
{
- this.endpoint = remoteEndPoint;
+ super("WRITE-" + pool.endPoint());
+ this.poolReference = pool;
}
public void enqueue(Message message, String id)
@@ -131,7 +121,7 @@ public class OutboundTcpConnection exten
catch (IOException e)
{
if (logger.isDebugEnabled())
- logger.debug("error writing to " + endpoint, e);
+ logger.debug("error writing to " + poolReference.endPoint(), e);
disconnect();
}
}
@@ -185,7 +175,7 @@ public class OutboundTcpConnection exten
catch (IOException e)
{
if (logger.isDebugEnabled())
- logger.debug("exception closing connection to " + endpoint, e);
+ logger.debug("exception closing connection to " + poolReference.endPoint(), e);
}
out = null;
socket = null;
@@ -210,22 +200,13 @@ public class OutboundTcpConnection exten
private boolean connect()
{
if (logger.isDebugEnabled())
- logger.debug("attempting to connect to " + endpoint);
+ logger.debug("attempting to connect to " + poolReference.endPoint());
long start = System.currentTimeMillis();
while (System.currentTimeMillis() < start + DatabaseDescriptor.getRpcTimeout())
{
try
{
- // zero means 'bind on any available port.'
- EncryptionOptions options = DatabaseDescriptor.getEncryptionOptions();
- if (options != null && options.internode_encryption == EncryptionOptions.InternodeEncryption.all)
- {
- socket = SSLFactory.getSocket(options, endpoint, DatabaseDescriptor.getStoragePort(), FBUtilities.getLocalAddress(), 0);
- }
- else {
- socket = new Socket(endpoint, DatabaseDescriptor.getStoragePort(), FBUtilities.getLocalAddress(), 0);
- }
-
+ socket = poolReference.newSocket();
socket.setKeepAlive(true);
socket.setTcpNoDelay(true);
out = new DataOutputStream(new BufferedOutputStream(socket.getOutputStream(), 4096));
@@ -235,7 +216,7 @@ public class OutboundTcpConnection exten
{
socket = null;
if (logger.isTraceEnabled())
- logger.trace("unable to connect to " + endpoint, e);
+ logger.trace("unable to connect to " + poolReference.endPoint(), e);
try
{
Thread.sleep(OPEN_RETRY_DELAY);
Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java?rev=1189448&r1=1189447&r2=1189448&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java Wed Oct 26 20:56:31 2011
@@ -18,20 +18,32 @@
package org.apache.cassandra.net;
+import java.io.IOException;
import java.net.InetAddress;
+import java.net.Socket;
import org.apache.cassandra.concurrent.Stage;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.locator.IEndpointSnitch;
+import org.apache.cassandra.security.SSLFactory;
+import org.apache.cassandra.utils.FBUtilities;
public class OutboundTcpConnectionPool
{
+ private IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
+ // pointer for the real Address.
+ private final InetAddress id;
public final OutboundTcpConnection cmdCon;
public final OutboundTcpConnection ackCon;
+ // pointer to the reseted Address.
+ private InetAddress resetedEndpoint;
OutboundTcpConnectionPool(InetAddress remoteEp)
{
- cmdCon = new OutboundTcpConnection(remoteEp);
+ id = remoteEp;
+ cmdCon = new OutboundTcpConnection(this);
cmdCon.start();
- ackCon = new OutboundTcpConnection(remoteEp);
+ ackCon = new OutboundTcpConnection(this);
ackCon.start();
}
@@ -55,9 +67,46 @@ public class OutboundTcpConnectionPool
public void reset(InetAddress remoteEP)
{
- ackCon.setEndPoint(remoteEP);
- ackCon.closeSocket();
- cmdCon.setEndPoint(remoteEP);
- cmdCon.closeSocket();
+ resetedEndpoint = remoteEP;
+ reset();
+ }
+
+ public Socket newSocket() throws IOException
+ {
+ // zero means 'bind on any available port.'
+ if (isEncryptedChannel())
+ {
+ return SSLFactory.getSocket(DatabaseDescriptor.getEncryptionOptions(), endPoint(), DatabaseDescriptor.getSSLStoragePort(), FBUtilities.getLocalAddress(), 0);
+ }
+ else {
+ return new Socket(endPoint(), DatabaseDescriptor.getStoragePort(), FBUtilities.getLocalAddress(), 0);
+ }
+ }
+
+ InetAddress endPoint()
+ {
+ return resetedEndpoint == null ? id : resetedEndpoint;
+ }
+
+ boolean isEncryptedChannel()
+ {
+ switch (DatabaseDescriptor.getEncryptionOptions().internode_encryption)
+ {
+ case none:
+ return false; // if nothing needs to be encrypted then return immediately.
+ case all:
+ break;
+ case dc:
+ if (snitch.getDatacenter(id).equals(snitch.getDatacenter(FBUtilities.getBroadcastAddress())))
+ return false;
+ break;
+ case rack:
+ // for rack then check if the DC's are the same.
+ if (snitch.getRack(id).equals(snitch.getRack(FBUtilities.getBroadcastAddress()))
+ && snitch.getDatacenter(id).equals(snitch.getDatacenter(FBUtilities.getBroadcastAddress())))
+ return false;
+ break;
+ }
+ return true;
}
}
Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/streaming/FileStreamTask.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/streaming/FileStreamTask.java?rev=1189448&r1=1189447&r2=1189448&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/streaming/FileStreamTask.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/streaming/FileStreamTask.java Wed Oct 26 20:56:31 2011
@@ -22,20 +22,16 @@ import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetAddress;
-import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
-import org.apache.cassandra.config.EncryptionOptions;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.io.compress.CompressedRandomAccessReader;
import org.apache.cassandra.io.util.RandomAccessReader;
-import org.apache.cassandra.security.SSLFactory;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.Throttle;
import org.apache.cassandra.utils.WrappedRunnable;
@@ -60,18 +56,15 @@ public class FileStreamTask extends Wrap
private Socket socket;
// socket's output stream
private OutputStream output;
- // system encryption options if any
- private final EncryptionOptions encryptionOptions;
// allocate buffer to use for transfers only once
private final byte[] transferBuffer = new byte[CHUNK_SIZE];
// outbound global throughput limiter
private final Throttle throttle;
- public FileStreamTask(StreamHeader header, InetAddress to, EncryptionOptions encryptionOptions)
+ public FileStreamTask(StreamHeader header, InetAddress to)
{
this.header = header;
this.to = to;
- this.encryptionOptions = encryptionOptions;
this.throttle = new Throttle(toString(), new Throttle.ThroughputFunction()
{
/** @return Instantaneous throughput target in bytes per millisecond. */
@@ -198,13 +191,13 @@ public class FileStreamTask extends Wrap
*/
private void connectAttempt() throws IOException
{
- bind();
int attempts = 0;
while (true)
{
try
{
- connect();
+ socket = MessagingService.instance().getConnectionPool(to).newSocket();
+ output = socket.getOutputStream();
break;
}
catch (IOException e)
@@ -226,22 +219,6 @@ public class FileStreamTask extends Wrap
}
}
- protected void bind() throws IOException
- {
- socket = (encryptionOptions != null && encryptionOptions.internode_encryption == EncryptionOptions.InternodeEncryption.all)
- ? SSLFactory.getSocket(encryptionOptions)
- : new Socket();
-
- // force local binding on correctly specified interface.
- socket.bind(new InetSocketAddress(FBUtilities.getLocalAddress(), 0));
- }
-
- protected void connect() throws IOException
- {
- socket.connect(new InetSocketAddress(to, DatabaseDescriptor.getStoragePort()));
- output = socket.getOutputStream();
- }
-
protected void close() throws IOException
{
output.close();