You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by gd...@apache.org on 2011/01/19 19:10:36 UTC

svn commit: r1060890 - in /cassandra/trunk: ./ conf/ src/java/org/apache/cassandra/config/ src/java/org/apache/cassandra/net/ src/java/org/apache/cassandra/security/ src/java/org/apache/cassandra/security/streaming/ src/java/org/apache/cassandra/stream...

Author: gdusbabek
Date: Wed Jan 19 18:10:35 2011
New Revision: 1060890

URL: http://svn.apache.org/viewvc?rev=1060890&view=rev
Log:
configurable internode encryption. patch by rnirmal, reviewed by gdusbabek. CASSANDRA-1567

Added:
    cassandra/trunk/src/java/org/apache/cassandra/config/EncryptionOptions.java
    cassandra/trunk/src/java/org/apache/cassandra/security/
    cassandra/trunk/src/java/org/apache/cassandra/security/SSLFactory.java
    cassandra/trunk/src/java/org/apache/cassandra/security/streaming/
    cassandra/trunk/src/java/org/apache/cassandra/security/streaming/SSLFileStreamTask.java
    cassandra/trunk/src/java/org/apache/cassandra/security/streaming/SSLIncomingStreamReader.java
Modified:
    cassandra/trunk/CHANGES.txt
    cassandra/trunk/conf/cassandra.yaml
    cassandra/trunk/src/java/org/apache/cassandra/config/Config.java
    cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
    cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
    cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
    cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStreamTask.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
    cassandra/trunk/test/conf/cassandra.yaml

Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1060890&r1=1060889&r2=1060890&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Wed Jan 19 18:10:35 2011
@@ -3,6 +3,7 @@
  * adds support for columns that act as incr/decr counters 
    (CASSANDRA-1072, 1937, 1944)
  * make NetworkTopologyStrategy the default (CASSANDRA-1960)
+ * configurable internode encryption (CASSANDRA-1567)
 
 
 0.7.1-dev

Modified: cassandra/trunk/conf/cassandra.yaml
URL: http://svn.apache.org/viewvc/cassandra/trunk/conf/cassandra.yaml?rev=1060890&r1=1060889&r2=1060890&view=diff
==============================================================================
--- cassandra/trunk/conf/cassandra.yaml (original)
+++ cassandra/trunk/conf/cassandra.yaml Wed Jan 19 18:10:35 2011
@@ -289,6 +289,23 @@ request_scheduler: org.apache.cassandra.
 #  the index is at the cost of space.
 index_interval: 128
 
+# Enable or disable inter-node encryption
+# Default settings are TLS v1, RSA 1024-bit keys (it is imperative that
+# users generate their own keys) TLS_RSA_WITH_AES_128_CBC_SHA as the cipher
+# suite for authentication, key exchange and encryption of the actual data transfers.
+# NOTE: No custom encryption options are enabled at the moment
+# The available internode options are : all, none
+#
+# The passwords used in these options must match the passwords used when generating
+# the keystore and truststore.  For instructions on generating these files, see:
+# http://download.oracle.com/javase/6/docs/technotes/guides/security/jsse/JSSERefGuide.html#CreateKeystore
+encryption_options:
+    internode_encryption: none
+    keystore: conf/.keystore
+    keystore_password: cassandra
+    truststore: conf/.truststore
+    truststore_password: cassandra
+
 # Keyspaces have ColumnFamilies.        (Usually 1 KS per application.)
 # ColumnFamilies have Rows.             (Dozens of CFs per KS.)
 # Rows contain Columns.                 (Many per CF.)

Modified: cassandra/trunk/src/java/org/apache/cassandra/config/Config.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/Config.java?rev=1060890&r1=1060889&r2=1060890&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/Config.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/Config.java Wed Jan 19 18:10:35 2011
@@ -100,6 +100,8 @@ public class Config
     public RequestSchedulerId request_scheduler_id;
     public RequestSchedulerOptions request_scheduler_options;
 
