You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2014/08/15 20:27:00 UTC

[3/6] git commit: Fix sstableloader unable to connect encrypted node

Fix sstableloader unable to connect encrypted node

patch by yukim; reviewed by krummas for CASSANDRA-7585


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/563cea14
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/563cea14
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/563cea14

Branch: refs/heads/trunk
Commit: 563cea14b4bb87cd37ab10399904f08757c34d27
Parents: ad6ba3d
Author: Yuki Morishita <yu...@apache.org>
Authored: Fri Aug 15 12:31:59 2014 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Fri Aug 15 12:31:59 2014 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../config/YamlConfigurationLoader.java         |   6 +-
 .../cassandra/io/sstable/SSTableLoader.java     |  22 ++-
 .../cassandra/streaming/ConnectionHandler.java  |  48 +-----
 .../streaming/DefaultConnectionFactory.java     |  74 +++++++++
 .../streaming/StreamConnectionFactory.java      |  30 ++++
 .../apache/cassandra/streaming/StreamPlan.java  |  16 +-
 .../cassandra/streaming/StreamResultFuture.java |   2 +-
 .../cassandra/streaming/StreamSession.java      |  13 +-
 .../tools/BulkLoadConnectionFactory.java        |  68 +++++++++
 .../org/apache/cassandra/tools/BulkLoader.java  | 149 +++++++++++++------
 .../streaming/StreamTransferTaskTest.java       |   2 +-
 12 files changed, 330 insertions(+), 101 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/563cea14/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 4306de5..e335484 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -47,6 +47,7 @@
  * Backport CASSANDRA-6747 (CASSANDRA-7560)
  * Track max/min timestamps for range tombstones (CASSANDRA-7647)
  * Fix NPE when listing saved caches dir (CASSANDRA-7632)
+ * Fix sstableloader unable to connect encrypted node (CASSANDRA-7585)
 Merged from 1.2:
  * Remove duplicates from StorageService.getJoiningNodes (CASSANDRA-7478)
  * Clone token map outside of hot gossip loops (CASSANDRA-7758)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/563cea14/src/java/org/apache/cassandra/config/YamlConfigurationLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/YamlConfigurationLoader.java b/src/java/org/apache/cassandra/config/YamlConfigurationLoader.java
