You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by gd...@apache.org on 2011/01/19 19:10:36 UTC
svn commit: r1060890 - in /cassandra/trunk: ./ conf/
src/java/org/apache/cassandra/config/ src/java/org/apache/cassandra/net/
src/java/org/apache/cassandra/security/
src/java/org/apache/cassandra/security/streaming/
src/java/org/apache/cassandra/stream...
Author: gdusbabek
Date: Wed Jan 19 18:10:35 2011
New Revision: 1060890
URL: http://svn.apache.org/viewvc?rev=1060890&view=rev
Log:
configurable internode encryption. patch by rnirmal, reviewed by gdusbabek. CASSANDRA-1567
Added:
cassandra/trunk/src/java/org/apache/cassandra/config/EncryptionOptions.java
cassandra/trunk/src/java/org/apache/cassandra/security/
cassandra/trunk/src/java/org/apache/cassandra/security/SSLFactory.java
cassandra/trunk/src/java/org/apache/cassandra/security/streaming/
cassandra/trunk/src/java/org/apache/cassandra/security/streaming/SSLFileStreamTask.java
cassandra/trunk/src/java/org/apache/cassandra/security/streaming/SSLIncomingStreamReader.java
Modified:
cassandra/trunk/CHANGES.txt
cassandra/trunk/conf/cassandra.yaml
cassandra/trunk/src/java/org/apache/cassandra/config/Config.java
cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStreamTask.java
cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
cassandra/trunk/test/conf/cassandra.yaml
Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1060890&r1=1060889&r2=1060890&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Wed Jan 19 18:10:35 2011
@@ -3,6 +3,7 @@
* adds support for columns that act as incr/decr counters
(CASSANDRA-1072, 1937, 1944)
* make NetworkTopologyStrategy the default (CASSANDRA-1960)
+ * configurable internode encryption (CASSANDRA-1567)
0.7.1-dev
Modified: cassandra/trunk/conf/cassandra.yaml
URL: http://svn.apache.org/viewvc/cassandra/trunk/conf/cassandra.yaml?rev=1060890&r1=1060889&r2=1060890&view=diff
==============================================================================
--- cassandra/trunk/conf/cassandra.yaml (original)
+++ cassandra/trunk/conf/cassandra.yaml Wed Jan 19 18:10:35 2011
@@ -289,6 +289,23 @@ request_scheduler: org.apache.cassandra.
# the index is at the cost of space.
index_interval: 128
+# Enable or disable inter-node encryption
+# Default settings are TLS v1 with RSA 1024-bit keys (it is imperative that
+# users generate their own keys) and TLS_RSA_WITH_AES_128_CBC_SHA as the cipher
+# suite for authentication, key exchange and encryption of the actual data transfers.
+# NOTE: No custom encryption options are enabled at the moment
+# The available internode options are : all, none
+#
+# The passwords used in these options must match the passwords used when generating
+# the keystore and truststore. For instructions on generating these files, see:
+# http://download.oracle.com/javase/6/docs/technotes/guides/security/jsse/JSSERefGuide.html#CreateKeystore
+encryption_options:
+ internode_encryption: none
+ keystore: conf/.keystore
+ keystore_password: cassandra
+ truststore: conf/.truststore
+ truststore_password: cassandra
+
# Keyspaces have ColumnFamilies. (Usually 1 KS per application.)
# ColumnFamilies have Rows. (Dozens of CFs per KS.)
# Rows contain Columns. (Many per CF.)
Modified: cassandra/trunk/src/java/org/apache/cassandra/config/Config.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/Config.java?rev=1060890&r1=1060889&r2=1060890&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/Config.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/Config.java Wed Jan 19 18:10:35 2011
@@ -100,6 +100,8 @@ public class Config
public RequestSchedulerId request_scheduler_id;
public RequestSchedulerOptions request_scheduler_options;
+    // Start from a populated default (internode_encryption = none) rather than null:
+    // callers dereference DatabaseDescriptor.getEncryptionOptions() unconditionally,
+    // so a configuration that never assigns this field would otherwise fail with an NPE.
+    public EncryptionOptions encryption_options = new EncryptionOptions();
+
public Integer index_interval = 128;
public List<RawKeyspace> keyspaces;
Modified: cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=1060890&r1=1060889&r2=1060890&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java Wed Jan 19 18:10:35 2011
@@ -54,7 +54,7 @@ import org.yaml.snakeyaml.TypeDescriptio
import org.yaml.snakeyaml.Yaml;
import org.yaml.snakeyaml.error.YAMLException;
-public class DatabaseDescriptor
+public class DatabaseDescriptor
{
private static Logger logger = LoggerFactory.getLogger(DatabaseDescriptor.class);
@@ -1139,8 +1139,14 @@ public class DatabaseDescriptor
{
return conf.dynamic_snitch_badness_threshold;
}
+
public static void setDynamicBadnessThreshold(Double dynamicBadnessThreshold)
{
conf.dynamic_snitch_badness_threshold = dynamicBadnessThreshold;
}
+
+    /**
+     * @return the {@code encryption_options} value held by the loaded configuration
+     */
+    public static EncryptionOptions getEncryptionOptions()
+    {
+        final EncryptionOptions options = conf.encryption_options;
+        return options;
+    }
}
Added: cassandra/trunk/src/java/org/apache/cassandra/config/EncryptionOptions.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/EncryptionOptions.java?rev=1060890&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/EncryptionOptions.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/EncryptionOptions.java Wed Jan 19 18:10:35 2011
@@ -0,0 +1,38 @@
+package org.apache.cassandra.config;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+/**
+ * Settings for inter-node encryption. The field names match the keys of the
+ * encryption_options section of cassandra.yaml, and every field carries a
+ * default initializer.
+ */
+public class EncryptionOptions
+{
+    /** Which inter-node traffic is encrypted; defaults to {@link InternodeEncryption#none}. */
+    public InternodeEncryption internode_encryption = InternodeEncryption.none;
+
+    /** Location of the keystore file and the password protecting it. */
+    public String keystore = "conf/.keystore";
+    public String keystore_password = "cassandra";
+
+    /** Location of the truststore file and the password protecting it. */
+    public String truststore = "conf/.truststore";
+    public String truststore_password = "cassandra";
+
+    /** Cipher suites enabled on every SSL socket created from these options. */
+    public String[] cipherSuites = {"TLS_RSA_WITH_AES_128_CBC_SHA", "TLS_RSA_WITH_AES_256_CBC_SHA"};
+
+    /** Available inter-node encryption modes (a nested enum is implicitly static). */
+    public enum InternodeEncryption
+    {
+        all,
+        none
+    }
+}
Modified: cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java?rev=1060890&r1=1060889&r2=1060890&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java Wed Jan 19 18:10:35 2011
@@ -27,6 +27,9 @@ import java.net.Socket;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.EncryptionOptions;
+import org.apache.cassandra.security.streaming.SSLIncomingStreamReader;
import org.apache.cassandra.streaming.IncomingStreamReader;
import org.apache.cassandra.streaming.StreamHeader;
@@ -77,8 +80,7 @@ public class IncomingTcpConnection exten
int size = input.readInt();
byte[] headerBytes = new byte[size];
input.readFully(headerBytes);
- StreamHeader streamHeader = StreamHeader.serializer().deserialize(new DataInputStream(new ByteArrayInputStream(headerBytes)));
- new IncomingStreamReader(streamHeader, socket.getChannel()).read();
+ stream(StreamHeader.serializer().deserialize(new DataInputStream(new ByteArrayInputStream(headerBytes))), input);
break;
}
else
@@ -124,4 +126,12 @@ public class IncomingTcpConnection exten
logger.debug("error closing socket", e);
}
}
+
+    /**
+     * Reads an incoming file stream from this connection's socket, choosing the reader
+     * that matches the configured inter-node encryption mode.
+     *
+     * @param streamHeader the already-deserialized stream header
+     * @param input        the stream the header was read from; only the SSL reader consumes it
+     * @throws IOException if constructing the reader or reading the stream fails
+     */
+    private void stream(StreamHeader streamHeader, DataInputStream input) throws IOException
+    {
+        EncryptionOptions.InternodeEncryption mode = DatabaseDescriptor.getEncryptionOptions().internode_encryption;
+        boolean encrypted = mode == EncryptionOptions.InternodeEncryption.all;
+        IncomingStreamReader reader = encrypted
+                                    ? new SSLIncomingStreamReader(streamHeader, socket, input)
+                                    : new IncomingStreamReader(streamHeader, socket);
+        reader.read();
+    }
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=1060890&r1=1060889&r2=1060890&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Wed Jan 19 18:10:35 2011
@@ -46,12 +46,15 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.EncryptionOptions;
import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.locator.ILatencyPublisher;
import org.apache.cassandra.locator.ILatencySubscriber;
import org.apache.cassandra.net.io.SerializerType;
import org.apache.cassandra.net.sink.SinkManager;
import org.apache.cassandra.service.GCInspector;
+import org.apache.cassandra.security.SSLFactory;
+import org.apache.cassandra.security.streaming.SSLFileStreamTask;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.streaming.FileStreamTask;
import org.apache.cassandra.streaming.StreamHeader;
@@ -175,16 +178,32 @@ public final class MessagingService impl
* @param localEp InetAddress whose port to listen on.
*/
public void listen(InetAddress localEp) throws IOException
- {
- ServerSocketChannel serverChannel = ServerSocketChannel.open();
- final ServerSocket ss = serverChannel.socket();
- ss.setReuseAddress(true);
- ss.bind(new InetSocketAddress(localEp, DatabaseDescriptor.getStoragePort()));
- socketThread = new SocketThread(ss, "ACCEPT-" + localEp);
+ {
+ socketThread = new SocketThread(getServerSocket(localEp), "ACCEPT-" + localEp);
socketThread.start();
listenGate.signalAll();
}
+    /**
+     * Opens and binds the listening socket for the messaging service on the storage port.
+     * With internode_encryption set to "all" the socket comes from SSLFactory; otherwise a
+     * plain NIO-backed server socket is opened.
+     *
+     * @param localEp local address to bind to
+     * @return the bound server socket
+     * @throws IOException if the socket cannot be created or bound
+     */
+    private ServerSocket getServerSocket(InetAddress localEp) throws IOException
+    {
+        EncryptionOptions encryption = DatabaseDescriptor.getEncryptionOptions();
+        int storagePort = DatabaseDescriptor.getStoragePort();
+
+        if (encryption.internode_encryption != EncryptionOptions.InternodeEncryption.all)
+        {
+            ServerSocket plain = ServerSocketChannel.open().socket();
+            plain.setReuseAddress(true);
+            plain.bind(new InetSocketAddress(localEp, storagePort));
+            logger_.info("Starting Messaging Service on port {}", storagePort);
+            return plain;
+        }
+
+        // setReuseAddress happens in the factory.
+        ServerSocket secure = SSLFactory.getServerSocket(encryption, localEp, storagePort);
+        logger_.info("Starting Encrypted Messaging Service on port {}", storagePort);
+        return secure;
+    }
+
+
public void waitUntilListening()
{
try
@@ -391,7 +410,10 @@ public final class MessagingService impl
public void stream(StreamHeader header, InetAddress to)
{
/* Streaming asynchronously on streamExector_ threads. */
- streamExecutor_.execute(new FileStreamTask(header, to));
+ if (DatabaseDescriptor.getEncryptionOptions().internode_encryption == EncryptionOptions.InternodeEncryption.all)
+ streamExecutor_.execute(new SSLFileStreamTask(header, to));
+ else
+ streamExecutor_.execute(new FileStreamTask(header, to));
}
public void register(ILatencySubscriber subcriber)
Modified: cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java?rev=1060890&r1=1060889&r2=1060890&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java Wed Jan 19 18:10:35 2011
@@ -35,6 +35,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.EncryptionOptions;
+import org.apache.cassandra.security.SSLFactory;
import org.apache.cassandra.utils.FBUtilities;
public class OutboundTcpConnection extends Thread
@@ -163,7 +165,14 @@ public class OutboundTcpConnection exten
try
{
// zero means 'bind on any available port.'
- socket = new Socket(endpoint, DatabaseDescriptor.getStoragePort(), FBUtilities.getLocalAddress(), 0);
+ if (DatabaseDescriptor.getEncryptionOptions().internode_encryption == EncryptionOptions.InternodeEncryption.all)
+ {
+ socket = SSLFactory.getSocket(DatabaseDescriptor.getEncryptionOptions(), endpoint, DatabaseDescriptor.getStoragePort(), FBUtilities.getLocalAddress(), 0);
+ }
+ else {
+ socket = new Socket(endpoint, DatabaseDescriptor.getStoragePort(), FBUtilities.getLocalAddress(), 0);
+ }
+
socket.setKeepAlive(true);
socket.setTcpNoDelay(true);
output = new DataOutputStream(new BufferedOutputStream(socket.getOutputStream(), 4096));
Added: cassandra/trunk/src/java/org/apache/cassandra/security/SSLFactory.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/security/SSLFactory.java?rev=1060890&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/security/SSLFactory.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/security/SSLFactory.java Wed Jan 19 18:10:35 2011
@@ -0,0 +1,102 @@
+package org.apache.cassandra.security;
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.security.KeyStore;
+
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLServerSocket;
+import javax.net.ssl.SSLSocket;
+import javax.net.ssl.TrustManagerFactory;
+
+import org.apache.cassandra.config.EncryptionOptions;
+
+/**
+ * A Factory for providing and setting up Client and Server SSL wrapped
+ * Socket and ServerSocket
+ */
+public final class SSLFactory
+{
+    private static final String PROTOCOL = "TLS";
+    private static final String ALGORITHM = "SunX509";
+    private static final String STORE_TYPE = "JKS";
+
+    /**
+     * Creates an SSL server socket limited to the configured cipher suites and binds it
+     * to the given address and port with a backlog of 100. SO_REUSEADDR is enabled
+     * before binding.
+     *
+     * @throws IOException if the SSL context cannot be initialized or the bind fails
+     */
+    public static SSLServerSocket getServerSocket(EncryptionOptions options, InetAddress address, int port) throws IOException
+    {
+        SSLContext ctx = createSSLContext(options);
+        SSLServerSocket serverSocket = (SSLServerSocket)ctx.getServerSocketFactory().createServerSocket();
+        serverSocket.setReuseAddress(true);
+        serverSocket.setEnabledCipherSuites(options.cipherSuites);
+        serverSocket.bind(new InetSocketAddress(address, port), 100);
+        return serverSocket;
+    }
+
+    /** Create a socket and connect */
+    public static SSLSocket getSocket(EncryptionOptions options, InetAddress address, int port, InetAddress localAddress, int localPort) throws IOException
+    {
+        SSLContext ctx = createSSLContext(options);
+        SSLSocket socket = (SSLSocket) ctx.getSocketFactory().createSocket(address, port, localAddress, localPort);
+        socket.setEnabledCipherSuites(options.cipherSuites);
+        return socket;
+    }
+
+    /** Just create a socket */
+    public static SSLSocket getSocket(EncryptionOptions options) throws IOException
+    {
+        SSLContext ctx = createSSLContext(options);
+        SSLSocket socket = (SSLSocket) ctx.getSocketFactory().createSocket();
+        socket.setEnabledCipherSuites(options.cipherSuites);
+        return socket;
+    }
+
+    /**
+     * Builds a TLS context from the truststore and keystore named in the options.
+     * Any failure (missing file, wrong password, unsupported algorithm, ...) is
+     * reported as an IOException carrying the original exception as its cause.
+     */
+    private static SSLContext createSSLContext(EncryptionOptions options) throws IOException
+    {
+        try
+        {
+            SSLContext ctx = SSLContext.getInstance(PROTOCOL);
+
+            TrustManagerFactory tmf = TrustManagerFactory.getInstance(ALGORITHM);
+            tmf.init(loadKeyStore(options.truststore, options.truststore_password.toCharArray()));
+
+            char[] keystorePassword = options.keystore_password.toCharArray();
+            KeyManagerFactory kmf = KeyManagerFactory.getInstance(ALGORITHM);
+            kmf.init(loadKeyStore(options.keystore, keystorePassword), keystorePassword);
+
+            ctx.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);
+            return ctx;
+        }
+        catch (Exception e)
+        {
+            throw new IOException("Error creating and initializing the SSL Context", e);
+        }
+    }
+
+    /**
+     * Loads a JKS store from the given path. The file stream is always closed, so
+     * building a context does not leak a file descriptor per socket created.
+     */
+    private static KeyStore loadKeyStore(String path, char[] password) throws IOException, java.security.GeneralSecurityException
+    {
+        FileInputStream in = new FileInputStream(path);
+        try
+        {
+            KeyStore store = KeyStore.getInstance(STORE_TYPE);
+            store.load(in, password);
+            return store;
+        }
+        finally
+        {
+            in.close();
+        }
+    }
+}
Added: cassandra/trunk/src/java/org/apache/cassandra/security/streaming/SSLFileStreamTask.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/security/streaming/SSLFileStreamTask.java?rev=1060890&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/security/streaming/SSLFileStreamTask.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/security/streaming/SSLFileStreamTask.java Wed Jan 19 18:10:35 2011
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.security.streaming;
+
+import java.io.*;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+
+import org.apache.cassandra.security.SSLFactory;
+import org.apache.cassandra.streaming.FileStreamTask;
+import org.apache.cassandra.streaming.StreamHeader;
+import org.apache.cassandra.utils.FBUtilities;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.utils.Pair;
+
+/**
+ * This class uses a DataOutputStream to write data as opposed to a FileChannel.transferTo
+ * used by FileStreamTask because the underlying SSLSocket doesn't support
+ * encrypting over NIO SocketChannel.
+ */
+public class SSLFileStreamTask extends FileStreamTask
+{
+    private DataOutputStream output;
+    private Socket socket;
+
+    public SSLFileStreamTask(StreamHeader header, InetAddress to)
+    {
+        super(header, to);
+    }
+
+    /**
+     * Copies at most CHUNK_SIZE bytes of the section, starting at
+     * {@code section.left + bytesTransferred}, from the file to the SSL stream.
+     *
+     * @return the number of bytes actually read from the file and written out
+     * @throws EOFException if the file ends before the section does; returning 0 here
+     *         would make the caller's transfer loop spin without ever making progress
+     */
+    @Override
+    protected long write(FileChannel fc, Pair<Long, Long> section, long length, long bytesTransferred) throws IOException
+    {
+        int toTransfer = (int)Math.min(CHUNK_SIZE, length - bytesTransferred);
+        fc.position(section.left + bytesTransferred);
+        ByteBuffer buf = ByteBuffer.allocate(toTransfer);
+        if (fc.read(buf) < 0)
+            throw new EOFException("Reached end of file before the requested section was fully read");
+        buf.flip();
+        // a short read is fine: only the bytes actually read are sent and reported
+        output.write(buf.array(), 0, buf.limit());
+        output.flush();
+        return buf.limit();
+    }
+
+    /**
+     * Writes the serialized stream header, i.e. the first {@code buffer.limit()} bytes
+     * of the buffer's backing array.
+     */
+    @Override
+    protected void writeHeader(ByteBuffer buffer) throws IOException
+    {
+        output.write(buffer.array(), 0, buffer.limit());
+        output.flush();
+    }
+
+    /** Creates an unconnected SSL socket and binds it to the local address on an ephemeral port. */
+    @Override
+    protected void bind() throws IOException
+    {
+        socket = SSLFactory.getSocket(DatabaseDescriptor.getEncryptionOptions());
+        socket.bind(new InetSocketAddress(FBUtilities.getLocalAddress(), 0));
+    }
+
+    /** Connects to the destination's storage port and opens the output stream. */
+    @Override
+    protected void connect() throws IOException
+    {
+        socket.connect(new InetSocketAddress(to, DatabaseDescriptor.getStoragePort()));
+        output = new DataOutputStream(socket.getOutputStream());
+    }
+
+    /**
+     * Closes the socket if one was created. The socket is still null when bind() failed
+     * while creating it; closing unconditionally would then raise a NullPointerException
+     * that hides the original failure.
+     */
+    @Override
+    protected void close() throws IOException
+    {
+        if (socket != null)
+            socket.close();
+    }
+}
Added: cassandra/trunk/src/java/org/apache/cassandra/security/streaming/SSLIncomingStreamReader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/security/streaming/SSLIncomingStreamReader.java?rev=1060890&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/security/streaming/SSLIncomingStreamReader.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/security/streaming/SSLIncomingStreamReader.java Wed Jan 19 18:10:35 2011
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.security.streaming;
+
+import java.net.Socket;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.io.IOException;
+import java.io.DataInputStream;
+
+import org.apache.cassandra.streaming.FileStreamTask;
+import org.apache.cassandra.streaming.IncomingStreamReader;
+import org.apache.cassandra.streaming.StreamHeader;
+
+/**
+ * This class uses a DataInputStream to read data as opposed to a FileChannel.transferFrom
+ * used by IncomingStreamReader because the underlying SSLServerSocket doesn't support
+ * encrypting over NIO SocketChannel.
+ */
+public class SSLIncomingStreamReader extends IncomingStreamReader
+{
+    private final DataInputStream input;
+
+    public SSLIncomingStreamReader(StreamHeader header, Socket socket, DataInputStream input) throws IOException
+    {
+        super(header, socket);
+        this.input = input;
+    }
+
+    /**
+     * Reads the next chunk (at most CHUNK_SIZE bytes) from the decrypted input stream,
+     * blocking until the whole chunk has arrived, and writes it to the file channel.
+     *
+     * NOTE(review): {@code offset} is not used here; bytes go to the channel's current
+     * position. Confirm that matches how the caller opens and positions {@code fc}.
+     *
+     * @return the updated total number of bytes read for the current section
+     */
+    @Override
+    protected long readnwrite(long length, long bytesRead, long offset, FileChannel fc) throws IOException
+    {
+        int toRead = (int)Math.min(FileStreamTask.CHUNK_SIZE, length - bytesRead);
+        ByteBuffer buf = ByteBuffer.allocate(toRead);
+        input.readFully(buf.array());
+        // FileChannel.write may write fewer bytes than remain; drain the buffer so the
+        // progress counters below never run ahead of what is actually on disk.
+        while (buf.hasRemaining())
+            fc.write(buf);
+        bytesRead += toRead;
+        remoteFile.progress += toRead;
+        return bytesRead;
+    }
+}
Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStreamTask.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStreamTask.java?rev=1060890&r1=1060889&r2=1060890&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStreamTask.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStreamTask.java Wed Jan 19 18:10:35 2011
@@ -47,8 +47,9 @@ public class FileStreamTask extends Wrap
// around 10 minutes at the default rpctimeout
public static final int MAX_CONNECT_ATTEMPTS = 8;
- private final StreamHeader header;
- private final InetAddress to;
+ protected final StreamHeader header;
+ protected final InetAddress to;
+ private SocketChannel channel;
public FileStreamTask(StreamHeader header, InetAddress to)
{
@@ -58,19 +59,18 @@ public class FileStreamTask extends Wrap
public void runMayThrow() throws IOException
{
- SocketChannel channel = connect();
-
- // successfully connected: stream.
- // (at this point, if we fail, it is the receiver's job to re-request)
try
{
- stream(channel);
+ connectAttempt();
+ // successfully connected: stream.
+ // (at this point, if we fail, it is the receiver's job to re-request)
+ stream();
}
finally
{
try
{
- channel.close();
+ close();
}
catch (IOException e)
{
@@ -82,11 +82,11 @@ public class FileStreamTask extends Wrap
logger.debug("Done streaming " + header.file);
}
- private void stream(SocketChannel channel) throws IOException
+ private void stream() throws IOException
{
ByteBuffer buffer = MessagingService.instance().constructStreamHeader(header, false);
- channel.write(buffer);
- assert buffer.remaining() == 0;
+ writeHeader(buffer);
+
if (header.file == null)
return;
@@ -101,8 +101,7 @@ public class FileStreamTask extends Wrap
long bytesTransferred = 0;
while (bytesTransferred < length)
{
- long toTransfer = Math.min(CHUNK_SIZE, length - bytesTransferred);
- long lastWrite = fc.transferTo(section.left + bytesTransferred, toTransfer, channel);
+ long lastWrite = write(fc, section, length, bytesTransferred);
bytesTransferred += lastWrite;
header.file.progress += lastWrite;
}
@@ -116,24 +115,33 @@ public class FileStreamTask extends Wrap
}
}
+    /**
+     * Transfers the next chunk (at most CHUNK_SIZE bytes) of the section straight from
+     * the file channel to the socket channel.
+     *
+     * @return the number of bytes transferTo reports as transferred
+     */
+    protected long write(FileChannel fc, Pair<Long, Long> section, long length, long bytesTransferred) throws IOException
+    {
+        long remaining = length - bytesTransferred;
+        long chunk = Math.min(CHUNK_SIZE, remaining);
+        long position = section.left + bytesTransferred;
+        return fc.transferTo(position, chunk, channel);
+    }
+
+
+    /**
+     * Writes the serialized stream header to the socket channel; with assertions enabled
+     * it also checks that the buffer was fully consumed by that single write.
+     */
+    protected void writeHeader(ByteBuffer buffer) throws IOException
+    {
+        channel.write(buffer);
+        assert !buffer.hasRemaining();
+    }
+
+
/**
* Connects to the destination, with backoff for failed attempts.
* TODO: all nodes on a cluster must currently use the same storage port
* @throws IOException If all attempts fail.
*/
- private SocketChannel connect() throws IOException
+ private void connectAttempt() throws IOException
{
- SocketChannel channel = SocketChannel.open();
- // force local binding on correctly specified interface.
- channel.socket().bind(new InetSocketAddress(FBUtilities.getLocalAddress(), 0));
+ bind();
int attempts = 0;
while (true)
{
try
{
- channel.connect(new InetSocketAddress(to, DatabaseDescriptor.getStoragePort()));
- // success
- return channel;
+ connect();
+ break;
}
catch (IOException e)
{
@@ -153,4 +161,21 @@ public class FileStreamTask extends Wrap
}
}
}
+
+    /** Opens the socket channel and binds it to the local address on an ephemeral port. */
+    protected void bind() throws IOException
+    {
+        channel = SocketChannel.open();
+        // force local binding on correctly specified interface.
+        InetSocketAddress local = new InetSocketAddress(FBUtilities.getLocalAddress(), 0);
+        channel.socket().bind(local);
+    }
+
+
+    /** Connects the bound channel to the destination node's storage port. */
+    protected void connect() throws IOException
+    {
+        InetSocketAddress remote = new InetSocketAddress(to, DatabaseDescriptor.getStoragePort());
+        channel.connect(remote);
+    }
+
+
+    /**
+     * Closes the channel if one was opened. The channel is still null when bind() failed
+     * in SocketChannel.open(); closing unconditionally would then raise a
+     * NullPointerException from the caller's finally block and hide the original error.
+     */
+    protected void close() throws IOException
+    {
+        if (channel != null)
+            channel.close();
+    }
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java?rev=1060890&r1=1060889&r2=1060890&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java Wed Jan 19 18:10:35 2011
@@ -22,6 +22,7 @@ import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.net.Socket;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;
@@ -35,16 +36,15 @@ public class IncomingStreamReader
{
private static final Logger logger = LoggerFactory.getLogger(IncomingStreamReader.class);
- private final PendingFile localFile;
- private final PendingFile remoteFile;
+ protected final PendingFile localFile;
+ protected final PendingFile remoteFile;
private final SocketChannel socketChannel;
- private final StreamInSession session;
+ protected final StreamInSession session;
- public IncomingStreamReader(StreamHeader header, SocketChannel socketChannel) throws IOException
+ public IncomingStreamReader(StreamHeader header, Socket socket) throws IOException
{
- this.socketChannel = socketChannel;
- InetSocketAddress remoteAddress = (InetSocketAddress)socketChannel.socket().getRemoteSocketAddress();
-
+ this.socketChannel = socket.getChannel();
+ InetSocketAddress remoteAddress = (InetSocketAddress)socket.getRemoteSocketAddress();
session = StreamInSession.get(remoteAddress.getAddress(), header.sessionId);
session.addFiles(header.pendingFiles);
// set the current file we are streaming so progress shows up in jmx
@@ -63,7 +63,7 @@ public class IncomingStreamReader
session.closeIfFinished();
}
- private void readFile() throws IOException
+ protected void readFile() throws IOException
{
if (logger.isDebugEnabled())
{
@@ -82,10 +82,7 @@ public class IncomingStreamReader
long bytesRead = 0;
while (bytesRead < length)
{
- long toRead = Math.min(FileStreamTask.CHUNK_SIZE, length - bytesRead);
- long lastRead = fc.transferFrom(socketChannel, offset + bytesRead, toRead);
- bytesRead += lastRead;
- remoteFile.progress += lastRead;
+ bytesRead = readnwrite(length, bytesRead, offset, fc);
}
offset += length;
}
@@ -106,4 +103,13 @@ public class IncomingStreamReader
session.finished(remoteFile, localFile);
}
+
+    /**
+     * Transfers the next chunk (at most CHUNK_SIZE bytes) from the socket channel into
+     * the file at {@code offset + bytesRead} and advances the remote file's progress.
+     *
+     * @return the updated total number of bytes read for the current section
+     */
+    protected long readnwrite(long length, long bytesRead, long offset, FileChannel fc) throws IOException
+    {
+        long remaining = length - bytesRead;
+        long chunk = Math.min(FileStreamTask.CHUNK_SIZE, remaining);
+        long transferred = fc.transferFrom(socketChannel, offset + bytesRead, chunk);
+        remoteFile.progress += transferred;
+        return bytesRead + transferred;
+    }
}
Modified: cassandra/trunk/test/conf/cassandra.yaml
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/conf/cassandra.yaml?rev=1060890&r1=1060889&r2=1060890&view=diff
==============================================================================
--- cassandra/trunk/test/conf/cassandra.yaml (original)
+++ cassandra/trunk/test/conf/cassandra.yaml Wed Jan 19 18:10:35 2011
@@ -26,6 +26,12 @@ endpoint_snitch: org.apache.cassandra.lo
dynamic_snitch: true
request_scheduler: org.apache.cassandra.scheduler.RoundRobinScheduler
request_scheduler_id: keyspace
+encryption_options:
+ internode_encryption: none
+ keystore: conf/.keystore
+ keystore_password: cassandra
+ truststore: conf/.truststore
+ truststore_password: cassandra
keyspaces:
- name: Keyspace1
replica_placement_strategy: org.apache.cassandra.locator.SimpleStrategy