+    public EncryptionOptions encryption_options;
+
     public Integer index_interval = 128;
 
     public List<RawKeyspace> keyspaces;

Modified: cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=1060890&r1=1060889&r2=1060890&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java Wed Jan 19 18:10:35 2011
@@ -54,7 +54,7 @@ import org.yaml.snakeyaml.TypeDescriptio
 import org.yaml.snakeyaml.Yaml;
 import org.yaml.snakeyaml.error.YAMLException;
 
-public class    DatabaseDescriptor
+public class DatabaseDescriptor
 {
     private static Logger logger = LoggerFactory.getLogger(DatabaseDescriptor.class);
 
@@ -1139,8 +1139,14 @@ public class    DatabaseDescriptor
     {
         return conf.dynamic_snitch_badness_threshold;
     }
+
     public static void setDynamicBadnessThreshold(Double dynamicBadnessThreshold)
     {
         conf.dynamic_snitch_badness_threshold = dynamicBadnessThreshold;
     }
+
+    public static EncryptionOptions getEncryptionOptions()
+    {
+        return conf.encryption_options;
+    }
 }

Added: cassandra/trunk/src/java/org/apache/cassandra/config/EncryptionOptions.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/EncryptionOptions.java?rev=1060890&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/EncryptionOptions.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/EncryptionOptions.java Wed Jan 19 18:10:35 2011
@@ -0,0 +1,38 @@
+package org.apache.cassandra.config;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+public class EncryptionOptions
+{
+    public InternodeEncryption internode_encryption = InternodeEncryption.none;
+    public String keystore = "conf/.keystore";
+    public String keystore_password = "cassandra";
+    public String truststore = "conf/.truststore";
+    public String truststore_password = "cassandra";
+    public String[] cipherSuites = {"TLS_RSA_WITH_AES_128_CBC_SHA", "TLS_RSA_WITH_AES_256_CBC_SHA"};
+
+    public static enum InternodeEncryption
+    {
+        all,
+        none
+    }
+}

Modified: cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java?rev=1060890&r1=1060889&r2=1060890&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java Wed Jan 19 18:10:35 2011
@@ -27,6 +27,9 @@ import java.net.Socket;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.EncryptionOptions;
+import org.apache.cassandra.security.streaming.SSLIncomingStreamReader;
 import org.apache.cassandra.streaming.IncomingStreamReader;
 import org.apache.cassandra.streaming.StreamHeader;
 
@@ -77,8 +80,7 @@ public class IncomingTcpConnection exten
                     int size = input.readInt();
                     byte[] headerBytes = new byte[size];
                     input.readFully(headerBytes);
-                    StreamHeader streamHeader = StreamHeader.serializer().deserialize(new DataInputStream(new ByteArrayInputStream(headerBytes)));
-                    new IncomingStreamReader(streamHeader, socket.getChannel()).read();
+                    stream(StreamHeader.serializer().deserialize(new DataInputStream(new ByteArrayInputStream(headerBytes))), input);
                     break;
                 }
                 else
@@ -124,4 +126,12 @@ public class IncomingTcpConnection exten
                 logger.debug("error closing socket", e);
         }
     }
+
+    private void stream(StreamHeader streamHeader, DataInputStream input) throws IOException
+    {
+        if (DatabaseDescriptor.getEncryptionOptions().internode_encryption == EncryptionOptions.InternodeEncryption.all)
+            new SSLIncomingStreamReader(streamHeader, socket, input).read();
+        else
+            new IncomingStreamReader(streamHeader, socket).read();
+    }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=1060890&r1=1060889&r2=1060890&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Wed Jan 19 18:10:35 2011
@@ -46,12 +46,15 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.EncryptionOptions;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.locator.ILatencyPublisher;
 import org.apache.cassandra.locator.ILatencySubscriber;
 import org.apache.cassandra.net.io.SerializerType;
 import org.apache.cassandra.net.sink.SinkManager;
 import org.apache.cassandra.service.GCInspector;