index 6b5a152..b520d07 100644
--- a/src/java/org/apache/cassandra/config/YamlConfigurationLoader.java
+++ b/src/java/org/apache/cassandra/config/YamlConfigurationLoader.java
@@ -69,10 +69,14 @@ public class YamlConfigurationLoader implements ConfigurationLoader
 
     public Config loadConfig() throws ConfigurationException
     {
+        return loadConfig(getStorageConfigURL());
+    }
+
+    public Config loadConfig(URL url) throws ConfigurationException
+    {
         InputStream input = null;
         try
         {
-            URL url = getStorageConfigURL();
             logger.info("Loading settings from {}", url);
             try
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/563cea14/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
index 4a1604d..85dc0e4 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
@@ -50,7 +50,7 @@ public class SSTableLoader implements StreamEventHandler
     private final OutputHandler outputHandler;
     private final Set<InetAddress> failedHosts = new HashSet<>();
 
-    private final List<SSTableReader> sstables = new ArrayList<SSTableReader>();
+    private final List<SSTableReader> sstables = new ArrayList<>();
     private final Multimap<InetAddress, StreamSession.SSTableStreamingSections> streamingDetails = HashMultimap.create();
 
     static
@@ -94,7 +94,7 @@ public class SSTableLoader implements StreamEventHandler
                     return false;
                 }
 
-                Set<Component> components = new HashSet<Component>();
+                Set<Component> components = new HashSet<>();
                 components.add(Component.DATA);
                 components.add(Component.PRIMARY_INDEX);
                 if (new File(desc.filenameFor(Component.SUMMARY)).exists())
@@ -149,7 +149,7 @@ public class SSTableLoader implements StreamEventHandler
         client.init(keyspace);
         outputHandler.output("Established connection to initial hosts");
 
-        StreamPlan plan = new StreamPlan("Bulk Load");
+        StreamPlan plan = new StreamPlan("Bulk Load").connectionFactory(client.getConnectionFactory());
 
         Map<InetAddress, Collection<Range<Token>>> endpointToRanges = client.getEndpointToRangesMap();
         openSSTables(endpointToRanges);
@@ -220,7 +220,7 @@ public class SSTableLoader implements StreamEventHandler
 
     public static abstract class Client
     {
-        private final Map<InetAddress, Collection<Range<Token>>> endpointToRanges = new HashMap<InetAddress, Collection<Range<Token>>>();
+        private final Map<InetAddress, Collection<Range<Token>>> endpointToRanges = new HashMap<>();
         private IPartitioner partitioner;
 
         /**
@@ -240,6 +240,17 @@ public class SSTableLoader implements StreamEventHandler
         public void stop() {}
 
         /**
+         * Provides connection factory.
+         * By default, it uses DefaultConnectionFactory.
+         *
+         * @return StreamConnectionFactory to use
+         */
+        public StreamConnectionFactory getConnectionFactory()
+        {
+            return new DefaultConnectionFactory();
+        }
+
+        /**
          * Validate that {@code keyspace} is an existing keyspace and {@code
          * cfName} one of its existing column family.
          */
@@ -258,6 +269,7 @@ public class SSTableLoader implements StreamEventHandler
         protected void setPartitioner(IPartitioner partitioner)
         {
             this.partitioner = partitioner;
+            // the following is still necessary since Range/Token reference partitioner through StorageService.getPartitioner
             DatabaseDescriptor.setPartitioner(partitioner);
         }
 
@@ -271,7 +283,7 @@ public class SSTableLoader implements StreamEventHandler
             Collection<Range<Token>> ranges = endpointToRanges.get(endpoint);
             if (ranges == null)
             {
-                ranges = new HashSet<Range<Token>>();
+                ranges = new HashSet<>();
                 endpointToRanges.put(endpoint, ranges);
             }
             ranges.add(range);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/563cea14/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
index b06a818..8fba41b 100644
--- a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
+++ b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
@@ -18,7 +18,6 @@
 package org.apache.cassandra.streaming;
 
 import java.io.IOException;
-import java.net.InetAddress;
 import java.net.Socket;
 import java.net.SocketException;
 import java.nio.ByteBuffer;
@@ -37,8 +36,6 @@ import com.google.common.util.concurrent.SettableFuture;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.net.OutboundTcpConnectionPool;
 import org.apache.cassandra.streaming.messages.StreamInitMessage;
 import org.apache.cassandra.streaming.messages.StreamMessage;
 import org.apache.cassandra.utils.FBUtilities;
@@ -55,8 +52,6 @@ public class ConnectionHandler
 {
     private static final Logger logger = LoggerFactory.getLogger(ConnectionHandler.class);
 
-    private static final int MAX_CONNECT_ATTEMPTS = 3;
-
     private final StreamSession session;
 
     private IncomingMessageHandler incoming;
@@ -79,12 +74,12 @@ public class ConnectionHandler
     public void initiate() throws IOException
     {
         logger.debug("[Stream #{}] Sending stream init for incoming stream", session.planId());
-        Socket incomingSocket = connect(session.peer);
+        Socket incomingSocket = session.createConnection();
         incoming.start(incomingSocket, StreamMessage.CURRENT_VERSION);
         incoming.sendInitMessage(incomingSocket, true);
 
         logger.debug("[Stream #{}] Sending stream init for outgoing stream", session.planId());
-        Socket outgoingSocket = connect(session.peer);
+        Socket outgoingSocket = session.createConnection();
         outgoing.start(outgoingSocket, StreamMessage.CURRENT_VERSION);
         outgoing.sendInitMessage(outgoingSocket, false);
     }
@@ -104,45 +99,6 @@ public class ConnectionHandler
             incoming.start(socket, version);
     }
 
-    /**
-     * Connect to peer and start exchanging message.
-     * When connect attempt fails, this retries for maximum of MAX_CONNECT_ATTEMPTS times.
-     *
-     * @param peer the peer to connect to.
-     * @return the created socket.
-     *
-     * @throws IOException when connection failed.
-     */
-    private static Socket connect(InetAddress peer) throws IOException
-    {
-        int attempts = 0;
-        while (true)
-        {
-            try
-            {
-                Socket socket = OutboundTcpConnectionPool.newSocket(peer);
-                socket.setSoTimeout(DatabaseDescriptor.getStreamingSocketTimeout());
-                return socket;
-            }
-            catch (IOException e)
-            {
-                if (++attempts >= MAX_CONNECT_ATTEMPTS)
-                    throw e;
-
-                long waitms = DatabaseDescriptor.getRpcTimeout() * (long)Math.pow(2, attempts);
-                logger.warn("Failed attempt " + attempts + " to connect to " + peer + ". Retrying in " + waitms + " ms. (" + e + ")");
-                try
-                {
-                    Thread.sleep(waitms);
-                }
-                catch (InterruptedException wtf)
-                {
-                    throw new IOException("interrupted", wtf);
-                }
-            }
-        }
-    }
-
     public ListenableFuture<?> close()
     {
         logger.debug("[Stream #{}] Closing stream connection handler on {}", session.planId(), session.peer);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/563cea14/src/java/org/apache/cassandra/streaming/DefaultConnectionFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/DefaultConnectionFactory.java b/src/java/org/apache/cassandra/streaming/DefaultConnectionFactory.java
new file mode 100644
index 0000000..53af4c8
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/DefaultConnectionFactory.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.Socket;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.net.OutboundTcpConnectionPool;
+
+public class DefaultConnectionFactory implements StreamConnectionFactory
+{
+    private static final Logger logger = LoggerFactory.getLogger(DefaultConnectionFactory.class);
+
+    private static final int MAX_CONNECT_ATTEMPTS = 3;
+
+    /**
+     * Connect to peer and start exchanging message.
+     * When connect attempt fails, this retries for maximum of MAX_CONNECT_ATTEMPTS times.
+     *
+     * @param peer the peer to connect to.
+     * @return the created socket.
+     *
+     * @throws IOException when connection failed.
+     */
+    public Socket createConnection(InetAddress peer) throws IOException
+    {
+        int attempts = 0;
+        while (true)
+        {
+            try
+            {
+                Socket socket = OutboundTcpConnectionPool.newSocket(peer);
+                socket.setSoTimeout(DatabaseDescriptor.getStreamingSocketTimeout());
+                return socket;
+            }
+            catch (IOException e)
+            {
+                if (++attempts >= MAX_CONNECT_ATTEMPTS)
+                    throw e;
+
+                long waitms = DatabaseDescriptor.getRpcTimeout() * (long)Math.pow(2, attempts);
+                logger.warn("Failed attempt " + attempts + " to connect to " + peer + ". Retrying in " + waitms + " ms. (" + e + ")");
+                try
+                {
+                    Thread.sleep(waitms);
+                }
+                catch (InterruptedException wtf)
+                {
+                    throw new IOException("interrupted", wtf);
+                }
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/563cea14/src/java/org/apache/cassandra/streaming/StreamConnectionFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamConnectionFactory.java b/src/java/org/apache/cassandra/streaming/StreamConnectionFactory.java
new file mode 100644
index 0000000..dd99611
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/StreamConnectionFactory.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.Socket;
+
+/**
+ * Interface that creates connection used by streaming.
+ */
+public interface StreamConnectionFactory
+{
+    Socket createConnection(InetAddress peer) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/563cea14/src/java/org/apache/cassandra/streaming/StreamPlan.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamPlan.java b/src/java/org/apache/cassandra/streaming/StreamPlan.java
index b57e097..e582c79 100644
--- a/src/java/org/apache/cassandra/streaming/StreamPlan.java
+++ b/src/java/org/apache/cassandra/streaming/StreamPlan.java
@@ -38,6 +38,8 @@ public class StreamPlan
     // sessions per InetAddress of the other end.
     private final Map<InetAddress, StreamSession> sessions = new HashMap<>();
 
+    private StreamConnectionFactory connectionFactory = new DefaultConnectionFactory();
+
     private boolean flushBeforeTransfer = true;
 
     /**
@@ -132,6 +134,18 @@ public class StreamPlan
     }
 
     /**
+     * Set custom StreamConnectionFactory to be used for establishing connection
+     *
+     * @param factory StreamConnectionFactory to use
+     * @return self
+     */
+    public StreamPlan connectionFactory(StreamConnectionFactory factory)
+    {
+        this.connectionFactory = factory;
+        return this;
+    }
+
+    /**
      * @return true if this plan has no plan to execute
      */
     public boolean isEmpty()
@@ -167,7 +181,7 @@ public class StreamPlan
         StreamSession session = sessions.get(peer);
         if (session == null)
         {
-            session = new StreamSession(peer);
+            session = new StreamSession(peer, connectionFactory);
             sessions.put(peer, session);
         }
         return session;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/563cea14/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
index dcffaff..add14f7 100644
--- a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
+++ b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
@@ -106,7 +106,7 @@ public final class StreamResultFuture extends AbstractFuture<StreamState>
         StreamResultFuture future = StreamManager.instance.getReceivingStream(planId);
         if (future == null)
         {
-            final StreamSession session = new StreamSession(from);
+            final StreamSession session = new StreamSession(from, null);
 
             // The main reason we create a StreamResultFuture on the receiving side is for JMX exposure.
             future = new StreamResultFuture(planId, description, Collections.singleton(session));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/563cea14/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index 55e30f0..4fcbe36 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.streaming;
 
 import java.io.IOException;
 import java.net.InetAddress;
+import java.net.Socket;
 import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -128,6 +129,8 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe
     // data receivers, filled after receiving prepare message
     private final Map<UUID, StreamReceiveTask> receivers = new ConcurrentHashMap<>();
     private final StreamingMetrics metrics;
+    /* can be null when session is created in remote */
+    private final StreamConnectionFactory factory;
 
     public final ConnectionHandler handler;
 
@@ -152,10 +155,12 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe
      * Create new streaming session with the peer.
      *
      * @param peer Address of streaming peer
+     * @param factory is used for establishing connection
      */
-    public StreamSession(InetAddress peer)
+    public StreamSession(InetAddress peer, StreamConnectionFactory factory)
     {
         this.peer = peer;
+        this.factory = factory;
         this.handler = new ConnectionHandler(this);
         this.metrics = StreamingMetrics.get(peer);
     }
@@ -211,6 +216,12 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe
         });
     }
 
+    public Socket createConnection() throws IOException
+    {
+        assert factory != null;
+        return factory.createConnection(peer);
+    }
+
     /**
      * Request data fetch task to this session.
      *

http://git-wip-us.apache.org/repos/asf/cassandra/blob/563cea14/src/java/org/apache/cassandra/tools/BulkLoadConnectionFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/BulkLoadConnectionFactory.java b/src/java/org/apache/cassandra/tools/BulkLoadConnectionFactory.java
new file mode 100644
index 0000000..399344e
--- /dev/null
+++ b/src/java/org/apache/cassandra/tools/BulkLoadConnectionFactory.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.tools;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.nio.channels.SocketChannel;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.cassandra.config.EncryptionOptions;
+import org.apache.cassandra.security.SSLFactory;
+import org.apache.cassandra.streaming.StreamConnectionFactory;
+import org.apache.cassandra.utils.FBUtilities;
+
+public class BulkLoadConnectionFactory implements StreamConnectionFactory
+{
+    private final boolean outboundBindAny;
+    private final int storagePort;
+    private final int secureStoragePort;
+    private final EncryptionOptions.ServerEncryptionOptions encryptionOptions;
+
+    public BulkLoadConnectionFactory(int storagePort, int secureStoragePort, EncryptionOptions.ServerEncryptionOptions encryptionOptions, boolean outboundBindAny)
+    {
+        this.storagePort = storagePort;
+        this.secureStoragePort = secureStoragePort;
+        this.encryptionOptions = encryptionOptions;
+        this.outboundBindAny = outboundBindAny;
+    }
+
+    public Socket createConnection(InetAddress peer) throws IOException
+    {
+        // Connect to secure port for all peers if ServerEncryptionOptions is configured other than 'none'
+        // When 'all', 'dc' and 'rack', server nodes always have SSL port open, and since thin client like sstableloader
+        // does not know which node is in which dc/rack, connecting to SSL port is always the option.
+        if (encryptionOptions != null && encryptionOptions.internode_encryption != EncryptionOptions.ServerEncryptionOptions.InternodeEncryption.none)
+        {
+            if (outboundBindAny)
+                return SSLFactory.getSocket(encryptionOptions, peer, secureStoragePort);
+            else
+                return SSLFactory.getSocket(encryptionOptions, peer, secureStoragePort, FBUtilities.getLocalAddress(), 0);
+        }
+        else
+        {
+            Socket socket = SocketChannel.open(new InetSocketAddress(peer, storagePort)).socket();
+            if (outboundBindAny && !socket.isBound())
+                socket.bind(new InetSocketAddress(FBUtilities.getLocalAddress(), 0));
+            return socket;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/563cea14/src/java/org/apache/cassandra/tools/BulkLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/BulkLoader.java b/src/java/org/apache/cassandra/tools/BulkLoader.java
index 37ec635..4077722 100644
--- a/src/java/org/apache/cassandra/tools/BulkLoader.java
+++ b/src/java/org/apache/cassandra/tools/BulkLoader.java
@@ -18,29 +18,27 @@
 package org.apache.cassandra.tools;
 
 import java.io.File;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
+import java.net.*;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 
 import com.google.common.base.Joiner;
 import com.google.common.collect.Sets;
-import org.apache.cassandra.config.EncryptionOptions;
+
+import org.apache.cassandra.config.*;
+
 import org.apache.commons.cli.*;
 import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.transport.TFramedTransport;
-import org.apache.thrift.transport.TSocket;
 import org.apache.thrift.transport.TTransport;
 
 import org.apache.cassandra.auth.IAuthenticator;
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.sstable.SSTableLoader;
 import org.apache.cassandra.streaming.*;
 import org.apache.cassandra.thrift.*;
@@ -60,7 +58,10 @@ public class BulkLoader
     private static final String USER_OPTION = "username";
     private static final String PASSWD_OPTION = "password";
     private static final String THROTTLE_MBITS = "throttle";
+
     private static final String TRANSPORT_FACTORY = "transport-factory";
+
+    /* client encryption options */
     private static final String SSL_TRUSTSTORE = "truststore";
     private static final String SSL_TRUSTSTORE_PW = "truststore-password";
     private static final String SSL_KEYSTORE = "keystore";
@@ -69,12 +70,20 @@ public class BulkLoader
     private static final String SSL_ALGORITHM = "ssl-alg";
     private static final String SSL_STORE_TYPE = "store-type";
     private static final String SSL_CIPHER_SUITES = "ssl-ciphers";
+    private static final String CONFIG_PATH = "conf-path";
 
     public static void main(String args[])
     {
         LoaderOptions options = LoaderOptions.parseArgs(args);
         OutputHandler handler = new OutputHandler.SystemOutput(options.verbose, options.debug);
-        SSTableLoader loader = new SSTableLoader(options.directory, new ExternalClient(options.hosts, options.rpcPort, options.user, options.passwd, options.transportFactory), handler);
+        SSTableLoader loader = new SSTableLoader(options.directory, new ExternalClient(options.hosts,
+                                                                                       options.rpcPort,
+                                                                                       options.user,
+                                                                                       options.passwd,
+                                                                                       options.transportFactory,
+                                                                                       options.storagePort,
+                                                                                       options.sslStoragePort,
+                                                                                       options.serverEncOptions), handler);
         DatabaseDescriptor.setStreamThroughputOutboundMegabitsPerSec(options.throttle);
         StreamResultFuture future = null;
         try
@@ -207,8 +216,18 @@ public class BulkLoader
         private final String user;
         private final String passwd;
         private final ITransportFactory transportFactory;
-
-        public ExternalClient(Set<InetAddress> hosts, int port, String user, String passwd, ITransportFactory transportFactory)
+        private final int storagePort;
+        private final int sslStoragePort;
+        private final EncryptionOptions.ServerEncryptionOptions serverEncOptions;
+
+        public ExternalClient(Set<InetAddress> hosts,
+                              int port,
+                              String user,
+                              String passwd,
+                              ITransportFactory transportFactory,
+                              int storagePort,
+                              int sslStoragePort,
+                              EncryptionOptions.ServerEncryptionOptions serverEncryptionOptions)
         {
             super();
             this.hosts = hosts;
@@ -216,8 +235,12 @@ public class BulkLoader
             this.user = user;
             this.passwd = passwd;
             this.transportFactory = transportFactory;
+            this.storagePort = storagePort;
+            this.sslStoragePort = sslStoragePort;
+            this.serverEncOptions = serverEncryptionOptions;
         }
 
+        @Override
         public void init(String keyspace)
         {
             Iterator<InetAddress> hostiter = hosts.iterator();
@@ -234,7 +257,7 @@ public class BulkLoader
 
                     for (TokenRange tr : client.describe_ring(keyspace))
                     {
-                        Range<Token> range = new Range<>(tkFactory.fromString(tr.start_token), tkFactory.fromString(tr.end_token));
+                        Range<Token> range = new Range<>(tkFactory.fromString(tr.start_token), tkFactory.fromString(tr.end_token), getPartitioner());
                         for (String ep : tr.endpoints)
                         {
                             addRangeForEndpoint(range, InetAddress.getByName(ep));
@@ -261,6 +284,13 @@ public class BulkLoader
             }
         }
 
+        @Override
+        public StreamConnectionFactory getConnectionFactory()
+        {
+            return new BulkLoadConnectionFactory(storagePort, sslStoragePort, serverEncOptions, false);
+        }
+
+        @Override
         public CFMetaData getCFMetaData(String keyspace, String cfName)
         {
             return knownCfs.get(cfName);
@@ -273,7 +303,7 @@ public class BulkLoader
             Cassandra.Client client = new Cassandra.Client(protocol);
             if (user != null && passwd != null)
             {
-                Map<String, String> credentials = new HashMap<String, String>();
+                Map<String, String> credentials = new HashMap<>();
                 credentials.put(IAuthenticator.USERNAME_KEY, user);
                 credentials.put(IAuthenticator.PASSWORD_KEY, passwd);
                 AuthenticationRequest authenticationRequest = new AuthenticationRequest(credentials);
@@ -294,11 +324,14 @@ public class BulkLoader
         public String user;
         public String passwd;
         public int throttle = 0;
+        public int storagePort;
+        public int sslStoragePort;
         public ITransportFactory transportFactory = new TFramedTransportFactory();
         public EncryptionOptions encOptions = new EncryptionOptions.ClientEncryptionOptions();
+        public EncryptionOptions.ServerEncryptionOptions serverEncOptions = new EncryptionOptions.ServerEncryptionOptions();
 
-        public final Set<InetAddress> hosts = new HashSet<InetAddress>();
-        public final Set<InetAddress> ignores = new HashSet<InetAddress>();
+        public final Set<InetAddress> hosts = new HashSet<>();
+        public final Set<InetAddress> ignores = new HashSet<>();
 
         LoaderOptions(File directory)
         {
@@ -349,9 +382,6 @@ public class BulkLoader
                 opts.verbose = cmd.hasOption(VERBOSE_OPTION);
                 opts.noProgress = cmd.hasOption(NOPROGRESS_OPTION);
 
-                if (cmd.hasOption(THROTTLE_MBITS))
-                    opts.throttle = Integer.parseInt(cmd.getOptionValue(THROTTLE_MBITS));
-
                 if (cmd.hasOption(RPC_PORT_OPTION))
                     opts.rpcPort = Integer.parseInt(cmd.getOptionValue(RPC_PORT_OPTION));
 
@@ -400,44 +430,71 @@ public class BulkLoader
                     }
                 }
 
-                if(cmd.hasOption(SSL_TRUSTSTORE))
+                // try to load config file first, so that values can be rewritten with other option values.
+                // otherwise use default config.
+                Config config;
+                if (cmd.hasOption(CONFIG_PATH))
+                {
+                    File configFile = new File(cmd.getOptionValue(CONFIG_PATH));
+                    if (!configFile.exists())
+                    {
+                        errorMsg("Config file not found", options);
+                    }
+                    config = new YamlConfigurationLoader().loadConfig(configFile.toURI().toURL());
+                }
+                else
+                {
+                    config = new Config();
+                }
+                opts.storagePort = config.storage_port;
+                opts.sslStoragePort = config.ssl_storage_port;
+                opts.throttle = config.stream_throughput_outbound_megabits_per_sec;
+                opts.encOptions = config.client_encryption_options;
+                opts.serverEncOptions = config.server_encryption_options;
+
+                if (cmd.hasOption(THROTTLE_MBITS))
+                {
+                    opts.throttle = Integer.parseInt(cmd.getOptionValue(THROTTLE_MBITS));
+                }
+
+                if (cmd.hasOption(SSL_TRUSTSTORE))
                 {
                     opts.encOptions.truststore = cmd.getOptionValue(SSL_TRUSTSTORE);
                 }
 
-                if(cmd.hasOption(SSL_TRUSTSTORE_PW))
+                if (cmd.hasOption(SSL_TRUSTSTORE_PW))
                 {
                     opts.encOptions.truststore_password = cmd.getOptionValue(SSL_TRUSTSTORE_PW);
                 }
 
-                if(cmd.hasOption(SSL_KEYSTORE))
+                if (cmd.hasOption(SSL_KEYSTORE))
                 {
                     opts.encOptions.keystore = cmd.getOptionValue(SSL_KEYSTORE);
                     // if a keystore was provided, lets assume we'll need to use it
                     opts.encOptions.require_client_auth = true;
                 }
 
-                if(cmd.hasOption(SSL_KEYSTORE_PW))
+                if (cmd.hasOption(SSL_KEYSTORE_PW))
                 {
                     opts.encOptions.keystore_password = cmd.getOptionValue(SSL_KEYSTORE_PW);
                 }
 
-                if(cmd.hasOption(SSL_PROTOCOL))
+                if (cmd.hasOption(SSL_PROTOCOL))
                 {
                     opts.encOptions.protocol = cmd.getOptionValue(SSL_PROTOCOL);
                 }
 
-                if(cmd.hasOption(SSL_ALGORITHM))
+                if (cmd.hasOption(SSL_ALGORITHM))
                 {
                     opts.encOptions.algorithm = cmd.getOptionValue(SSL_ALGORITHM);
                 }
 
-                if(cmd.hasOption(SSL_STORE_TYPE))
+                if (cmd.hasOption(SSL_STORE_TYPE))
                 {
                     opts.encOptions.store_type = cmd.getOptionValue(SSL_STORE_TYPE);
                 }
 
-                if(cmd.hasOption(SSL_CIPHER_SUITES))
+                if (cmd.hasOption(SSL_CIPHER_SUITES))
                 {
                     opts.encOptions.cipher_suites = cmd.getOptionValue(SSL_CIPHER_SUITES).split(",");
                 }
@@ -451,7 +508,7 @@ public class BulkLoader
 
                 return opts;
             }
-            catch (ParseException e)
+            catch (ParseException | ConfigurationException | MalformedURLException e)
             {
                 errorMsg(e.getMessage(), options);
                 return null;
@@ -508,6 +565,7 @@ public class BulkLoader
             printUsage(options);
             System.exit(1);
         }
+
         private static CmdLineOptions getCmdLineOptions()
         {
             CmdLineOptions options = new CmdLineOptions();
@@ -516,37 +574,38 @@ public class BulkLoader
             options.addOption("h",  HELP_OPTION,         "display this help message");
             options.addOption(null, NOPROGRESS_OPTION,   "don't display progress");
             options.addOption("i",  IGNORE_NODES_OPTION, "NODES", "don't stream to this (comma separated) list of nodes");
-            options.addOption("d",  INITIAL_HOST_ADDRESS_OPTION, "initial hosts", "try to connect to these hosts (comma separated) initially for ring information");
+            options.addOption("d",  INITIAL_HOST_ADDRESS_OPTION, "initial hosts", "Required. try to connect to these hosts (comma separated) initially for ring information");
             options.addOption("p",  RPC_PORT_OPTION, "rpc port", "port used for rpc (default 9160)");
             options.addOption("t",  THROTTLE_MBITS, "throttle", "throttle speed in Mbits (default unlimited)");
             options.addOption("u",  USER_OPTION, "username", "username for cassandra authentication");
             options.addOption("pw", PASSWD_OPTION, "password", "password for cassandra authentication");
             options.addOption("tf", TRANSPORT_FACTORY, "transport factory", "Fully-qualified ITransportFactory class name for creating a connection to cassandra");
             // ssl connection-related options
-            options.addOption("ts", SSL_TRUSTSTORE, "TRUSTSTORE", "SSL: full path to truststore");
-            options.addOption("tspw", SSL_TRUSTSTORE_PW, "TRUSTSTORE-PASSWORD", "SSL: password of the truststore");
-            options.addOption("ks", SSL_KEYSTORE, "KEYSTORE", "SSL: full path to keystore");
-            options.addOption("kspw", SSL_KEYSTORE_PW, "KEYSTORE-PASSWORD", "SSL: password of the keystore");
-            options.addOption("prtcl", SSL_PROTOCOL, "PROTOCOL", "SSL: connections protocol to use (default: TLS)");
-            options.addOption("alg", SSL_ALGORITHM, "ALGORITHM", "SSL: algorithm (default: SunX509)");
-            options.addOption("st", SSL_STORE_TYPE, "STORE-TYPE", "SSL: type of store");
-            options.addOption("ciphers", SSL_CIPHER_SUITES, "CIPHER-SUITES", "SSL: comma-separated list of encryption suites to use");
+            options.addOption("ts", SSL_TRUSTSTORE, "TRUSTSTORE", "Client SSL: full path to truststore");
+            options.addOption("tspw", SSL_TRUSTSTORE_PW, "TRUSTSTORE-PASSWORD", "Client SSL: password of the truststore");
+            options.addOption("ks", SSL_KEYSTORE, "KEYSTORE", "Client SSL: full path to keystore");
+            options.addOption("kspw", SSL_KEYSTORE_PW, "KEYSTORE-PASSWORD", "Client SSL: password of the keystore");
+            options.addOption("prtcl", SSL_PROTOCOL, "PROTOCOL", "Client SSL: connections protocol to use (default: TLS)");
+            options.addOption("alg", SSL_ALGORITHM, "ALGORITHM", "Client SSL: algorithm (default: SunX509)");
+            options.addOption("st", SSL_STORE_TYPE, "STORE-TYPE", "Client SSL: type of store");
+            options.addOption("ciphers", SSL_CIPHER_SUITES, "CIPHER-SUITES", "Client SSL: comma-separated list of encryption suites to use");
+            options.addOption("f", CONFIG_PATH, "path to config file", "cassandra.yaml file path for streaming throughput and client/server SSL.");
             return options;
         }
 
         public static void printUsage(Options options)
         {
             String usage = String.format("%s [options] <dir_path>", TOOL_NAME);
-            StringBuilder header = new StringBuilder();
-            header.append("--\n");
-            header.append("Bulk load the sstables found in the directory <dir_path> to the configured cluster." );
-            header.append("The parent directory of <dir_path> is used as the keyspace name. ");
-            header.append("So for instance, to load an sstable named Standard1-g-1-Data.db into keyspace Keyspace1, ");
-            header.append("you will need to have the files Standard1-g-1-Data.db and Standard1-g-1-Index.db in a ");
-            header.append("directory Keyspace1/Standard1/ in the directory and call: sstableloader Keyspace1/Standard1");
-            header.append("\n--\n");
-            header.append("Options are:");
-            new HelpFormatter().printHelp(usage, header.toString(), options, "");
+            String header = System.lineSeparator() +
+                            "Bulk load the sstables found in the directory <dir_path> to the configured cluster." +
+                            "The parent directories of <dir_path> are used as the target keyspace/table name. " +
+                            "So for instance, to load an sstable named Standard1-g-1-Data.db into Keyspace1/Standard1, " +
+                            "you will need to have the files Standard1-g-1-Data.db and Standard1-g-1-Index.db into a directory /path/to/Keyspace1/Standard1/.";
+            String footer = System.lineSeparator() +
+                            "You can provide cassandra.yaml file with -f command line option to set up streaming throughput, client and server encryption options. " +
+                            "Only stream_throughput_outbound_megabits_per_sec, server_encryption_options and client_encryption_options are read from yaml. " +
+                            "You can override options read from cassandra.yaml with corresponding command line options.";
+            new HelpFormatter().printHelp(usage, header, options, footer);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/563cea14/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
index 9b02817..ce0f9d0 100644
--- a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
@@ -43,7 +43,7 @@ public class StreamTransferTaskTest extends SchemaLoader
         String ks = "Keyspace1";
         String cf = "Standard1";
 
-        StreamSession session = new StreamSession(FBUtilities.getBroadcastAddress());
+        StreamSession session = new StreamSession(FBUtilities.getBroadcastAddress(), null);
         ColumnFamilyStore cfs = Keyspace.open(ks).getColumnFamilyStore(cf);
 
         // create two sstables