You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2014/08/15 20:27:00 UTC
[3/6] git commit: Fix sstableloader unable to connect encrypted node
Fix sstableloader unable to connect encrypted node
patch by yukim; reviewed by krummas for CASSANDRA-7585
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/563cea14
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/563cea14
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/563cea14
Branch: refs/heads/trunk
Commit: 563cea14b4bb87cd37ab10399904f08757c34d27
Parents: ad6ba3d
Author: Yuki Morishita <yu...@apache.org>
Authored: Fri Aug 15 12:31:59 2014 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Fri Aug 15 12:31:59 2014 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../config/YamlConfigurationLoader.java | 6 +-
.../cassandra/io/sstable/SSTableLoader.java | 22 ++-
.../cassandra/streaming/ConnectionHandler.java | 48 +-----
.../streaming/DefaultConnectionFactory.java | 74 +++++++++
.../streaming/StreamConnectionFactory.java | 30 ++++
.../apache/cassandra/streaming/StreamPlan.java | 16 +-
.../cassandra/streaming/StreamResultFuture.java | 2 +-
.../cassandra/streaming/StreamSession.java | 13 +-
.../tools/BulkLoadConnectionFactory.java | 68 +++++++++
.../org/apache/cassandra/tools/BulkLoader.java | 149 +++++++++++++------
.../streaming/StreamTransferTaskTest.java | 2 +-
12 files changed, 330 insertions(+), 101 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/563cea14/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 4306de5..e335484 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -47,6 +47,7 @@
* Backport CASSANDRA-6747 (CASSANDRA-7560)
* Track max/min timestamps for range tombstones (CASSANDRA-7647)
* Fix NPE when listing saved caches dir (CASSANDRA-7632)
+ * Fix sstableloader unable to connect encrypted node (CASSANDRA-7585)
Merged from 1.2:
* Remove duplicates from StorageService.getJoiningNodes (CASSANDRA-7478)
* Clone token map outside of hot gossip loops (CASSANDRA-7758)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/563cea14/src/java/org/apache/cassandra/config/YamlConfigurationLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/YamlConfigurationLoader.java b/src/java/org/apache/cassandra/config/YamlConfigurationLoader.java
index 6b5a152..b520d07 100644
--- a/src/java/org/apache/cassandra/config/YamlConfigurationLoader.java
+++ b/src/java/org/apache/cassandra/config/YamlConfigurationLoader.java
@@ -69,10 +69,14 @@ public class YamlConfigurationLoader implements ConfigurationLoader
public Config loadConfig() throws ConfigurationException
{
+ return loadConfig(getStorageConfigURL());
+ }
+
+ public Config loadConfig(URL url) throws ConfigurationException
+ {
InputStream input = null;
try
{
- URL url = getStorageConfigURL();
logger.info("Loading settings from {}", url);
try
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/563cea14/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
index 4a1604d..85dc0e4 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
@@ -50,7 +50,7 @@ public class SSTableLoader implements StreamEventHandler
private final OutputHandler outputHandler;
private final Set<InetAddress> failedHosts = new HashSet<>();
- private final List<SSTableReader> sstables = new ArrayList<SSTableReader>();
+ private final List<SSTableReader> sstables = new ArrayList<>();
private final Multimap<InetAddress, StreamSession.SSTableStreamingSections> streamingDetails = HashMultimap.create();
static
@@ -94,7 +94,7 @@ public class SSTableLoader implements StreamEventHandler
return false;
}
- Set<Component> components = new HashSet<Component>();
+ Set<Component> components = new HashSet<>();
components.add(Component.DATA);
components.add(Component.PRIMARY_INDEX);
if (new File(desc.filenameFor(Component.SUMMARY)).exists())
@@ -149,7 +149,7 @@ public class SSTableLoader implements StreamEventHandler
client.init(keyspace);
outputHandler.output("Established connection to initial hosts");
- StreamPlan plan = new StreamPlan("Bulk Load");
+ StreamPlan plan = new StreamPlan("Bulk Load").connectionFactory(client.getConnectionFactory());
Map<InetAddress, Collection<Range<Token>>> endpointToRanges = client.getEndpointToRangesMap();
openSSTables(endpointToRanges);
@@ -220,7 +220,7 @@ public class SSTableLoader implements StreamEventHandler
public static abstract class Client
{
- private final Map<InetAddress, Collection<Range<Token>>> endpointToRanges = new HashMap<InetAddress, Collection<Range<Token>>>();
+ private final Map<InetAddress, Collection<Range<Token>>> endpointToRanges = new HashMap<>();
private IPartitioner partitioner;
/**
@@ -240,6 +240,17 @@ public class SSTableLoader implements StreamEventHandler
public void stop() {}
/**
+ * Provides connection factory.
+ * By default, it uses DefaultConnectionFactory.
+ *
+ * @return StreamConnectionFactory to use
+ */
+ public StreamConnectionFactory getConnectionFactory()
+ {
+ return new DefaultConnectionFactory();
+ }
+
+ /**
* Validate that {@code keyspace} is an existing keyspace and {@code
* cfName} one of its existing column family.
*/
@@ -258,6 +269,7 @@ public class SSTableLoader implements StreamEventHandler
protected void setPartitioner(IPartitioner partitioner)
{
this.partitioner = partitioner;
+ // the following is still necessary since Range/Token reference partitioner through StorageService.getPartitioner
DatabaseDescriptor.setPartitioner(partitioner);
}
@@ -271,7 +283,7 @@ public class SSTableLoader implements StreamEventHandler
Collection<Range<Token>> ranges = endpointToRanges.get(endpoint);
if (ranges == null)
{
- ranges = new HashSet<Range<Token>>();
+ ranges = new HashSet<>();
endpointToRanges.put(endpoint, ranges);
}
ranges.add(range);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/563cea14/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
index b06a818..8fba41b 100644
--- a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
+++ b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
@@ -18,7 +18,6 @@
package org.apache.cassandra.streaming;
import java.io.IOException;
-import java.net.InetAddress;
import java.net.Socket;
import java.net.SocketException;
import java.nio.ByteBuffer;
@@ -37,8 +36,6 @@ import com.google.common.util.concurrent.SettableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.net.OutboundTcpConnectionPool;
import org.apache.cassandra.streaming.messages.StreamInitMessage;
import org.apache.cassandra.streaming.messages.StreamMessage;
import org.apache.cassandra.utils.FBUtilities;
@@ -55,8 +52,6 @@ public class ConnectionHandler
{
private static final Logger logger = LoggerFactory.getLogger(ConnectionHandler.class);
- private static final int MAX_CONNECT_ATTEMPTS = 3;
-
private final StreamSession session;
private IncomingMessageHandler incoming;
@@ -79,12 +74,12 @@ public class ConnectionHandler
public void initiate() throws IOException
{
logger.debug("[Stream #{}] Sending stream init for incoming stream", session.planId());
- Socket incomingSocket = connect(session.peer);
+ Socket incomingSocket = session.createConnection();
incoming.start(incomingSocket, StreamMessage.CURRENT_VERSION);
incoming.sendInitMessage(incomingSocket, true);
logger.debug("[Stream #{}] Sending stream init for outgoing stream", session.planId());
- Socket outgoingSocket = connect(session.peer);
+ Socket outgoingSocket = session.createConnection();
outgoing.start(outgoingSocket, StreamMessage.CURRENT_VERSION);
outgoing.sendInitMessage(outgoingSocket, false);
}
@@ -104,45 +99,6 @@ public class ConnectionHandler
incoming.start(socket, version);
}
- /**
- * Connect to peer and start exchanging message.
- * When connect attempt fails, this retries for maximum of MAX_CONNECT_ATTEMPTS times.
- *
- * @param peer the peer to connect to.
- * @return the created socket.
- *
- * @throws IOException when connection failed.
- */
- private static Socket connect(InetAddress peer) throws IOException
- {
- int attempts = 0;
- while (true)
- {
- try
- {
- Socket socket = OutboundTcpConnectionPool.newSocket(peer);
- socket.setSoTimeout(DatabaseDescriptor.getStreamingSocketTimeout());
- return socket;
- }
- catch (IOException e)
- {
- if (++attempts >= MAX_CONNECT_ATTEMPTS)
- throw e;
-
- long waitms = DatabaseDescriptor.getRpcTimeout() * (long)Math.pow(2, attempts);
- logger.warn("Failed attempt " + attempts + " to connect to " + peer + ". Retrying in " + waitms + " ms. (" + e + ")");
- try
- {
- Thread.sleep(waitms);
- }
- catch (InterruptedException wtf)
- {
- throw new IOException("interrupted", wtf);
- }
- }
- }
- }
-
public ListenableFuture<?> close()
{
logger.debug("[Stream #{}] Closing stream connection handler on {}", session.planId(), session.peer);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/563cea14/src/java/org/apache/cassandra/streaming/DefaultConnectionFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/DefaultConnectionFactory.java b/src/java/org/apache/cassandra/streaming/DefaultConnectionFactory.java
new file mode 100644
index 0000000..53af4c8
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/DefaultConnectionFactory.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.Socket;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.net.OutboundTcpConnectionPool;
+
+public class DefaultConnectionFactory implements StreamConnectionFactory
+{
+ private static final Logger logger = LoggerFactory.getLogger(DefaultConnectionFactory.class);
+
+ private static final int MAX_CONNECT_ATTEMPTS = 3;
+
+ /**
+ * Connect to peer and start exchanging message.
+ * When connect attempt fails, this retries for maximum of MAX_CONNECT_ATTEMPTS times.
+ *
+ * @param peer the peer to connect to.
+ * @return the created socket.
+ *
+ * @throws IOException when connection failed.
+ */
+ public Socket createConnection(InetAddress peer) throws IOException
+ {
+ int attempts = 0;
+ while (true)
+ {
+ try
+ {
+ Socket socket = OutboundTcpConnectionPool.newSocket(peer);
+ socket.setSoTimeout(DatabaseDescriptor.getStreamingSocketTimeout());
+ return socket;
+ }
+ catch (IOException e)
+ {
+ if (++attempts >= MAX_CONNECT_ATTEMPTS)
+ throw e;
+
+ long waitms = DatabaseDescriptor.getRpcTimeout() * (long)Math.pow(2, attempts);
+ logger.warn("Failed attempt " + attempts + " to connect to " + peer + ". Retrying in " + waitms + " ms. (" + e + ")");
+ try
+ {
+ Thread.sleep(waitms);
+ }
+ catch (InterruptedException wtf)
+ {
+ throw new IOException("interrupted", wtf);
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/563cea14/src/java/org/apache/cassandra/streaming/StreamConnectionFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamConnectionFactory.java b/src/java/org/apache/cassandra/streaming/StreamConnectionFactory.java
new file mode 100644
index 0000000..dd99611
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/StreamConnectionFactory.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.Socket;
+
+/**
+ * Interface that creates connection used by streaming.
+ */
+public interface StreamConnectionFactory
+{
+ Socket createConnection(InetAddress peer) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/563cea14/src/java/org/apache/cassandra/streaming/StreamPlan.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamPlan.java b/src/java/org/apache/cassandra/streaming/StreamPlan.java
index b57e097..e582c79 100644
--- a/src/java/org/apache/cassandra/streaming/StreamPlan.java
+++ b/src/java/org/apache/cassandra/streaming/StreamPlan.java
@@ -38,6 +38,8 @@ public class StreamPlan
// sessions per InetAddress of the other end.
private final Map<InetAddress, StreamSession> sessions = new HashMap<>();
+ private StreamConnectionFactory connectionFactory = new DefaultConnectionFactory();
+
private boolean flushBeforeTransfer = true;
/**
@@ -132,6 +134,18 @@ public class StreamPlan
}
/**
+ * Set custom StreamConnectionFactory to be used for establishing connection
+ *
+ * @param factory StreamConnectionFactory to use
+ * @return self
+ */
+ public StreamPlan connectionFactory(StreamConnectionFactory factory)
+ {
+ this.connectionFactory = factory;
+ return this;
+ }
+
+ /**
* @return true if this plan has no plan to execute
*/
public boolean isEmpty()
@@ -167,7 +181,7 @@ public class StreamPlan
StreamSession session = sessions.get(peer);
if (session == null)
{
- session = new StreamSession(peer);
+ session = new StreamSession(peer, connectionFactory);
sessions.put(peer, session);
}
return session;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/563cea14/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
index dcffaff..add14f7 100644
--- a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
+++ b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
@@ -106,7 +106,7 @@ public final class StreamResultFuture extends AbstractFuture<StreamState>
StreamResultFuture future = StreamManager.instance.getReceivingStream(planId);
if (future == null)
{
- final StreamSession session = new StreamSession(from);
+ final StreamSession session = new StreamSession(from, null);
// The main reason we create a StreamResultFuture on the receiving side is for JMX exposure.
future = new StreamResultFuture(planId, description, Collections.singleton(session));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/563cea14/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index 55e30f0..4fcbe36 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.streaming;
import java.io.IOException;
import java.net.InetAddress;
+import java.net.Socket;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -128,6 +129,8 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe
// data receivers, filled after receiving prepare message
private final Map<UUID, StreamReceiveTask> receivers = new ConcurrentHashMap<>();
private final StreamingMetrics metrics;
+ /* can be null when session is created in remote */
+ private final StreamConnectionFactory factory;
public final ConnectionHandler handler;
@@ -152,10 +155,12 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe
* Create new streaming session with the peer.
*
* @param peer Address of streaming peer
+ * @param factory is used for establishing connection
*/
- public StreamSession(InetAddress peer)
+ public StreamSession(InetAddress peer, StreamConnectionFactory factory)
{
this.peer = peer;
+ this.factory = factory;
this.handler = new ConnectionHandler(this);
this.metrics = StreamingMetrics.get(peer);
}
@@ -211,6 +216,12 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe
});
}
+ public Socket createConnection() throws IOException
+ {
+ assert factory != null;
+ return factory.createConnection(peer);
+ }
+
/**
* Request data fetch task to this session.
*
http://git-wip-us.apache.org/repos/asf/cassandra/blob/563cea14/src/java/org/apache/cassandra/tools/BulkLoadConnectionFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/BulkLoadConnectionFactory.java b/src/java/org/apache/cassandra/tools/BulkLoadConnectionFactory.java
new file mode 100644
index 0000000..399344e
--- /dev/null
+++ b/src/java/org/apache/cassandra/tools/BulkLoadConnectionFactory.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.tools;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.nio.channels.SocketChannel;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.cassandra.config.EncryptionOptions;
+import org.apache.cassandra.security.SSLFactory;
+import org.apache.cassandra.streaming.StreamConnectionFactory;
+import org.apache.cassandra.utils.FBUtilities;
+
+public class BulkLoadConnectionFactory implements StreamConnectionFactory
+{
+ private final boolean outboundBindAny;
+ private final int storagePort;
+ private final int secureStoragePort;
+ private final EncryptionOptions.ServerEncryptionOptions encryptionOptions;
+
+ public BulkLoadConnectionFactory(int storagePort, int secureStoragePort, EncryptionOptions.ServerEncryptionOptions encryptionOptions, boolean outboundBindAny)
+ {
+ this.storagePort = storagePort;
+ this.secureStoragePort = secureStoragePort;
+ this.encryptionOptions = encryptionOptions;
+ this.outboundBindAny = outboundBindAny;
+ }
+
+ public Socket createConnection(InetAddress peer) throws IOException
+ {
+ // Connect to secure port for all peers if ServerEncryptionOptions is configured other than 'none'
+ // When 'all', 'dc' and 'rack', server nodes always have SSL port open, and since thin client like sstableloader
+ // does not know which node is in which dc/rack, connecting to SSL port is always the option.
+ if (encryptionOptions != null && encryptionOptions.internode_encryption != EncryptionOptions.ServerEncryptionOptions.InternodeEncryption.none)
+ {
+ if (outboundBindAny)
+ return SSLFactory.getSocket(encryptionOptions, peer, secureStoragePort);
+ else
+ return SSLFactory.getSocket(encryptionOptions, peer, secureStoragePort, FBUtilities.getLocalAddress(), 0);
+ }
+ else
+ {
+ Socket socket = SocketChannel.open(new InetSocketAddress(peer, storagePort)).socket();
+ if (outboundBindAny && !socket.isBound())
+ socket.bind(new InetSocketAddress(FBUtilities.getLocalAddress(), 0));
+ return socket;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/563cea14/src/java/org/apache/cassandra/tools/BulkLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/BulkLoader.java b/src/java/org/apache/cassandra/tools/BulkLoader.java
index 37ec635..4077722 100644
--- a/src/java/org/apache/cassandra/tools/BulkLoader.java
+++ b/src/java/org/apache/cassandra/tools/BulkLoader.java
@@ -18,29 +18,27 @@
package org.apache.cassandra.tools;
import java.io.File;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
+import java.net.*;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import com.google.common.base.Joiner;
import com.google.common.collect.Sets;
-import org.apache.cassandra.config.EncryptionOptions;
+
+import org.apache.cassandra.config.*;
+
import org.apache.commons.cli.*;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.transport.TFramedTransport;
-import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.cassandra.auth.IAuthenticator;
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.io.sstable.SSTableLoader;
import org.apache.cassandra.streaming.*;
import org.apache.cassandra.thrift.*;
@@ -60,7 +58,10 @@ public class BulkLoader
private static final String USER_OPTION = "username";
private static final String PASSWD_OPTION = "password";
private static final String THROTTLE_MBITS = "throttle";
+
private static final String TRANSPORT_FACTORY = "transport-factory";
+
+ /* client encryption options */
private static final String SSL_TRUSTSTORE = "truststore";
private static final String SSL_TRUSTSTORE_PW = "truststore-password";
private static final String SSL_KEYSTORE = "keystore";
@@ -69,12 +70,20 @@ public class BulkLoader
private static final String SSL_ALGORITHM = "ssl-alg";
private static final String SSL_STORE_TYPE = "store-type";
private static final String SSL_CIPHER_SUITES = "ssl-ciphers";
+ private static final String CONFIG_PATH = "conf-path";
public static void main(String args[])
{
LoaderOptions options = LoaderOptions.parseArgs(args);
OutputHandler handler = new OutputHandler.SystemOutput(options.verbose, options.debug);
- SSTableLoader loader = new SSTableLoader(options.directory, new ExternalClient(options.hosts, options.rpcPort, options.user, options.passwd, options.transportFactory), handler);
+ SSTableLoader loader = new SSTableLoader(options.directory, new ExternalClient(options.hosts,
+ options.rpcPort,
+ options.user,
+ options.passwd,
+ options.transportFactory,
+ options.storagePort,
+ options.sslStoragePort,
+ options.serverEncOptions), handler);
DatabaseDescriptor.setStreamThroughputOutboundMegabitsPerSec(options.throttle);
StreamResultFuture future = null;
try
@@ -207,8 +216,18 @@ public class BulkLoader
private final String user;
private final String passwd;
private final ITransportFactory transportFactory;
-
- public ExternalClient(Set<InetAddress> hosts, int port, String user, String passwd, ITransportFactory transportFactory)
+ private final int storagePort;
+ private final int sslStoragePort;
+ private final EncryptionOptions.ServerEncryptionOptions serverEncOptions;
+
+ public ExternalClient(Set<InetAddress> hosts,
+ int port,
+ String user,
+ String passwd,
+ ITransportFactory transportFactory,
+ int storagePort,
+ int sslStoragePort,
+ EncryptionOptions.ServerEncryptionOptions serverEncryptionOptions)
{
super();
this.hosts = hosts;
@@ -216,8 +235,12 @@ public class BulkLoader
this.user = user;
this.passwd = passwd;
this.transportFactory = transportFactory;
+ this.storagePort = storagePort;
+ this.sslStoragePort = sslStoragePort;
+ this.serverEncOptions = serverEncryptionOptions;
}
+ @Override
public void init(String keyspace)
{
Iterator<InetAddress> hostiter = hosts.iterator();
@@ -234,7 +257,7 @@ public class BulkLoader
for (TokenRange tr : client.describe_ring(keyspace))
{
- Range<Token> range = new Range<>(tkFactory.fromString(tr.start_token), tkFactory.fromString(tr.end_token));
+ Range<Token> range = new Range<>(tkFactory.fromString(tr.start_token), tkFactory.fromString(tr.end_token), getPartitioner());
for (String ep : tr.endpoints)
{
addRangeForEndpoint(range, InetAddress.getByName(ep));
@@ -261,6 +284,13 @@ public class BulkLoader
}
}
+ @Override
+ public StreamConnectionFactory getConnectionFactory()
+ {
+ return new BulkLoadConnectionFactory(storagePort, sslStoragePort, serverEncOptions, false);
+ }
+
+ @Override
public CFMetaData getCFMetaData(String keyspace, String cfName)
{
return knownCfs.get(cfName);
@@ -273,7 +303,7 @@ public class BulkLoader
Cassandra.Client client = new Cassandra.Client(protocol);
if (user != null && passwd != null)
{
- Map<String, String> credentials = new HashMap<String, String>();
+ Map<String, String> credentials = new HashMap<>();
credentials.put(IAuthenticator.USERNAME_KEY, user);
credentials.put(IAuthenticator.PASSWORD_KEY, passwd);
AuthenticationRequest authenticationRequest = new AuthenticationRequest(credentials);
@@ -294,11 +324,14 @@ public class BulkLoader
public String user;
public String passwd;
public int throttle = 0;
+ public int storagePort;
+ public int sslStoragePort;
public ITransportFactory transportFactory = new TFramedTransportFactory();
public EncryptionOptions encOptions = new EncryptionOptions.ClientEncryptionOptions();
+ public EncryptionOptions.ServerEncryptionOptions serverEncOptions = new EncryptionOptions.ServerEncryptionOptions();
- public final Set<InetAddress> hosts = new HashSet<InetAddress>();
- public final Set<InetAddress> ignores = new HashSet<InetAddress>();
+ public final Set<InetAddress> hosts = new HashSet<>();
+ public final Set<InetAddress> ignores = new HashSet<>();
LoaderOptions(File directory)
{
@@ -349,9 +382,6 @@ public class BulkLoader
opts.verbose = cmd.hasOption(VERBOSE_OPTION);
opts.noProgress = cmd.hasOption(NOPROGRESS_OPTION);
- if (cmd.hasOption(THROTTLE_MBITS))
- opts.throttle = Integer.parseInt(cmd.getOptionValue(THROTTLE_MBITS));
-
if (cmd.hasOption(RPC_PORT_OPTION))
opts.rpcPort = Integer.parseInt(cmd.getOptionValue(RPC_PORT_OPTION));
@@ -400,44 +430,71 @@ public class BulkLoader
}
}
- if(cmd.hasOption(SSL_TRUSTSTORE))
+ // try to load config file first, so that values can be rewritten with other option values.
+ // otherwise use default config.
+ Config config;
+ if (cmd.hasOption(CONFIG_PATH))
+ {
+ File configFile = new File(cmd.getOptionValue(CONFIG_PATH));
+ if (!configFile.exists())
+ {
+ errorMsg("Config file not found", options);
+ }
+ config = new YamlConfigurationLoader().loadConfig(configFile.toURI().toURL());
+ }
+ else
+ {
+ config = new Config();
+ }
+ opts.storagePort = config.storage_port;
+ opts.sslStoragePort = config.ssl_storage_port;
+ opts.throttle = config.stream_throughput_outbound_megabits_per_sec;
+ opts.encOptions = config.client_encryption_options;
+ opts.serverEncOptions = config.server_encryption_options;
+
+ if (cmd.hasOption(THROTTLE_MBITS))
+ {
+ opts.throttle = Integer.parseInt(cmd.getOptionValue(THROTTLE_MBITS));
+ }
+
+ if (cmd.hasOption(SSL_TRUSTSTORE))
{
opts.encOptions.truststore = cmd.getOptionValue(SSL_TRUSTSTORE);
}
- if(cmd.hasOption(SSL_TRUSTSTORE_PW))
+ if (cmd.hasOption(SSL_TRUSTSTORE_PW))
{
opts.encOptions.truststore_password = cmd.getOptionValue(SSL_TRUSTSTORE_PW);
}
- if(cmd.hasOption(SSL_KEYSTORE))
+ if (cmd.hasOption(SSL_KEYSTORE))
{
opts.encOptions.keystore = cmd.getOptionValue(SSL_KEYSTORE);
// if a keystore was provided, lets assume we'll need to use it
opts.encOptions.require_client_auth = true;
}
- if(cmd.hasOption(SSL_KEYSTORE_PW))
+ if (cmd.hasOption(SSL_KEYSTORE_PW))
{
opts.encOptions.keystore_password = cmd.getOptionValue(SSL_KEYSTORE_PW);
}
- if(cmd.hasOption(SSL_PROTOCOL))
+ if (cmd.hasOption(SSL_PROTOCOL))
{
opts.encOptions.protocol = cmd.getOptionValue(SSL_PROTOCOL);
}
- if(cmd.hasOption(SSL_ALGORITHM))
+ if (cmd.hasOption(SSL_ALGORITHM))
{
opts.encOptions.algorithm = cmd.getOptionValue(SSL_ALGORITHM);
}
- if(cmd.hasOption(SSL_STORE_TYPE))
+ if (cmd.hasOption(SSL_STORE_TYPE))
{
opts.encOptions.store_type = cmd.getOptionValue(SSL_STORE_TYPE);
}
- if(cmd.hasOption(SSL_CIPHER_SUITES))
+ if (cmd.hasOption(SSL_CIPHER_SUITES))
{
opts.encOptions.cipher_suites = cmd.getOptionValue(SSL_CIPHER_SUITES).split(",");
}
@@ -451,7 +508,7 @@ public class BulkLoader
return opts;
}
- catch (ParseException e)
+ catch (ParseException | ConfigurationException | MalformedURLException e)
{
errorMsg(e.getMessage(), options);
return null;
@@ -508,6 +565,7 @@ public class BulkLoader
printUsage(options);
System.exit(1);
}
+
private static CmdLineOptions getCmdLineOptions()
{
CmdLineOptions options = new CmdLineOptions();
@@ -516,37 +574,38 @@ public class BulkLoader
options.addOption("h", HELP_OPTION, "display this help message");
options.addOption(null, NOPROGRESS_OPTION, "don't display progress");
options.addOption("i", IGNORE_NODES_OPTION, "NODES", "don't stream to this (comma separated) list of nodes");
- options.addOption("d", INITIAL_HOST_ADDRESS_OPTION, "initial hosts", "try to connect to these hosts (comma separated) initially for ring information");
+ options.addOption("d", INITIAL_HOST_ADDRESS_OPTION, "initial hosts", "Required. try to connect to these hosts (comma separated) initially for ring information");
options.addOption("p", RPC_PORT_OPTION, "rpc port", "port used for rpc (default 9160)");
options.addOption("t", THROTTLE_MBITS, "throttle", "throttle speed in Mbits (default unlimited)");
options.addOption("u", USER_OPTION, "username", "username for cassandra authentication");
options.addOption("pw", PASSWD_OPTION, "password", "password for cassandra authentication");
options.addOption("tf", TRANSPORT_FACTORY, "transport factory", "Fully-qualified ITransportFactory class name for creating a connection to cassandra");
// ssl connection-related options
- options.addOption("ts", SSL_TRUSTSTORE, "TRUSTSTORE", "SSL: full path to truststore");
- options.addOption("tspw", SSL_TRUSTSTORE_PW, "TRUSTSTORE-PASSWORD", "SSL: password of the truststore");
- options.addOption("ks", SSL_KEYSTORE, "KEYSTORE", "SSL: full path to keystore");
- options.addOption("kspw", SSL_KEYSTORE_PW, "KEYSTORE-PASSWORD", "SSL: password of the keystore");
- options.addOption("prtcl", SSL_PROTOCOL, "PROTOCOL", "SSL: connections protocol to use (default: TLS)");
- options.addOption("alg", SSL_ALGORITHM, "ALGORITHM", "SSL: algorithm (default: SunX509)");
- options.addOption("st", SSL_STORE_TYPE, "STORE-TYPE", "SSL: type of store");
- options.addOption("ciphers", SSL_CIPHER_SUITES, "CIPHER-SUITES", "SSL: comma-separated list of encryption suites to use");
+ options.addOption("ts", SSL_TRUSTSTORE, "TRUSTSTORE", "Client SSL: full path to truststore");
+ options.addOption("tspw", SSL_TRUSTSTORE_PW, "TRUSTSTORE-PASSWORD", "Client SSL: password of the truststore");
+ options.addOption("ks", SSL_KEYSTORE, "KEYSTORE", "Client SSL: full path to keystore");
+ options.addOption("kspw", SSL_KEYSTORE_PW, "KEYSTORE-PASSWORD", "Client SSL: password of the keystore");
+ options.addOption("prtcl", SSL_PROTOCOL, "PROTOCOL", "Client SSL: connections protocol to use (default: TLS)");
+ options.addOption("alg", SSL_ALGORITHM, "ALGORITHM", "Client SSL: algorithm (default: SunX509)");
+ options.addOption("st", SSL_STORE_TYPE, "STORE-TYPE", "Client SSL: type of store");
+ options.addOption("ciphers", SSL_CIPHER_SUITES, "CIPHER-SUITES", "Client SSL: comma-separated list of encryption suites to use");
+ options.addOption("f", CONFIG_PATH, "path to config file", "cassandra.yaml file path for streaming throughput and client/server SSL.");
return options;
}
public static void printUsage(Options options)
{
String usage = String.format("%s [options] <dir_path>", TOOL_NAME);
- StringBuilder header = new StringBuilder();
- header.append("--\n");
- header.append("Bulk load the sstables found in the directory <dir_path> to the configured cluster." );
- header.append("The parent directory of <dir_path> is used as the keyspace name. ");
- header.append("So for instance, to load an sstable named Standard1-g-1-Data.db into keyspace Keyspace1, ");
- header.append("you will need to have the files Standard1-g-1-Data.db and Standard1-g-1-Index.db in a ");
- header.append("directory Keyspace1/Standard1/ in the directory and call: sstableloader Keyspace1/Standard1");
- header.append("\n--\n");
- header.append("Options are:");
- new HelpFormatter().printHelp(usage, header.toString(), options, "");
+ String header = System.lineSeparator() +
+ "Bulk load the sstables found in the directory <dir_path> to the configured cluster." +
+ "The parent directories of <dir_path> are used as the target keyspace/table name. " +
+ "So for instance, to load an sstable named Standard1-g-1-Data.db into Keyspace1/Standard1, " +
+ "you will need to have the files Standard1-g-1-Data.db and Standard1-g-1-Index.db into a directory /path/to/Keyspace1/Standard1/.";
+ String footer = System.lineSeparator() +
+ "You can provide cassandra.yaml file with -f command line option to set up streaming throughput, client and server encryption options. " +
+ "Only stream_throughput_outbound_megabits_per_sec, server_encryption_options and client_encryption_options are read from yaml. " +
+ "You can override options read from cassandra.yaml with corresponding command line options.";
+ new HelpFormatter().printHelp(usage, header, options, footer);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/563cea14/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
index 9b02817..ce0f9d0 100644
--- a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
@@ -43,7 +43,7 @@ public class StreamTransferTaskTest extends SchemaLoader
String ks = "Keyspace1";
String cf = "Standard1";
- StreamSession session = new StreamSession(FBUtilities.getBroadcastAddress());
+ StreamSession session = new StreamSession(FBUtilities.getBroadcastAddress(), null);
ColumnFamilyStore cfs = Keyspace.open(ks).getColumnFamilyStore(cf);
// create two sstables