You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sn...@apache.org on 2015/09/05 18:25:49 UTC
cassandra git commit: Support for both encrypted and unencrypted
native transport connections
Repository: cassandra
Updated Branches:
refs/heads/cassandra-3.0 17528910b -> a7895cfb2
Support for both encrypted and unencrypted native transport connections
patch by Stefan Podkowinski; reviewed by Robert Stupp for CASSANDRA-9590
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a7895cfb
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a7895cfb
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a7895cfb
Branch: refs/heads/cassandra-3.0
Commit: a7895cfb2cdc667041402648bf922265cb0d34c3
Parents: 1752891
Author: Stefan Podkowinski <ji...@midnightdrift.com>
Authored: Sat Sep 5 18:23:10 2015 +0200
Committer: Robert Stupp <sn...@snazy.de>
Committed: Sat Sep 5 18:23:10 2015 +0200
----------------------------------------------------------------------
CHANGES.txt | 4 +
conf/cassandra.yaml | 8 +
.../org/apache/cassandra/config/Config.java | 1 +
.../cassandra/config/DatabaseDescriptor.java | 25 +++
.../cassandra/service/CassandraDaemon.java | 38 +++-
.../service/NativeTransportService.java | 199 +++++++++++++++++++
.../cassandra/service/StorageService.java | 11 +-
.../org/apache/cassandra/transport/Server.java | 170 +++++++++-------
.../org/apache/cassandra/cql3/CQLTester.java | 2 +-
.../service/NativeTransportServiceTest.java | 193 ++++++++++++++++++
10 files changed, 565 insertions(+), 86 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7895cfb/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d4e6771..afd45e5 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,3 +1,7 @@
+3.0.0-rc1
+ * Support for both encrypted and unencrypted native transport connections (CASSANDRA-9590)
+
+
3.0.0-beta2
* Fix columns returned by AbstractBtreePartitions (CASSANDRA-10220)
* Fix backward compatibility issue due to AbstractBounds serialization bug (CASSANDRA-9857)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7895cfb/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 0f8b829..28caa1e 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -489,6 +489,14 @@ start_native_transport: true
# port for the CQL native transport to listen for clients on
# For security reasons, you should not expose this port to the internet. Firewall it if needed.
native_transport_port: 9042
+# Enabling native transport encryption in client_encryption_options allows you to either use
+# encryption for the standard port or to use a dedicated, additional port along with the unencrypted
+# standard native_transport_port.
+# Enabling client encryption and keeping native_transport_port_ssl disabled will use encryption
+# for native_transport_port. Setting native_transport_port_ssl to a different value
+# from native_transport_port will use encryption for native_transport_port_ssl while
+# keeping native_transport_port unencrypted.
+# native_transport_port_ssl: 9142
# The maximum threads for handling requests when the native transport is used.
# This is similar to rpc_max_threads though the default differs slightly (and
# there is no native_transport_min_threads, idle threads will always be stopped
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7895cfb/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 22b09d3..164dab2 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -131,6 +131,7 @@ public class Config
public Boolean start_native_transport = false;
public Integer native_transport_port = 9042;
+ public Integer native_transport_port_ssl = null;
public Integer native_transport_max_threads = 128;
public Integer native_transport_max_frame_size_in_mb = 256;
public volatile Long native_transport_max_concurrent_connections = -1L;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7895cfb/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 2e68418..99cd563 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -680,6 +680,14 @@ public class DatabaseDescriptor
conf.max_mutation_size_in_kb = conf.commitlog_segment_size_in_mb * 1024 / 2;
else if (conf.commitlog_segment_size_in_mb * 1024 < 2 * conf.max_mutation_size_in_kb)
throw new ConfigurationException("commitlog_segment_size_in_mb must be at least twice the size of max_mutation_size_in_kb / 1024", false);
+
+ // native transport encryption options
+ if (conf.native_transport_port_ssl != null
+ && conf.native_transport_port_ssl.intValue() != conf.native_transport_port.intValue()
+ && !conf.client_encryption_options.enabled)
+ {
+ throw new ConfigurationException("Encryption must be enabled in client_encryption_options for native_transport_port_ssl", false);
+ }
}
private static FileStore guessFileStore(String dir) throws IOException
@@ -1341,6 +1349,23 @@ public class DatabaseDescriptor
return Integer.parseInt(System.getProperty("cassandra.native_transport_port", conf.native_transport_port.toString()));
}
+ /**
+ * Test hook: overwrites the configured native transport port.
+ * Note that getNativeTransportPort() prefers the cassandra.native_transport_port
+ * system property when it is set, so this value is only the fallback.
+ */
+ @VisibleForTesting
+ public static void setNativeTransportPort(int port)
+ {
+ conf.native_transport_port = port;
+ }
+
+    /**
+     * Port used for encrypted native transport connections.
+     *
+     * @return the configured native_transport_port_ssl, or the regular native transport
+     *         port when no dedicated SSL port has been configured
+     */
+    public static int getNativeTransportPortSSL()
+    {
+        Integer sslPort = conf.native_transport_port_ssl;
+        if (sslPort == null)
+            return getNativeTransportPort();
+        return sslPort;
+    }
+
+ /**
+ * Test hook: overwrites the configured dedicated SSL port.
+ * Passing null removes the dedicated port, in which case
+ * getNativeTransportPortSSL() falls back to getNativeTransportPort().
+ */
+ @VisibleForTesting
+ public static void setNativeTransportPortSSL(Integer port)
+ {
+ conf.native_transport_port_ssl = port;
+ }
+
public static Integer getNativeTransportMaxThreads()
{
return conf.native_transport_max_threads;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7895cfb/src/java/org/apache/cassandra/service/CassandraDaemon.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java
index c8b9677..230b46a 100644
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@ -127,7 +127,7 @@ public class CassandraDaemon
private static final CassandraDaemon instance = new CassandraDaemon();
public Server thriftServer;
- public Server nativeServer;
+ private NativeTransportService nativeTransportService;
private final boolean runManaged;
protected final StartupChecks startupChecks;
@@ -365,9 +365,7 @@ public class CassandraDaemon
thriftServer = new ThriftServer(rpcAddr, rpcPort, listenBacklog);
// Native transport
- InetAddress nativeAddr = DatabaseDescriptor.getRpcAddress();
- int nativePort = DatabaseDescriptor.getNativeTransportPort();
- nativeServer = new org.apache.cassandra.transport.Server(nativeAddr, nativePort);
+ nativeTransportService = new NativeTransportService();
completeSetup();
}
@@ -431,7 +429,8 @@ public class CassandraDaemon
String nativeFlag = System.getProperty("cassandra.start_native_transport");
if ((nativeFlag != null && Boolean.parseBoolean(nativeFlag)) || (nativeFlag == null && DatabaseDescriptor.startNativeTransport()))
{
- nativeServer.start();
+ startNativeTransport();
+ StorageService.instance.setRpcReady(true);
}
else
logger.info("Not starting native transport as requested. Use JMX (StorageService->startNativeTransport()) or nodetool (enablebinary) to start it");
@@ -453,9 +452,12 @@ public class CassandraDaemon
// On linux, this doesn't entirely shut down Cassandra, just the RPC server.
// jsvc takes care of taking the rest down
logger.info("Cassandra shutting down...");
- thriftServer.stop();
- nativeServer.stop();
-
+ if (thriftServer != null)
+ thriftServer.stop();
+ if (nativeTransportService != null)
+ nativeTransportService.destroy();
+ StorageService.instance.setRpcReady(false);
+
// On windows, we need to stop the entire system as prunsrv doesn't have the jsvc hooks
// We rely on the shutdown hook to drain the node
if (FBUtilities.isWindows())
@@ -556,6 +558,26 @@ public class CassandraDaemon
}
}
+    /**
+     * Starts the native transport server(s) through the transport service.
+     *
+     * @throws IllegalStateException if the transport service has not been created yet
+     */
+    public void startNativeTransport()
+    {
+        // the service must exist before it can be started; fail loudly otherwise
+        if (nativeTransportService == null)
+            throw new IllegalStateException("setup() must be called first for CassandraDaemon");
+
+        nativeTransportService.start();
+    }
+
+    /**
+     * Stops the native transport server(s); does nothing when the transport service
+     * has not been created.
+     */
+    public void stopNativeTransport()
+    {
+        if (nativeTransportService == null)
+            return;
+
+        nativeTransportService.stop();
+    }
+
+    /**
+     * @return true when the transport service exists and reports itself as running
+     */
+    public boolean isNativeTransportRunning()
+    {
+        return nativeTransportService != null && nativeTransportService.isRunning();
+    }
+
+
/**
* A convenience method to stop and destroy the daemon in one shot.
*/
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7895cfb/src/java/org/apache/cassandra/service/NativeTransportService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/NativeTransportService.java b/src/java/org/apache/cassandra/service/NativeTransportService.java
new file mode 100644
index 0000000..eff3a89
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/NativeTransportService.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service;
+
+import java.net.InetAddress;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.epoll.Epoll;
+import io.netty.channel.epoll.EpollEventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.util.concurrent.EventExecutor;
+import io.netty.util.concurrent.Future;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.metrics.ClientMetrics;
+import org.apache.cassandra.transport.RequestThreadPoolExecutor;
+import org.apache.cassandra.transport.Server;
+
+/**
+ * Handles native transport server lifecycle and associated resources. Lazily initialized.
+ * <p>
+ * Depending on configuration this manages a single server (unencrypted or encrypted on the
+ * native transport port) or two servers (unencrypted on the native transport port plus
+ * encrypted on the dedicated SSL port). All servers share one netty worker group and one
+ * request executor, both created by {@link #initialize()} and released by {@link #destroy()}.
+ */
+public class NativeTransportService
+{
+
+    private static final Logger logger = LoggerFactory.getLogger(NativeTransportService.class);
+
+    private Collection<Server> servers = Collections.emptyList();
+
+    private boolean initialized = false;
+    private EventLoopGroup workerGroup;
+    private EventExecutor eventExecutorGroup;
+
+    /**
+     * Creates netty thread pools and event loops, builds (but does not start) the server
+     * instances and registers the connectedNativeClients metric. Only the first call does
+     * any work; later calls return immediately.
+     */
+    @VisibleForTesting
+    synchronized void initialize()
+    {
+        if (initialized)
+            return;
+
+        // prepare netty resources
+        eventExecutorGroup = new RequestThreadPoolExecutor();
+
+        if (useEpoll())
+        {
+            workerGroup = new EpollEventLoopGroup();
+            logger.info("Netty using native Epoll event loop");
+        }
+        else
+        {
+            workerGroup = new NioEventLoopGroup();
+            logger.info("Netty using Java NIO event loop");
+        }
+
+        int nativePort = DatabaseDescriptor.getNativeTransportPort();
+        int nativePortSSL = DatabaseDescriptor.getNativeTransportPortSSL();
+        InetAddress nativeAddr = DatabaseDescriptor.getRpcAddress();
+
+        // one builder is reused for every server; only the ssl flag and port change between builds
+        Server.Builder builder = new Server.Builder()
+                                 .withEventExecutor(eventExecutorGroup)
+                                 .withEventLoopGroup(workerGroup)
+                                 .withHost(nativeAddr);
+
+        if (!DatabaseDescriptor.getClientEncryptionOptions().enabled)
+        {
+            servers = Collections.singleton(builder.withSSL(false).withPort(nativePort).build());
+        }
+        else
+        {
+            if (nativePort != nativePortSSL)
+            {
+                // user asked for dedicated ssl port for supporting both non-ssl and ssl connections
+                servers = Collections.unmodifiableList(
+                                                      Arrays.asList(
+                                                                   builder.withSSL(false).withPort(nativePort).build(),
+                                                                   builder.withSSL(true).withPort(nativePortSSL).build()
+                                                      )
+                );
+            }
+            else
+            {
+                // ssl only mode using configured native port
+                servers = Collections.singleton(builder.withSSL(true).withPort(nativePort).build());
+            }
+        }
+
+        // register metrics: the counter sums the connected clients over all managed servers
+        ClientMetrics.instance.addCounter("connectedNativeClients", () ->
+        {
+            int ret = 0;
+            for (Server server : servers)
+                ret += server.getConnectedClients();
+            return ret;
+        });
+
+        initialized = true;
+    }
+
+    /**
+     * Starts native transport servers, initializing the shared resources first if needed.
+     */
+    public void start()
+    {
+        initialize();
+        servers.forEach(Server::start);
+    }
+
+    /**
+     * Stops currently running native transport servers.
+     */
+    public void stop()
+    {
+        servers.forEach(Server::stop);
+    }
+
+    /**
+     * Ultimately stops servers and closes all resources.
+     * <p>
+     * Safe to call on a service that was never started: in that case no netty resources
+     * exist yet and only the (empty) server collection is reset.
+     */
+    public void destroy()
+    {
+        stop();
+        servers = Collections.emptyList();
+
+        // both netty resources are created by initialize(); if the service was never started
+        // they are still null and there is nothing to release
+        if (workerGroup != null)
+        {
+            // shutdown executors used by netty for native transport server
+            Future<?> wgStop = workerGroup.shutdownGracefully(0, 0, TimeUnit.SECONDS);
+
+            try
+            {
+                wgStop.await(5000);
+            }
+            catch (InterruptedException e1)
+            {
+                // keep the interrupt visible to the caller and carry on releasing resources
+                Thread.currentThread().interrupt();
+            }
+        }
+
+        // shutdownGracefully not implemented yet in RequestThreadPoolExecutor
+        if (eventExecutorGroup != null)
+            eventExecutorGroup.shutdown();
+    }
+
+    /**
+     * @return true if the epoll based event loop should be used, i.e. the
+     *         cassandra.native.epoll.enabled system property is unset or "true" and netty
+     *         reports epoll as available
+     */
+    public static boolean useEpoll()
+    {
+        final boolean enableEpoll = Boolean.parseBoolean(System.getProperty("cassandra.native.epoll.enabled", "true"));
+        return enableEpoll && Epoll.isAvailable();
+    }
+
+    /**
+     * @return true in case at least one native transport server is running
+     */
+    public boolean isRunning()
+    {
+        for (Server server : servers)
+        {
+            if (server.isRunning())
+                return true;
+        }
+        return false;
+    }
+
+    @VisibleForTesting
+    EventLoopGroup getWorkerGroup()
+    {
+        return workerGroup;
+    }
+
+    @VisibleForTesting
+    EventExecutor getEventExecutor()
+    {
+        return eventExecutorGroup;
+    }
+
+    @VisibleForTesting
+    Collection<Server> getServers()
+    {
+        return servers;
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7895cfb/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index a7ffc04..2d9bbec 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -405,10 +405,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
{
throw new IllegalStateException("No configured daemon");
}
-
+
try
{
- daemon.nativeServer.start();
+ daemon.startNativeTransport();
}
catch (Exception e)
{
@@ -422,17 +422,16 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
{
throw new IllegalStateException("No configured daemon");
}
- if (daemon.nativeServer != null)
- daemon.nativeServer.stop();
+ daemon.stopNativeTransport();
}
public boolean isNativeTransportRunning()
{
- if ((daemon == null) || (daemon.nativeServer == null))
+ if (daemon == null)
{
return false;
}
- return daemon.nativeServer.isRunning();
+ return daemon.isNativeTransportRunning();
}
public void stopTransports()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7895cfb/src/java/org/apache/cassandra/transport/Server.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Server.java b/src/java/org/apache/cassandra/transport/Server.java
index 72a1b60..cafc0ce 100644
--- a/src/java/org/apache/cassandra/transport/Server.java
+++ b/src/java/org/apache/cassandra/transport/Server.java
@@ -22,9 +22,8 @@ import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.EnumMap;
-import java.util.Map;
import java.util.List;
-import java.util.concurrent.Callable;
+import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.net.ssl.SSLContext;
@@ -35,7 +34,6 @@ import org.slf4j.LoggerFactory;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
-import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.group.ChannelGroup;
@@ -51,7 +49,6 @@ import io.netty.util.internal.logging.Slf4JLoggerFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.EncryptionOptions;
import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.metrics.ClientMetrics;
import org.apache.cassandra.security.SSLFactory;
import org.apache.cassandra.service.*;
import org.apache.cassandra.transport.messages.EventMessage;
@@ -64,7 +61,7 @@ public class Server implements CassandraDaemon.Server
}
private static final Logger logger = LoggerFactory.getLogger(Server.class);
- private static final boolean enableEpoll = Boolean.valueOf(System.getProperty("cassandra.native.epoll.enabled", "true"));
+ private static final boolean useEpoll = NativeTransportService.useEpoll();
public static final int VERSION_1 = 1;
public static final int VERSION_2 = 2;
@@ -83,41 +80,32 @@ public class Server implements CassandraDaemon.Server
};
public final InetSocketAddress socket;
+ public boolean useSSL = false;
private final AtomicBoolean isRunning = new AtomicBoolean(false);
private EventLoopGroup workerGroup;
private EventExecutor eventExecutorGroup;
- public Server(InetSocketAddress socket)
+ private Server (Builder builder)
{
- this.socket = socket;
+ this.socket = builder.getSocket();
+ this.useSSL = builder.useSSL;
+ if (builder.workerGroup != null)
+ {
+ workerGroup = builder.workerGroup;
+ }
+ else
+ {
+ if (useEpoll)
+ workerGroup = new EpollEventLoopGroup();
+ else
+ workerGroup = new NioEventLoopGroup();
+ }
+ if (builder.eventExecutorGroup != null)
+ eventExecutorGroup = builder.eventExecutorGroup;
EventNotifier notifier = new EventNotifier(this);
StorageService.instance.register(notifier);
MigrationManager.instance.register(notifier);
- registerMetrics();
- }
-
- public Server(String hostname, int port)
- {
- this(new InetSocketAddress(hostname, port));
- }
-
- public Server(InetAddress host, int port)
- {
- this(new InetSocketAddress(host, port));
- }
-
- public Server(int port)
- {
- this(new InetSocketAddress(port));
- }
-
- public void start()
- {
- if(!isRunning())
- {
- run();
- }
}
public void stop()
@@ -131,35 +119,25 @@ public class Server implements CassandraDaemon.Server
return isRunning.get();
}
- private void run()
+ public synchronized void start()
{
- // Configure the server.
- eventExecutorGroup = new RequestThreadPoolExecutor();
-
- boolean hasEpoll = enableEpoll ? Epoll.isAvailable() : false;
- if (hasEpoll)
- {
- workerGroup = new EpollEventLoopGroup();
- logger.info("Netty using native Epoll event loop");
- }
- else
- {
- workerGroup = new NioEventLoopGroup();
- logger.info("Netty using Java NIO event loop");
- }
+ if(isRunning())
+ return;
+ // Configure the server.
ServerBootstrap bootstrap = new ServerBootstrap()
- .group(workerGroup)
- .channel(hasEpoll ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
+ .channel(useEpoll ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_LINGER, 0)
.childOption(ChannelOption.SO_KEEPALIVE, DatabaseDescriptor.getRpcKeepAlive())
.childOption(ChannelOption.ALLOCATOR, CBUtil.allocator)
.childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024)
.childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 8 * 1024);
+ if (workerGroup != null)
+ bootstrap = bootstrap.group(workerGroup);
final EncryptionOptions.ClientEncryptionOptions clientEnc = DatabaseDescriptor.getClientEncryptionOptions();
- if (clientEnc.enabled)
+ if (this.useSSL)
{
logger.info("Enabling encrypted CQL connections between client and server");
bootstrap.childHandler(new SecureInitializer(this, clientEnc));
@@ -171,7 +149,7 @@ public class Server implements CassandraDaemon.Server
// Bind and start to accept incoming connections.
logger.info("Using Netty Version: {}", Version.identify().entrySet());
- logger.info("Starting listening for CQL clients on {}...", socket);
+ logger.info("Starting listening for CQL clients on {} ({})...", socket, this.useSSL ? "encrypted" : "unencrypted");
ChannelFuture bindFuture = bootstrap.bind(socket);
if (!bindFuture.awaitUninterruptibly().isSuccess())
@@ -179,36 +157,83 @@ public class Server implements CassandraDaemon.Server
connectionTracker.allChannels.add(bindFuture.channel());
isRunning.set(true);
-
- StorageService.instance.setRpcReady(true);
}
- private void registerMetrics()
+ public int getConnectedClients()
{
- ClientMetrics.instance.addCounter("connectedNativeClients", new Callable<Integer>()
- {
- @Override
- public Integer call() throws Exception
- {
- return connectionTracker.getConnectedClients();
- }
- });
+ return connectionTracker.getConnectedClients();
}
-
+
private void close()
{
// Close opened connections
connectionTracker.closeAll();
- workerGroup.shutdownGracefully();
- workerGroup = null;
-
- eventExecutorGroup.shutdown();
- eventExecutorGroup = null;
+
logger.info("Stop listening for CQL clients");
-
- StorageService.instance.setRpcReady(false);
}
+    /**
+     * Fluent builder for {@link Server}. Host and port are mandatory; SSL defaults to off and
+     * the event loop group / event executor are optional.
+     */
+    public static class Builder
+    {
+        private EventLoopGroup workerGroup;
+        private EventExecutor eventExecutorGroup;
+        private boolean useSSL = false;
+        private InetAddress hostAddr;
+        private int port = -1;
+        // cached address; dropped whenever host or port changes
+        private InetSocketAddress socket;
+
+        /** Selects whether the built server uses encrypted connections. */
+        public Builder withSSL(boolean useSSL)
+        {
+            this.useSSL = useSSL;
+            return this;
+        }
+
+        /** Sets the netty event loop group handed to the built server. */
+        public Builder withEventLoopGroup(EventLoopGroup eventLoopGroup)
+        {
+            workerGroup = eventLoopGroup;
+            return this;
+        }
+
+        /** Sets the event executor handed to the built server. */
+        public Builder withEventExecutor(EventExecutor eventExecutor)
+        {
+            eventExecutorGroup = eventExecutor;
+            return this;
+        }
+
+        /** Sets the address to bind to and invalidates any cached socket address. */
+        public Builder withHost(InetAddress host)
+        {
+            socket = null;
+            hostAddr = host;
+            return this;
+        }
+
+        /** Sets the port to bind to and invalidates any cached socket address. */
+        public Builder withPort(int port)
+        {
+            socket = null;
+            this.port = port;
+            return this;
+        }
+
+        /** Creates a server from the current builder state. */
+        public Server build()
+        {
+            return new Server(this);
+        }
+
+        /**
+         * Returns the socket address for the configured host and port, creating and caching
+         * it on first use.
+         *
+         * @throws IllegalStateException if the port or the host has not been set
+         */
+        private InetSocketAddress getSocket()
+        {
+            if (socket == null)
+            {
+                // port is validated before host, so a builder missing both reports the port
+                if (port == -1)
+                    throw new IllegalStateException("Missing port number");
+                if (hostAddr == null)
+                    throw new IllegalStateException("Missing host");
+                socket = new InetSocketAddress(hostAddr, port);
+            }
+            return socket;
+        }
+    }
public static class ConnectionTracker implements Connection.Tracker
{
@@ -253,7 +278,7 @@ public class Server implements CassandraDaemon.Server
}
}
- private static class Initializer extends ChannelInitializer
+ private static class Initializer extends ChannelInitializer<Channel>
{
// Stateless handlers
private static final Message.ProtocolDecoder messageDecoder = new Message.ProtocolDecoder();
@@ -294,7 +319,10 @@ public class Server implements CassandraDaemon.Server
pipeline.addLast("messageDecoder", messageDecoder);
pipeline.addLast("messageEncoder", messageEncoder);
- pipeline.addLast(server.eventExecutorGroup, "executor", dispatcher);
+ if (server.eventExecutorGroup != null)
+ pipeline.addLast(server.eventExecutorGroup, "executor", dispatcher);
+ else
+ pipeline.addLast("executor", dispatcher);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7895cfb/test/unit/org/apache/cassandra/cql3/CQLTester.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/CQLTester.java b/test/unit/org/apache/cassandra/cql3/CQLTester.java
index 349975d..3d3729a 100644
--- a/test/unit/org/apache/cassandra/cql3/CQLTester.java
+++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java
@@ -316,7 +316,7 @@ public abstract class CQLTester
StorageService.instance.initServer();
SchemaLoader.startGossiper();
- server = new org.apache.cassandra.transport.Server(nativeAddr, nativePort);
+ server = new Server.Builder().withHost(nativeAddr).withPort(nativePort).build();
server.start();
for (int version = 1; version <= maxProtocolVersion; version++)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7895cfb/test/unit/org/apache/cassandra/service/NativeTransportServiceTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/NativeTransportServiceTest.java b/test/unit/org/apache/cassandra/service/NativeTransportServiceTest.java
new file mode 100644
index 0000000..7eb664f
--- /dev/null
+++ b/test/unit/org/apache/cassandra/service/NativeTransportServiceTest.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service;
+
+import java.util.Arrays;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import com.google.common.collect.Sets;
+import org.junit.After;
+import org.junit.Test;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.transport.Server;
+import org.apache.cassandra.utils.Pair;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Lifecycle and configuration tests for {@link NativeTransportService}.
+ */
+public class NativeTransportServiceTest
+{
+
+    /** Puts the shared configuration back to "no client encryption, no dedicated SSL port". */
+    @After
+    public void resetConfig()
+    {
+        DatabaseDescriptor.setNativeTransportPortSSL(null);
+        DatabaseDescriptor.getClientEncryptionOptions().enabled = false;
+    }
+
+    @Test
+    public void testServiceCanBeStopped()
+    {
+        withService(svc -> {
+            svc.stop();
+            assertFalse(svc.isRunning());
+        });
+    }
+
+    @Test
+    public void testIgnoresStartOnAlreadyStarted()
+    {
+        // the helper already started the service once; further starts must be harmless
+        withService(svc -> {
+            svc.start();
+            svc.start();
+            svc.start();
+        });
+    }
+
+    @Test
+    public void testIgnoresStoppedOnAlreadyStopped()
+    {
+        withService(svc -> {
+            svc.stop();
+            svc.stop();
+            svc.stop();
+        });
+    }
+
+    @Test
+    public void testDestroy()
+    {
+        withService(svc -> {
+            Supplier<Boolean> allTerminated = () ->
+                svc.getWorkerGroup().isShutdown() && svc.getWorkerGroup().isTerminated() &&
+                svc.getEventExecutor().isShutdown() && svc.getEventExecutor().isTerminated();
+            assertFalse(allTerminated.get());
+            svc.destroy();
+            assertTrue(allTerminated.get());
+        });
+    }
+
+    @Test
+    public void testConcurrentStarts()
+    {
+        withService(NativeTransportService::start, false, 20);
+    }
+
+    @Test
+    public void testConcurrentStops()
+    {
+        withService(NativeTransportService::stop, true, 20);
+    }
+
+    @Test
+    public void testConcurrentDestroys()
+    {
+        withService(NativeTransportService::destroy, true, 20);
+    }
+
+    @Test
+    public void testPlainDefaultPort()
+    {
+        // default plain settings: client encryption disabled and default native transport port
+        withService(svc -> {
+            assertEquals(1, svc.getServers().size());
+            Server only = svc.getServers().iterator().next();
+            assertFalse(only.useSSL);
+            assertEquals(only.socket.getPort(), DatabaseDescriptor.getNativeTransportPort());
+        });
+    }
+
+    @Test
+    public void testSSLOnly()
+    {
+        // default ssl settings: client encryption enabled and default native transport port used for ssl only
+        DatabaseDescriptor.getClientEncryptionOptions().enabled = true;
+
+        withService(svc -> {
+            svc.initialize();
+            assertEquals(1, svc.getServers().size());
+            Server only = svc.getServers().iterator().next();
+            assertTrue(only.useSSL);
+            assertEquals(only.socket.getPort(), DatabaseDescriptor.getNativeTransportPort());
+        }, false, 1);
+    }
+
+    @Test
+    public void testSSLWithNonSSL()
+    {
+        // ssl+non-ssl settings: client encryption enabled and additional ssl port specified
+        DatabaseDescriptor.getClientEncryptionOptions().enabled = true;
+        DatabaseDescriptor.setNativeTransportPortSSL(8432);
+
+        withService(svc -> {
+            svc.initialize();
+            assertEquals(2, svc.getServers().size());
+            // compare (ssl flag, port) pairs as sets so the order of the servers does not matter
+            assertEquals(
+                Sets.newHashSet(Arrays.asList(
+                    Pair.create(true, DatabaseDescriptor.getNativeTransportPortSSL()),
+                    Pair.create(false, DatabaseDescriptor.getNativeTransportPort()))),
+                svc.getServers()
+                   .stream()
+                   .map(s -> Pair.create(s.useSSL, s.socket.getPort()))
+                   .collect(Collectors.toSet()));
+        }, false, 1);
+    }
+
+    /** Runs {@code f} once against a freshly started service. */
+    private static void withService(Consumer<NativeTransportService> f)
+    {
+        withService(f, true, 1);
+    }
+
+    /**
+     * Creates a service, optionally starts it, runs {@code f} against it and finally stops it.
+     *
+     * @param f            action under test
+     * @param start        whether to start the service (and assert it is running) before running {@code f}
+     * @param concurrently number of invocations of {@code f}; 1 runs it directly on the calling
+     *                     thread, any other value runs the invocations through a parallel stream
+     */
+    private static void withService(Consumer<NativeTransportService> f, boolean start, int concurrently)
+    {
+        NativeTransportService service = new NativeTransportService();
+        assertFalse(service.isRunning());
+        if (start)
+        {
+            service.start();
+            assertTrue(service.isRunning());
+        }
+        try
+        {
+            if (concurrently == 1)
+                f.accept(service);
+            else
+                IntStream.range(0, concurrently).parallel().forEach(i -> f.accept(service));
+        }
+        finally
+        {
+            service.stop();
+        }
+    }
+}