You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2017/01/27 15:53:00 UTC
incubator-streams git commit: STREAMS-483: streams-persist-cassandra SSL
Repository: incubator-streams
Updated Branches:
refs/heads/master cc46ab69a -> 1797a7073
STREAMS-483: streams-persist-cassandra SSL
resolves #353
Squashed commit of the following:
commit 5ef324b94bccaf087ea5b30ba2080f051aac24dd
Author: Steve Blackmon @steveblackmon <sb...@apache.org>
Date: Fri Jan 27 09:51:43 2017 -0600
remove unnecessary guava methods
commit cc80a98947bbde19183924c3b98686cd291cdc1b
Author: Steve Blackmon @steveblackmon <sb...@apache.org>
Date: Wed Jan 25 18:12:42 2017 -0600
STREAMS-483: add support for SSL connections to streams-persist-cassandra
also ensures the ITs can run & pass repeatedly
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/1797a707
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/1797a707
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/1797a707
Branch: refs/heads/master
Commit: 1797a707320f2f5ed9640e9191950435625094c4
Parents: cc46ab6
Author: Steve Blackmon @steveblackmon <sb...@apache.org>
Authored: Fri Jan 27 09:52:52 2017 -0600
Committer: Steve Blackmon @steveblackmon <sb...@apache.org>
Committed: Fri Jan 27 09:52:52 2017 -0600
----------------------------------------------------------------------
.../streams-persist-cassandra/pom.xml | 12 ++
.../streams/cassandra/CassandraClient.java | 160 +++++++++++++++++++
.../cassandra/CassandraPersistReader.java | 62 ++-----
.../cassandra/CassandraPersistWriter.java | 63 ++++----
.../cassandra/CassandraConfiguration.json | 28 +++-
.../cassandra/test/CassandraPersistIT.java | 2 +-
.../src/test/resources/CassandraPersistIT.conf | 2 +-
7 files changed, 244 insertions(+), 85 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/1797a707/streams-contrib/streams-persist-cassandra/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-cassandra/pom.xml b/streams-contrib/streams-persist-cassandra/pom.xml
index f29b9e5..75785dd 100644
--- a/streams-contrib/streams-persist-cassandra/pom.xml
+++ b/streams-contrib/streams-persist-cassandra/pom.xml
@@ -30,6 +30,7 @@
<description>Cassandra Module</description>
<properties>
+ <cassandra.version>3.9</cassandra.version>
<cassandra-driver.version>3.1.2</cassandra-driver.version>
</properties>
@@ -55,6 +56,17 @@
<version>${project.version}</version>
</dependency>
<dependency>
+ <groupId>org.apache.cassandra</groupId>
+ <artifactId>cassandra-all</artifactId>
+ <version>${cassandra.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>com.datastax.cassandra</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-core</artifactId>
<version>${cassandra-driver.version}</version>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/1797a707/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/CassandraClient.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/CassandraClient.java b/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/CassandraClient.java
new file mode 100644
index 0000000..bbb6b51
--- /dev/null
+++ b/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/CassandraClient.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.cassandra;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Host;
+import com.datastax.driver.core.JdkSSLOptions;
+import com.datastax.driver.core.Metadata;
+import com.datastax.driver.core.SSLOptions;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.SocketOptions;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.net.InetSocketAddress;
+import java.security.KeyStore;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Objects;
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManagerFactory;
+
+import static com.datastax.driver.core.SocketOptions.DEFAULT_CONNECT_TIMEOUT_MILLIS;
+import static com.datastax.driver.core.SocketOptions.DEFAULT_READ_TIMEOUT_MILLIS;
+
+public class CassandraClient {
+
+ private static final Logger LOGGER = LoggerFactory
+ .getLogger(CassandraClient.class);
+
+ private Cluster cluster;
+ private Session session;
+
+ public CassandraConfiguration config;
+
+ public CassandraClient(CassandraConfiguration config) throws Exception {
+ this.config = config;
+ org.apache.cassandra.config.Config.setClientMode(true);
+ }
+
+ public void start() throws Exception {
+
+ Objects.nonNull(config);
+
+ LOGGER.info("CassandraClient.start {}", config);
+
+ Cluster.Builder builder = Cluster.builder()
+ .withPort(config.getPort().intValue())
+ .withoutJMXReporting()
+ .withoutMetrics()
+ .withSocketOptions(
+ new SocketOptions()
+ .setConnectTimeoutMillis(DEFAULT_CONNECT_TIMEOUT_MILLIS*10)
+ .setReadTimeoutMillis(DEFAULT_READ_TIMEOUT_MILLIS*10)
+ );
+
+ if( config.getSsl() != null && config.getSsl().getEnabled() == true) {
+
+ Ssl ssl = config.getSsl();
+
+ KeyStore ks = KeyStore.getInstance("JKS");
+
+ InputStream trustStore = new FileInputStream(ssl.getTrustStore());
+ ks.load(trustStore, ssl.getTrustStorePassword().toCharArray());
+ InputStream keyStore = new FileInputStream(ssl.getKeyStore());
+ ks.load(keyStore, ssl.getKeyStorePassword().toCharArray());
+
+ TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
+ tmf.init(ks);
+
+ KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
+ kmf.init(ks, ssl.getKeyStorePassword().toCharArray());
+
+ SSLContext sslContext = SSLContext.getInstance("SSLv3");
+ sslContext.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);
+
+ SSLOptions sslOptions = JdkSSLOptions.builder()
+ .withSSLContext(sslContext)
+ .build();
+
+ builder = builder.withSSL(sslOptions);
+ }
+
+ Collection<InetSocketAddress> addresses = new ArrayList<>();
+ for (String h : config.getHosts()) {
+ LOGGER.info("Adding Host: {}", h);
+ InetSocketAddress socketAddress = new InetSocketAddress(h, config.getPort().intValue());
+ addresses.add(socketAddress);
+ }
+ builder.addContactPointsWithPorts(addresses);
+
+ if( StringUtils.isNotBlank(config.getUser()) &&
+ StringUtils.isNotBlank(config.getPassword())) {
+ builder.withCredentials(config.getUser(), config.getPassword());
+ }
+ cluster = builder.build();
+
+ Objects.nonNull(cluster);
+
+ try {
+ Metadata metadata = cluster.getMetadata();
+ LOGGER.info("Connected to cluster: {}\n",
+ metadata.getClusterName());
+ for ( Host host : metadata.getAllHosts() ) {
+ LOGGER.info("Datacenter: {}; Host: {}; Rack: {}\n",
+ host.getDatacenter(), host.getAddress(), host.getRack());
+ }
+ } catch( Exception e ) {
+ LOGGER.error("Exception: {}", e);
+ throw e;
+ }
+
+ try {
+ session = cluster.connect();
+ } catch( Exception e ) {
+ LOGGER.error("Exception: {}", e);
+ throw e;
+ }
+
+ Objects.nonNull(session);
+
+ }
+
+ public void stop() throws Exception {
+ session.close();
+ cluster.close();
+ }
+
+ public CassandraConfiguration config() {
+ return config;
+ }
+
+ public Session client() {
+ return session;
+ }
+
+ public Cluster cluster() {
+ return cluster;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/1797a707/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/CassandraPersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/CassandraPersistReader.java b/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/CassandraPersistReader.java
index aaa40fe..251a02d 100644
--- a/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/CassandraPersistReader.java
+++ b/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/CassandraPersistReader.java
@@ -76,12 +76,8 @@ public class CassandraPersistReader implements StreamsPersistReader {
private CompletableFuture<Boolean> readerTaskFuture = new CompletableFuture<>();
private CassandraConfiguration config;
+ private CassandraClient client;
- protected Cluster cluster;
- protected Session session;
-
- protected String keyspace;
- protected String table;
protected Iterator<Row> rowIterator;
protected final ReadWriteLock lock = new ReentrantReadWriteLock();
@@ -130,14 +126,20 @@ public class CassandraPersistReader implements StreamsPersistReader {
@Override
public void prepare(Object configurationObject) {
- connectToCassandra();
+ try {
+ connectToCassandra();
+ client.start();
+ } catch (Exception e) {
+ LOGGER.error("Exception", e);
+ return;
+ }
String selectStatement = getSelectStatement();
- ResultSet rs = session.execute(selectStatement);
+ ResultSet rs = client.client().execute(selectStatement);
rowIterator = rs.iterator();
if (!rowIterator.hasNext()) {
- throw new RuntimeException("Table" + table + "is empty!");
+ throw new RuntimeException("Table" + config.getTable() + "is empty!");
}
persistQueue = constructQueue();
@@ -164,50 +166,15 @@ public class CassandraPersistReader implements StreamsPersistReader {
return new StreamsDatum(objectNode);
}
- private synchronized void connectToCassandra() {
- Cluster.Builder clusterBuilder = Cluster.builder()
- .addContactPoints(config.getHost().toArray(new String[config.getHost().size()]))
- .withPort(config.getPort().intValue());
-
- keyspace = config.getKeyspace();
- table = config.getTable();
-
- if (StringUtils.isNotEmpty(config.getUser()) && StringUtils.isNotEmpty(config.getPassword())) {
- cluster = clusterBuilder.withCredentials(config.getUser(), config.getPassword()).build();
- } else {
- cluster = clusterBuilder.build();
- }
-
- Metadata metadata = cluster.getMetadata();
- if (Objects.isNull(metadata.getKeyspace(keyspace))) {
- LOGGER.info("Keyspace {} does not exist. Creating Keyspace", keyspace);
- Map<String, Object> replication = new HashMap<>();
- replication.put("class", "SimpleStrategy");
- replication.put("replication_factor", 1);
-
- String createKeyspaceStmt = SchemaBuilder.createKeyspace(keyspace).with()
- .replication(replication).getQueryString();
- cluster.connect().execute(createKeyspaceStmt);
- }
-
- session = cluster.connect(keyspace);
-
- KeyspaceMetadata ks = metadata.getKeyspace(keyspace);
- TableMetadata tableMetadata = ks.getTable(table);
+ private synchronized void connectToCassandra() throws Exception {
- if (Objects.isNull(tableMetadata)) {
- LOGGER.info("Table {} does not exist in Keyspace {}. Creating Table", table, keyspace);
- String createTableStmt = SchemaBuilder.createTable(table)
- .addPartitionKey(config.getPartitionKeyColumn(), DataType.varchar())
- .addColumn(config.getColumn(), DataType.blob()).getQueryString();
+ client = new CassandraClient(config);
- session.execute(createTableStmt);
- }
}
@Override
public StreamsResultSet readAll() {
- ResultSet rs = session.execute(getSelectStatement());
+ ResultSet rs = client.client().execute(getSelectStatement());
Iterator<Row> rowsIterator = rs.iterator();
while (rowsIterator.hasNext()) {
@@ -289,7 +256,8 @@ public class CassandraPersistReader implements StreamsPersistReader {
private String getSelectStatement() {
return QueryBuilder.select().all()
- .from(table).getQueryString();
+ .from(config.getKeyspace(), config.getTable())
+ .getQueryString();
}
public class CassandraPersistReaderTask implements Runnable {
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/1797a707/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/CassandraPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/CassandraPersistWriter.java b/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/CassandraPersistWriter.java
index 81c0e9e..9f419bd 100644
--- a/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/CassandraPersistWriter.java
+++ b/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/CassandraPersistWriter.java
@@ -77,12 +77,10 @@ public class CassandraPersistWriter implements StreamsPersistWriter, Runnable, F
private ScheduledExecutorService backgroundFlushTask = Executors.newSingleThreadScheduledExecutor();
private CassandraConfiguration config;
+ private CassandraClient client;
- protected Cluster cluster;
- protected Session session;
+ private Session session;
- protected String keyspace;
- protected String table;
protected PreparedStatement insertStatement;
protected List<BoundStatement> insertBatch = new ArrayList<>();
@@ -140,8 +138,8 @@ public class CassandraPersistWriter implements StreamsPersistWriter, Runnable, F
byte[] value = node.toString().getBytes();
String key = GuidUtils.generateGuid(node.toString());
- if(!Objects.isNull(streamsDatum.getMetadata().get("id"))) {
- key = streamsDatum.getMetadata().get("id").toString();
+ if(!Objects.isNull(streamsDatum.getId())) {
+ key = streamsDatum.getId();
}
BoundStatement statement = insertStatement.bind(key, ByteBuffer.wrap(value));
@@ -175,7 +173,7 @@ public class CassandraPersistWriter implements StreamsPersistWriter, Runnable, F
@Override
public synchronized void close() throws IOException {
session.close();
- cluster.close();
+ client.cluster().close();
backgroundFlushTask.shutdownNow();
}
@@ -183,7 +181,15 @@ public class CassandraPersistWriter implements StreamsPersistWriter, Runnable, F
* start write thread.
*/
public void start() {
- connectToCassandra();
+ try {
+ connectToCassandra();
+ client.start();
+ createKeyspaceAndTable();
+ createInsertStatement();
+ } catch (Exception e) {
+ LOGGER.error("Exception", e);
+ return;
+ }
backgroundFlushTask.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
@@ -269,51 +275,40 @@ public class CassandraPersistWriter implements StreamsPersistWriter, Runnable, F
}
}
- private synchronized void connectToCassandra() {
- Cluster.Builder clusterBuilder = Cluster.builder()
- .addContactPoints(config.getHost().toArray(new String[config.getHost().size()]))
- .withPort(config.getPort().intValue());
-
- keyspace = config.getKeyspace();
- table = config.getTable();
-
- if (StringUtils.isNotEmpty(config.getUser()) && StringUtils.isNotEmpty(config.getPassword())) {
- cluster = clusterBuilder.withCredentials(config.getUser(), config.getPassword()).build();
- } else {
- cluster = clusterBuilder.build();
- }
+ private synchronized void connectToCassandra() throws Exception {
+ client = new CassandraClient(config);
+ }
- Metadata metadata = cluster.getMetadata();
- if (Objects.isNull(metadata.getKeyspace(keyspace))) {
- LOGGER.info("Keyspace {} does not exist. Creating Keyspace", keyspace);
+ private void createKeyspaceAndTable() {
+ Metadata metadata = client.cluster().getMetadata();
+ if (Objects.isNull(metadata.getKeyspace(config.getKeyspace()))) {
+ LOGGER.info("Keyspace {} does not exist. Creating Keyspace", config.getKeyspace());
Map<String, Object> replication = new HashMap<>();
replication.put("class", "SimpleStrategy");
replication.put("replication_factor", 1);
- String createKeyspaceStmt = SchemaBuilder.createKeyspace(keyspace).with()
+ String createKeyspaceStmt = SchemaBuilder.createKeyspace(config.getKeyspace()).with()
.replication(replication).getQueryString();
- cluster.connect().execute(createKeyspaceStmt);
+ client.cluster().connect().execute(createKeyspaceStmt);
}
- session = cluster.connect(keyspace);
+ session = client.cluster().connect(config.getKeyspace());
- KeyspaceMetadata ks = metadata.getKeyspace(keyspace);
- TableMetadata tableMetadata = ks.getTable(table);
+ KeyspaceMetadata ks = metadata.getKeyspace(config.getKeyspace());
+ TableMetadata tableMetadata = ks.getTable(config.getTable());
if (Objects.isNull(tableMetadata)) {
- LOGGER.info("Table {} does not exist in Keyspace {}. Creating Table", table, keyspace);
- String createTableStmt = SchemaBuilder.createTable(table)
+ LOGGER.info("Table {} does not exist in Keyspace {}. Creating Table", config.getTable(), config.getKeyspace());
+ String createTableStmt = SchemaBuilder.createTable(config.getTable())
.addPartitionKey(config.getPartitionKeyColumn(), DataType.varchar())
.addColumn(config.getColumn(), DataType.blob()).getQueryString();
session.execute(createTableStmt);
}
-
- createInsertStatement();
}
private void createInsertStatement() {
- Insert insertBuilder = QueryBuilder.insertInto(table);
+ Insert insertBuilder = QueryBuilder.insertInto(config.getTable());
insertBuilder.value(config.getPartitionKeyColumn(), new Object());
insertBuilder.value(config.getColumn(), new Object());
insertStatement = session.prepare(insertBuilder.getQueryString());
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/1797a707/streams-contrib/streams-persist-cassandra/src/main/jsonschema/org/apache/streams/cassandra/CassandraConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-cassandra/src/main/jsonschema/org/apache/streams/cassandra/CassandraConfiguration.json b/streams-contrib/streams-persist-cassandra/src/main/jsonschema/org/apache/streams/cassandra/CassandraConfiguration.json
index 4b4cf1f..4b9d825 100644
--- a/streams-contrib/streams-persist-cassandra/src/main/jsonschema/org/apache/streams/cassandra/CassandraConfiguration.json
+++ b/streams-contrib/streams-persist-cassandra/src/main/jsonschema/org/apache/streams/cassandra/CassandraConfiguration.json
@@ -8,12 +8,12 @@
"javaType": "org.apache.streams.cassandra.CassandraConfiguration",
"javaInterfaces": ["java.io.Serializable"],
"properties": {
- "host": {
+ "hosts": {
"type": "array",
"items": {
"type": "string"
},
- "description": "Cassandra host"
+ "description": "Cassandra hosts"
},
"port": {
"type": "integer",
@@ -42,6 +42,30 @@
"column": {
"type": "string",
"description": "Column name"
+ },
+ "ssl": {
+ "type": "object",
+ "description": "ssl details",
+ "javaInterfaces": ["java.io.Serializable"],
+ "properties": {
+ "enabled": {
+ "type": "boolean",
+ "description": "ssl enabled",
+ "default": "false"
+ },
+ "trustStore": {
+ "type": "string"
+ },
+ "trustStorePassword": {
+ "type": "string"
+ },
+ "keyStore": {
+ "type": "string"
+ },
+ "keyStorePassword": {
+ "type": "string"
+ }
+ }
}
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/1797a707/streams-contrib/streams-persist-cassandra/src/test/java/org/apache/streams/cassandra/test/CassandraPersistIT.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-cassandra/src/test/java/org/apache/streams/cassandra/test/CassandraPersistIT.java b/streams-contrib/streams-persist-cassandra/src/test/java/org/apache/streams/cassandra/test/CassandraPersistIT.java
index ca675b9..aa2f16d 100644
--- a/streams-contrib/streams-persist-cassandra/src/test/java/org/apache/streams/cassandra/test/CassandraPersistIT.java
+++ b/streams-contrib/streams-persist-cassandra/src/test/java/org/apache/streams/cassandra/test/CassandraPersistIT.java
@@ -101,6 +101,6 @@ public class CassandraPersistIT {
StreamsResultSet resultSet = reader.readAll();
LOGGER.info("Total Read: {}", resultSet.size() );
- Assert.assertEquals(89, resultSet.size());
+ Assert.assertEquals(resultSet.size(), 89);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/1797a707/streams-contrib/streams-persist-cassandra/src/test/resources/CassandraPersistIT.conf
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-cassandra/src/test/resources/CassandraPersistIT.conf b/streams-contrib/streams-persist-cassandra/src/test/resources/CassandraPersistIT.conf
index 62fec7d..9db9da5 100644
--- a/streams-contrib/streams-persist-cassandra/src/test/resources/CassandraPersistIT.conf
+++ b/streams-contrib/streams-persist-cassandra/src/test/resources/CassandraPersistIT.conf
@@ -16,7 +16,7 @@
# under the License.
cassandra {
- host = [${cassandra.tcp.host}]
+ hosts = [${cassandra.tcp.host}]
port = ${cassandra.tcp.port}
user = cassandra
password = cassandra