+import org.apache.cassandra.security.SSLFactory;
+import org.apache.cassandra.security.streaming.SSLFileStreamTask;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.streaming.FileStreamTask;
 import org.apache.cassandra.streaming.StreamHeader;
@@ -175,16 +178,32 @@ public final class MessagingService impl
      * @param localEp InetAddress whose port to listen on.
      */
     public void listen(InetAddress localEp) throws IOException
-    {        
-        ServerSocketChannel serverChannel = ServerSocketChannel.open();
-        final ServerSocket ss = serverChannel.socket();
-        ss.setReuseAddress(true);
-        ss.bind(new InetSocketAddress(localEp, DatabaseDescriptor.getStoragePort()));
-        socketThread = new SocketThread(ss, "ACCEPT-" + localEp);
+    {
+        socketThread = new SocketThread(getServerSocket(localEp), "ACCEPT-" + localEp);
         socketThread.start();
         listenGate.signalAll();
     }
 
+    private ServerSocket getServerSocket(InetAddress localEp) throws IOException
+    {
+        final ServerSocket ss;
+        if (DatabaseDescriptor.getEncryptionOptions().internode_encryption == EncryptionOptions.InternodeEncryption.all)
+        {
+            ss = SSLFactory.getServerSocket(DatabaseDescriptor.getEncryptionOptions(), localEp, DatabaseDescriptor.getStoragePort());
+            // setReuseAddress happens in the factory.
+            logger_.info("Starting Encrypted Messaging Service on port {}", DatabaseDescriptor.getStoragePort());
+        }
+        else
+        {
+            ServerSocketChannel serverChannel = ServerSocketChannel.open();
+            ss = serverChannel.socket();
+            ss.setReuseAddress(true);
+            ss.bind(new InetSocketAddress(localEp, DatabaseDescriptor.getStoragePort()));
+            logger_.info("Starting Messaging Service on port {}", DatabaseDescriptor.getStoragePort());
+        }
+        return ss;
+    }
+
     public void waitUntilListening()
     {
         try
@@ -391,7 +410,10 @@ public final class MessagingService impl
     public void stream(StreamHeader header, InetAddress to)
     {
         /* Streaming asynchronously on streamExector_ threads. */
-        streamExecutor_.execute(new FileStreamTask(header, to));
+        if (DatabaseDescriptor.getEncryptionOptions().internode_encryption == EncryptionOptions.InternodeEncryption.all)
+            streamExecutor_.execute(new SSLFileStreamTask(header, to));
+        else
+            streamExecutor_.execute(new FileStreamTask(header, to));
     }
     
     public void register(ILatencySubscriber subcriber)

Modified: cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java?rev=1060890&r1=1060889&r2=1060890&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java Wed Jan 19 18:10:35 2011
@@ -35,6 +35,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.EncryptionOptions;
+import org.apache.cassandra.security.SSLFactory;
 import org.apache.cassandra.utils.FBUtilities;
 
 public class OutboundTcpConnection extends Thread
@@ -163,7 +165,14 @@ public class OutboundTcpConnection exten
             try
             {
                 // zero means 'bind on any available port.'
-                socket = new Socket(endpoint, DatabaseDescriptor.getStoragePort(), FBUtilities.getLocalAddress(), 0);
+                if (DatabaseDescriptor.getEncryptionOptions().internode_encryption == EncryptionOptions.InternodeEncryption.all)
+                {
+                    socket = SSLFactory.getSocket(DatabaseDescriptor.getEncryptionOptions(), endpoint, DatabaseDescriptor.getStoragePort(), FBUtilities.getLocalAddress(), 0);
+                }
+                else {
+                    socket = new Socket(endpoint, DatabaseDescriptor.getStoragePort(), FBUtilities.getLocalAddress(), 0);
+                }
+
                 socket.setKeepAlive(true);
                 socket.setTcpNoDelay(true);
                 output = new DataOutputStream(new BufferedOutputStream(socket.getOutputStream(), 4096));

Added: cassandra/trunk/src/java/org/apache/cassandra/security/SSLFactory.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/security/SSLFactory.java?rev=1060890&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/security/SSLFactory.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/security/SSLFactory.java Wed Jan 19 18:10:35 2011
@@ -0,0 +1,102 @@
+package org.apache.cassandra.security;
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.security.KeyStore;
+
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLServerSocket;
+import javax.net.ssl.SSLSocket;
+import javax.net.ssl.TrustManagerFactory;
+
+import org.apache.cassandra.config.EncryptionOptions;
+
+/**
+ * A Factory for providing and setting up Client and Server SSL wrapped
+ * Socket and ServerSocket
+ */
+public final class SSLFactory
+{
+    private static final String PROTOCOL = "TLS";
+    private static final String ALGORITHM = "SunX509";
+    private static final String STORE_TYPE = "JKS";
+
+
+    public static SSLServerSocket getServerSocket(EncryptionOptions options, InetAddress address, int port) throws IOException
+    {
+        SSLContext ctx = createSSLContext(options);
+        SSLServerSocket serverSocket = (SSLServerSocket)ctx.getServerSocketFactory().createServerSocket();
+        serverSocket.setReuseAddress(true);
+        serverSocket.setEnabledCipherSuites(options.cipherSuites);
+        serverSocket.bind(new InetSocketAddress(address, port), 100);
+        return serverSocket;
+    }
+
+    /** Create a socket and connect */
+    public static SSLSocket getSocket(EncryptionOptions options, InetAddress address, int port, InetAddress localAddress, int localPort) throws IOException
+    {
+        SSLContext ctx = createSSLContext(options);
+        SSLSocket socket = (SSLSocket) ctx.getSocketFactory().createSocket(address, port, localAddress, localPort);
+        socket.setEnabledCipherSuites(options.cipherSuites);
+        return socket;
+    }
+
+    /** Just create a socket */
+    public static SSLSocket getSocket(EncryptionOptions options) throws IOException
+    {
+        SSLContext ctx = createSSLContext(options);
+        SSLSocket socket = (SSLSocket) ctx.getSocketFactory().createSocket();
+        socket.setEnabledCipherSuites(options.cipherSuites);
+        return socket;
+    }
+
+    private static SSLContext createSSLContext(EncryptionOptions options) throws IOException {
+        SSLContext ctx;
+        try {
+            ctx = SSLContext.getInstance(PROTOCOL);
+            TrustManagerFactory tmf = null;
+            KeyManagerFactory kmf = null;
+
+            tmf = TrustManagerFactory.getInstance(ALGORITHM);
+            KeyStore ts = KeyStore.getInstance(STORE_TYPE);
+            ts.load(new FileInputStream(options.truststore), options.truststore_password.toCharArray());
+            tmf.init(ts);
+
+            kmf = KeyManagerFactory.getInstance(ALGORITHM);
+            KeyStore ks = KeyStore.getInstance(STORE_TYPE);
+            ks.load(new FileInputStream(options.keystore), options.keystore_password.toCharArray());
+            kmf.init(ks, options.keystore_password.toCharArray());
+
+            ctx.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);
+
+        } catch (Exception e) {
+            throw new IOException("Error creating the initializing the SSL Context", e);
+        }
+        return ctx;
+    }
+}

Added: cassandra/trunk/src/java/org/apache/cassandra/security/streaming/SSLFileStreamTask.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/security/streaming/SSLFileStreamTask.java?rev=1060890&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/security/streaming/SSLFileStreamTask.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/security/streaming/SSLFileStreamTask.java Wed Jan 19 18:10:35 2011
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.security.streaming;
+
+import java.io.*;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+
+import org.apache.cassandra.security.SSLFactory;
+import org.apache.cassandra.streaming.FileStreamTask;
+import org.apache.cassandra.streaming.StreamHeader;
+import org.apache.cassandra.utils.FBUtilities;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.utils.Pair;
+
+/**
+ * This class uses a DataOutputStream to write data as opposed to a FileChannel.transferTo
+ * used by FileStreamTask because the underlying SSLSocket doesn't support
+ * encrypting over NIO SocketChannel.
+ */
+public class SSLFileStreamTask extends FileStreamTask
+{
+    private DataOutputStream output;
+    private Socket socket;
+    
+    public SSLFileStreamTask(StreamHeader header, InetAddress to)
+    {
+        super(header, to);
+    }
+
+    @Override
+    protected long write(FileChannel fc, Pair<Long, Long> section, long length, long bytesTransferred) throws IOException
+    {
+        int toTransfer = (int)Math.min(CHUNK_SIZE, length - bytesTransferred);
+        fc.position(section.left + bytesTransferred);
+        ByteBuffer buf = ByteBuffer.allocate(toTransfer);
+        fc.read(buf);
+        buf.flip();
+        output.write(buf.array(), 0, buf.limit());
+        output.flush();
+        return buf.limit();
+    }
+
+    @Override
+    protected void writeHeader(ByteBuffer buffer) throws IOException
+    {
+        output.write(buffer.array(), 0, buffer.limit());
+        output.flush();
+    }
+
+    @Override
+    protected void bind() throws IOException
+    {
+        socket = SSLFactory.getSocket(DatabaseDescriptor.getEncryptionOptions());
+        socket.bind(new InetSocketAddress(FBUtilities.getLocalAddress(), 0));
+    }
+
+    @Override
+    protected void connect() throws IOException
+    {
+        socket.connect(new InetSocketAddress(to, DatabaseDescriptor.getStoragePort()));
+        output = new DataOutputStream(socket.getOutputStream());
+    }
+
+    @Override
+    protected void close() throws IOException
+    {
+        socket.close();
+    }
+}

Added: cassandra/trunk/src/java/org/apache/cassandra/security/streaming/SSLIncomingStreamReader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/security/streaming/SSLIncomingStreamReader.java?rev=1060890&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/security/streaming/SSLIncomingStreamReader.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/security/streaming/SSLIncomingStreamReader.java Wed Jan 19 18:10:35 2011
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.security.streaming;
+
+import java.net.Socket;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.io.IOException;
+import java.io.DataInputStream;
+
+import org.apache.cassandra.streaming.FileStreamTask;
+import org.apache.cassandra.streaming.IncomingStreamReader;
+import org.apache.cassandra.streaming.StreamHeader;
+
+/**
+ * This class uses a DataInputStream to read data as opposed to a FileChannel.transferFrom
+ * used by IncomingStreamReader because the underlying SSLServerSocket doesn't support
+ * encrypting over NIO SocketChannel.
+ */
+public class SSLIncomingStreamReader extends IncomingStreamReader
+{
+    private final DataInputStream input;
+
+    public SSLIncomingStreamReader(StreamHeader header, Socket socket, DataInputStream input) throws IOException
+    {
+        super(header, socket);
+        this.input = input;
+    }
+
+    @Override
+    protected long readnwrite(long length, long bytesRead, long offset, FileChannel fc) throws IOException
+    {
+        int toRead = (int)Math.min(FileStreamTask.CHUNK_SIZE, length - bytesRead);
+        ByteBuffer buf = ByteBuffer.allocate(toRead);
+        input.readFully(buf.array());
+        fc.write(buf);
+        bytesRead += buf.limit();
+        remoteFile.progress += buf.limit();
+        return bytesRead;
+    }
+}

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStreamTask.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStreamTask.java?rev=1060890&r1=1060889&r2=1060890&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStreamTask.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStreamTask.java Wed Jan 19 18:10:35 2011
@@ -47,8 +47,9 @@ public class FileStreamTask extends Wrap
     // around 10 minutes at the default rpctimeout
     public static final int MAX_CONNECT_ATTEMPTS = 8;
 
-    private final StreamHeader header;
-    private final InetAddress to;
+    protected final StreamHeader header;
+    protected final InetAddress to;
+    private SocketChannel channel;
     
     public FileStreamTask(StreamHeader header, InetAddress to)
     {
@@ -58,19 +59,18 @@ public class FileStreamTask extends Wrap
     
     public void runMayThrow() throws IOException
     {
-        SocketChannel channel = connect();
-
-        // successfully connected: stream.
-        // (at this point, if we fail, it is the receiver's job to re-request)
         try
         {
-            stream(channel);
+            connectAttempt();
+            // successfully connected: stream.
+            // (at this point, if we fail, it is the receiver's job to re-request)
+            stream();
         }
         finally
         {
             try
             {
-                channel.close();
+                close();
             }
             catch (IOException e)
             {
@@ -82,11 +82,11 @@ public class FileStreamTask extends Wrap
             logger.debug("Done streaming " + header.file);
     }
 
-    private void stream(SocketChannel channel) throws IOException
+    private void stream() throws IOException
     {
         ByteBuffer buffer = MessagingService.instance().constructStreamHeader(header, false);
-        channel.write(buffer);
-        assert buffer.remaining() == 0;
+        writeHeader(buffer);
+
         if (header.file == null)
             return;
 
@@ -101,8 +101,7 @@ public class FileStreamTask extends Wrap
                 long bytesTransferred = 0;
                 while (bytesTransferred < length)
                 {
-                    long toTransfer = Math.min(CHUNK_SIZE, length - bytesTransferred);
-                    long lastWrite = fc.transferTo(section.left + bytesTransferred, toTransfer, channel);
+                    long lastWrite = write(fc, section, length, bytesTransferred);
                     bytesTransferred += lastWrite;
                     header.file.progress += lastWrite;
                 }
@@ -116,24 +115,33 @@ public class FileStreamTask extends Wrap
         }
     }
 
+    protected long write(FileChannel fc, Pair<Long, Long> section, long length, long bytesTransferred) throws IOException
+    {
+        long toTransfer = Math.min(CHUNK_SIZE, length - bytesTransferred);
+        return fc.transferTo(section.left + bytesTransferred, toTransfer, channel);
+    }
+
+    protected void writeHeader(ByteBuffer buffer) throws IOException
+    {
+        channel.write(buffer);
+        assert buffer.remaining() == 0;
+    }
+
     /**
      * Connects to the destination, with backoff for failed attempts.
      * TODO: all nodes on a cluster must currently use the same storage port
      * @throws IOException If all attempts fail.
      */
-    private SocketChannel connect() throws IOException
+    private void connectAttempt() throws IOException
     {
-        SocketChannel channel = SocketChannel.open();
-        // force local binding on correctly specified interface.
-        channel.socket().bind(new InetSocketAddress(FBUtilities.getLocalAddress(), 0));
+        bind();
         int attempts = 0;
         while (true)
         {
             try
             {
-                channel.connect(new InetSocketAddress(to, DatabaseDescriptor.getStoragePort()));
-                // success
-                return channel;
+                connect();
+                break;
             }
             catch (IOException e)
             {
@@ -153,4 +161,21 @@ public class FileStreamTask extends Wrap
             }
         }
     }
+
+    protected void bind() throws IOException
+    {
+        channel = SocketChannel.open();
+        // force local binding on correctly specified interface.
+        channel.socket().bind(new InetSocketAddress(FBUtilities.getLocalAddress(), 0));
+    }
+
+    protected void connect() throws IOException
+    {
+        channel.connect(new InetSocketAddress(to, DatabaseDescriptor.getStoragePort()));
+    }
+
+    protected void close() throws IOException
+    {
+        channel.close();
+    }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java?rev=1060890&r1=1060889&r2=1060890&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java Wed Jan 19 18:10:35 2011
@@ -22,6 +22,7 @@ import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.net.Socket;
 import java.nio.channels.FileChannel;
 import java.nio.channels.SocketChannel;
 
@@ -35,16 +36,15 @@ public class IncomingStreamReader
 {
     private static final Logger logger = LoggerFactory.getLogger(IncomingStreamReader.class);
 
-    private final PendingFile localFile;
-    private final PendingFile remoteFile;
+    protected final PendingFile localFile;
+    protected final PendingFile remoteFile;
     private final SocketChannel socketChannel;
-    private final StreamInSession session;
+    protected final StreamInSession session;
 
-    public IncomingStreamReader(StreamHeader header, SocketChannel socketChannel) throws IOException
+    public IncomingStreamReader(StreamHeader header, Socket socket) throws IOException
     {
-        this.socketChannel = socketChannel;
-        InetSocketAddress remoteAddress = (InetSocketAddress)socketChannel.socket().getRemoteSocketAddress();
-
+        this.socketChannel = socket.getChannel();
+        InetSocketAddress remoteAddress = (InetSocketAddress)socket.getRemoteSocketAddress();
         session = StreamInSession.get(remoteAddress.getAddress(), header.sessionId);
         session.addFiles(header.pendingFiles);
         // set the current file we are streaming so progress shows up in jmx
@@ -63,7 +63,7 @@ public class IncomingStreamReader
         session.closeIfFinished();
     }
 
-    private void readFile() throws IOException
+    protected void readFile() throws IOException
     {
         if (logger.isDebugEnabled())
         {
@@ -82,10 +82,7 @@ public class IncomingStreamReader
                 long bytesRead = 0;
                 while (bytesRead < length)
                 {
-                    long toRead = Math.min(FileStreamTask.CHUNK_SIZE, length - bytesRead);
-                    long lastRead = fc.transferFrom(socketChannel, offset + bytesRead, toRead);
-                    bytesRead += lastRead;
-                    remoteFile.progress += lastRead;
+                    bytesRead = readnwrite(length, bytesRead, offset, fc);
                 }
                 offset += length;
             }
@@ -106,4 +103,13 @@ public class IncomingStreamReader
 
         session.finished(remoteFile, localFile);
     }
+
+    protected long readnwrite(long length, long bytesRead, long offset, FileChannel fc) throws IOException
+    {
+        long toRead = Math.min(FileStreamTask.CHUNK_SIZE, length - bytesRead);
+        long lastRead = fc.transferFrom(socketChannel, offset + bytesRead, toRead);
+        bytesRead += lastRead;
+        remoteFile.progress += lastRead;
+        return bytesRead;
+    }
 }

Modified: cassandra/trunk/test/conf/cassandra.yaml
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/conf/cassandra.yaml?rev=1060890&r1=1060889&r2=1060890&view=diff
==============================================================================
--- cassandra/trunk/test/conf/cassandra.yaml (original)
+++ cassandra/trunk/test/conf/cassandra.yaml Wed Jan 19 18:10:35 2011
@@ -26,6 +26,12 @@ endpoint_snitch: org.apache.cassandra.lo
 dynamic_snitch: true
 request_scheduler: org.apache.cassandra.scheduler.RoundRobinScheduler
 request_scheduler_id: keyspace
+encryption_options:
+    internode_encryption: none
+    keystore: conf/.keystore
+    keystore_password: cassandra
+    truststore: conf/.truststore
+    truststore_password: cassandra
 keyspaces:
     - name: Keyspace1
       replica_placement_strategy: org.apache.cassandra.locator.SimpleStrategy