You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2017/01/27 15:53:00 UTC

incubator-streams git commit: STREAMS-483: streams-persist-cassandra SSL

Repository: incubator-streams
Updated Branches:
  refs/heads/master cc46ab69a -> 1797a7073


STREAMS-483: streams-persist-cassandra SSL

resolves #353

Squashed commit of the following:

commit 5ef324b94bccaf087ea5b30ba2080f051aac24dd
Author: Steve Blackmon @steveblackmon <sb...@apache.org>
Date:   Fri Jan 27 09:51:43 2017 -0600

    remove unnecessary guava methods

commit cc80a98947bbde19183924c3b98686cd291cdc1b
Author: Steve Blackmon @steveblackmon <sb...@apache.org>
Date:   Wed Jan 25 18:12:42 2017 -0600

    STREAMS-483: add support for SSL connections to streams-persist-cassandra

    also ensures the ITs can run & pass repeatedly


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/1797a707
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/1797a707
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/1797a707

Branch: refs/heads/master
Commit: 1797a707320f2f5ed9640e9191950435625094c4
Parents: cc46ab6
Author: Steve Blackmon @steveblackmon <sb...@apache.org>
Authored: Fri Jan 27 09:52:52 2017 -0600
Committer: Steve Blackmon @steveblackmon <sb...@apache.org>
Committed: Fri Jan 27 09:52:52 2017 -0600

----------------------------------------------------------------------
 .../streams-persist-cassandra/pom.xml           |  12 ++
 .../streams/cassandra/CassandraClient.java      | 160 +++++++++++++++++++
 .../cassandra/CassandraPersistReader.java       |  62 ++-----
 .../cassandra/CassandraPersistWriter.java       |  63 ++++----
 .../cassandra/CassandraConfiguration.json       |  28 +++-
 .../cassandra/test/CassandraPersistIT.java      |   2 +-
 .../src/test/resources/CassandraPersistIT.conf  |   2 +-
 7 files changed, 244 insertions(+), 85 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/1797a707/streams-contrib/streams-persist-cassandra/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-cassandra/pom.xml b/streams-contrib/streams-persist-cassandra/pom.xml
index f29b9e5..75785dd 100644
--- a/streams-contrib/streams-persist-cassandra/pom.xml
+++ b/streams-contrib/streams-persist-cassandra/pom.xml
@@ -30,6 +30,7 @@
     <description>Cassandra Module</description>
 
     <properties>
+        <cassandra.version>3.9</cassandra.version>
         <cassandra-driver.version>3.1.2</cassandra-driver.version>
     </properties>
 
@@ -55,6 +56,17 @@
             <version>${project.version}</version>
         </dependency>
         <dependency>
+            <groupId>org.apache.cassandra</groupId>
+            <artifactId>cassandra-all</artifactId>
+            <version>${cassandra.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.datastax.cassandra</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
             <groupId>com.datastax.cassandra</groupId>
             <artifactId>cassandra-driver-core</artifactId>
             <version>${cassandra-driver.version}</version>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/1797a707/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/CassandraClient.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/CassandraClient.java b/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/CassandraClient.java
new file mode 100644
index 0000000..bbb6b51
--- /dev/null
+++ b/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/CassandraClient.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.cassandra;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Host;
+import com.datastax.driver.core.JdkSSLOptions;
+import com.datastax.driver.core.Metadata;
+import com.datastax.driver.core.SSLOptions;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.SocketOptions;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.net.InetSocketAddress;
+import java.security.KeyStore;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Objects;
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManagerFactory;
+
+import static com.datastax.driver.core.SocketOptions.DEFAULT_CONNECT_TIMEOUT_MILLIS;
+import static com.datastax.driver.core.SocketOptions.DEFAULT_READ_TIMEOUT_MILLIS;
+
+public class CassandraClient {
+
+  private static final Logger LOGGER = LoggerFactory
+      .getLogger(CassandraClient.class);
+
+  private Cluster cluster;
+  private Session session;
+
+  public CassandraConfiguration config;
+
+  public CassandraClient(CassandraConfiguration config) throws Exception {
+    this.config = config;
+    org.apache.cassandra.config.Config.setClientMode(true);
+  }
+
+  public void start() throws Exception {
+
+    Objects.nonNull(config);
+
+    LOGGER.info("CassandraClient.start {}", config);
+
+    Cluster.Builder builder = Cluster.builder()
+        .withPort(config.getPort().intValue())
+        .withoutJMXReporting()
+        .withoutMetrics()
+        .withSocketOptions(
+            new SocketOptions()
+                .setConnectTimeoutMillis(DEFAULT_CONNECT_TIMEOUT_MILLIS*10)
+                .setReadTimeoutMillis(DEFAULT_READ_TIMEOUT_MILLIS*10)
+        );
+
+    if( config.getSsl() != null && config.getSsl().getEnabled() == true) {
+
+      Ssl ssl = config.getSsl();
+
+      KeyStore ks = KeyStore.getInstance("JKS");
+
+      InputStream trustStore = new FileInputStream(ssl.getTrustStore());
+      ks.load(trustStore, ssl.getTrustStorePassword().toCharArray());
+      InputStream keyStore = new FileInputStream(ssl.getKeyStore());
+      ks.load(keyStore, ssl.getKeyStorePassword().toCharArray());
+
+      TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
+      tmf.init(ks);
+
+      KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
+      kmf.init(ks, ssl.getKeyStorePassword().toCharArray());
+
+      SSLContext sslContext = SSLContext.getInstance("SSLv3");
+      sslContext.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);
+
+      SSLOptions sslOptions = JdkSSLOptions.builder()
+          .withSSLContext(sslContext)
+          .build();
+
+      builder = builder.withSSL(sslOptions);
+    }
+
+    Collection<InetSocketAddress> addresses = new ArrayList<>();
+    for (String h : config.getHosts()) {
+      LOGGER.info("Adding Host: {}", h);
+      InetSocketAddress socketAddress = new InetSocketAddress(h, config.getPort().intValue());
+      addresses.add(socketAddress);
+    }
+    builder.addContactPointsWithPorts(addresses);
+
+    if( StringUtils.isNotBlank(config.getUser()) &&
+        StringUtils.isNotBlank(config.getPassword())) {
+      builder.withCredentials(config.getUser(), config.getPassword());
+    }
+    cluster = builder.build();
+
+    Objects.nonNull(cluster);
+
+    try {
+      Metadata metadata = cluster.getMetadata();
+      LOGGER.info("Connected to cluster: {}\n",
+          metadata.getClusterName());
+      for ( Host host : metadata.getAllHosts() ) {
+        LOGGER.info("Datacenter: {}; Host: {}; Rack: {}\n",
+            host.getDatacenter(), host.getAddress(), host.getRack());
+      }
+    } catch( Exception e ) {
+      LOGGER.error("Exception: {}", e);
+      throw e;
+    }
+
+    try {
+      session = cluster.connect();
+    } catch( Exception e ) {
+      LOGGER.error("Exception: {}", e);
+      throw e;
+    }
+
+    Objects.nonNull(session);
+
+  }
+
+  public void stop() throws Exception {
+    session.close();
+    cluster.close();
+  }
+
+  public CassandraConfiguration config() {
+    return config;
+  }
+
+  public Session client() {
+    return session;
+  }
+
+  public Cluster cluster() {
+    return cluster;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/1797a707/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/CassandraPersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/CassandraPersistReader.java b/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/CassandraPersistReader.java
index aaa40fe..251a02d 100644
--- a/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/CassandraPersistReader.java
+++ b/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/CassandraPersistReader.java
@@ -76,12 +76,8 @@ public class CassandraPersistReader implements StreamsPersistReader {
   private CompletableFuture<Boolean> readerTaskFuture = new CompletableFuture<>();
 
   private CassandraConfiguration config;
+  private CassandraClient client;
 
-  protected Cluster cluster;
-  protected Session session;
-
-  protected String keyspace;
-  protected String table;
   protected Iterator<Row> rowIterator;
 
   protected final ReadWriteLock lock = new ReentrantReadWriteLock();
@@ -130,14 +126,20 @@ public class CassandraPersistReader implements StreamsPersistReader {
 
   @Override
   public void prepare(Object configurationObject) {
-    connectToCassandra();
+    try {
+      connectToCassandra();
+      client.start();
+    } catch (Exception e) {
+      LOGGER.error("Exception", e);
+      return;
+    }
 
     String selectStatement = getSelectStatement();
-    ResultSet rs = session.execute(selectStatement);
+    ResultSet rs = client.client().execute(selectStatement);
     rowIterator = rs.iterator();
 
     if (!rowIterator.hasNext()) {
-      throw new RuntimeException("Table" + table + "is empty!");
+      throw new RuntimeException("Table" + config.getTable() + "is empty!");
     }
 
     persistQueue = constructQueue();
@@ -164,50 +166,15 @@ public class CassandraPersistReader implements StreamsPersistReader {
     return new StreamsDatum(objectNode);
   }
 
-  private synchronized void connectToCassandra() {
-    Cluster.Builder clusterBuilder = Cluster.builder()
-        .addContactPoints(config.getHost().toArray(new String[config.getHost().size()]))
-        .withPort(config.getPort().intValue());
-
-    keyspace = config.getKeyspace();
-    table = config.getTable();
-
-    if (StringUtils.isNotEmpty(config.getUser()) && StringUtils.isNotEmpty(config.getPassword())) {
-      cluster = clusterBuilder.withCredentials(config.getUser(), config.getPassword()).build();
-    } else {
-      cluster = clusterBuilder.build();
-    }
-
-    Metadata metadata = cluster.getMetadata();
-    if (Objects.isNull(metadata.getKeyspace(keyspace))) {
-      LOGGER.info("Keyspace {} does not exist. Creating Keyspace", keyspace);
-      Map<String, Object> replication = new HashMap<>();
-      replication.put("class", "SimpleStrategy");
-      replication.put("replication_factor", 1);
-
-      String createKeyspaceStmt = SchemaBuilder.createKeyspace(keyspace).with()
-          .replication(replication).getQueryString();
-      cluster.connect().execute(createKeyspaceStmt);
-    }
-
-    session = cluster.connect(keyspace);
-
-    KeyspaceMetadata ks = metadata.getKeyspace(keyspace);
-    TableMetadata tableMetadata = ks.getTable(table);
+  private synchronized void connectToCassandra() throws Exception {
 
-    if (Objects.isNull(tableMetadata)) {
-      LOGGER.info("Table {} does not exist in Keyspace {}. Creating Table", table, keyspace);
-      String createTableStmt = SchemaBuilder.createTable(table)
-                                .addPartitionKey(config.getPartitionKeyColumn(), DataType.varchar())
-                                .addColumn(config.getColumn(), DataType.blob()).getQueryString();
+    client = new CassandraClient(config);
 
-      session.execute(createTableStmt);
-    }
   }
 
   @Override
   public StreamsResultSet readAll() {
-    ResultSet rs = session.execute(getSelectStatement());
+    ResultSet rs = client.client().execute(getSelectStatement());
     Iterator<Row> rowsIterator = rs.iterator();
 
     while (rowsIterator.hasNext()) {
@@ -289,7 +256,8 @@ public class CassandraPersistReader implements StreamsPersistReader {
 
   private String getSelectStatement() {
     return QueryBuilder.select().all()
-      .from(table).getQueryString();
+      .from(config.getKeyspace(), config.getTable())
+      .getQueryString();
   }
 
   public class CassandraPersistReaderTask implements Runnable {

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/1797a707/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/CassandraPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/CassandraPersistWriter.java b/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/CassandraPersistWriter.java
index 81c0e9e..9f419bd 100644
--- a/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/CassandraPersistWriter.java
+++ b/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/CassandraPersistWriter.java
@@ -77,12 +77,10 @@ public class CassandraPersistWriter implements StreamsPersistWriter, Runnable, F
   private ScheduledExecutorService backgroundFlushTask = Executors.newSingleThreadScheduledExecutor();
 
   private CassandraConfiguration config;
+  private CassandraClient client;
 
-  protected Cluster cluster;
-  protected Session session;
+  private Session session;
 
-  protected String keyspace;
-  protected String table;
   protected PreparedStatement insertStatement;
 
   protected List<BoundStatement> insertBatch = new ArrayList<>();
@@ -140,8 +138,8 @@ public class CassandraPersistWriter implements StreamsPersistWriter, Runnable, F
         byte[] value = node.toString().getBytes();
 
         String key = GuidUtils.generateGuid(node.toString());
-        if(!Objects.isNull(streamsDatum.getMetadata().get("id"))) {
-          key = streamsDatum.getMetadata().get("id").toString();
+        if(!Objects.isNull(streamsDatum.getId())) {
+          key = streamsDatum.getId();
         }
 
         BoundStatement statement = insertStatement.bind(key, ByteBuffer.wrap(value));
@@ -175,7 +173,7 @@ public class CassandraPersistWriter implements StreamsPersistWriter, Runnable, F
   @Override
   public synchronized void close() throws IOException {
     session.close();
-    cluster.close();
+    client.cluster().close();
     backgroundFlushTask.shutdownNow();
   }
 
@@ -183,7 +181,15 @@ public class CassandraPersistWriter implements StreamsPersistWriter, Runnable, F
    * start write thread.
    */
   public void start() {
-    connectToCassandra();
+    try {
+      connectToCassandra();
+      client.start();
+      createKeyspaceAndTable();
+      createInsertStatement();
+    } catch (Exception e) {
+      LOGGER.error("Exception", e);
+      return;
+    }
     backgroundFlushTask.scheduleAtFixedRate(new Runnable() {
       @Override
       public void run() {
@@ -269,51 +275,40 @@ public class CassandraPersistWriter implements StreamsPersistWriter, Runnable, F
     }
   }
 
-  private synchronized void connectToCassandra() {
-    Cluster.Builder clusterBuilder = Cluster.builder()
-        .addContactPoints(config.getHost().toArray(new String[config.getHost().size()]))
-        .withPort(config.getPort().intValue());
-
-    keyspace = config.getKeyspace();
-    table = config.getTable();
-
-    if (StringUtils.isNotEmpty(config.getUser()) && StringUtils.isNotEmpty(config.getPassword())) {
-      cluster = clusterBuilder.withCredentials(config.getUser(), config.getPassword()).build();
-    } else {
-      cluster = clusterBuilder.build();
-    }
+  private synchronized void connectToCassandra() throws Exception {
+    client = new CassandraClient(config);
+  }
 
-    Metadata metadata = cluster.getMetadata();
-    if (Objects.isNull(metadata.getKeyspace(keyspace))) {
-      LOGGER.info("Keyspace {} does not exist. Creating Keyspace", keyspace);
+  private void createKeyspaceAndTable() {
+    Metadata metadata = client.cluster().getMetadata();
+    if (Objects.isNull(metadata.getKeyspace(config.getKeyspace()))) {
+      LOGGER.info("Keyspace {} does not exist. Creating Keyspace", config.getKeyspace());
       Map<String, Object> replication = new HashMap<>();
       replication.put("class", "SimpleStrategy");
       replication.put("replication_factor", 1);
 
-      String createKeyspaceStmt = SchemaBuilder.createKeyspace(keyspace).with()
+      String createKeyspaceStmt = SchemaBuilder.createKeyspace(config.getKeyspace()).with()
           .replication(replication).getQueryString();
-      cluster.connect().execute(createKeyspaceStmt);
+      client.cluster().connect().execute(createKeyspaceStmt);
     }
 
-    session = cluster.connect(keyspace);
+    session = client.cluster().connect(config.getKeyspace());
 
-    KeyspaceMetadata ks = metadata.getKeyspace(keyspace);
-    TableMetadata tableMetadata = ks.getTable(table);
+    KeyspaceMetadata ks = metadata.getKeyspace(config.getKeyspace());
+    TableMetadata tableMetadata = ks.getTable(config.getTable());
 
     if (Objects.isNull(tableMetadata)) {
-      LOGGER.info("Table {} does not exist in Keyspace {}. Creating Table", table, keyspace);
-      String createTableStmt = SchemaBuilder.createTable(table)
+      LOGGER.info("Table {} does not exist in Keyspace {}. Creating Table", config.getTable(), config.getKeyspace());
+      String createTableStmt = SchemaBuilder.createTable(config.getTable())
                                 .addPartitionKey(config.getPartitionKeyColumn(), DataType.varchar())
                                 .addColumn(config.getColumn(), DataType.blob()).getQueryString();
 
       session.execute(createTableStmt);
     }
-
-    createInsertStatement();
   }
 
   private void createInsertStatement() {
-    Insert insertBuilder = QueryBuilder.insertInto(table);
+    Insert insertBuilder = QueryBuilder.insertInto(config.getTable());
     insertBuilder.value(config.getPartitionKeyColumn(), new Object());
     insertBuilder.value(config.getColumn(), new Object());
     insertStatement = session.prepare(insertBuilder.getQueryString());

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/1797a707/streams-contrib/streams-persist-cassandra/src/main/jsonschema/org/apache/streams/cassandra/CassandraConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-cassandra/src/main/jsonschema/org/apache/streams/cassandra/CassandraConfiguration.json b/streams-contrib/streams-persist-cassandra/src/main/jsonschema/org/apache/streams/cassandra/CassandraConfiguration.json
index 4b4cf1f..4b9d825 100644
--- a/streams-contrib/streams-persist-cassandra/src/main/jsonschema/org/apache/streams/cassandra/CassandraConfiguration.json
+++ b/streams-contrib/streams-persist-cassandra/src/main/jsonschema/org/apache/streams/cassandra/CassandraConfiguration.json
@@ -8,12 +8,12 @@
   "javaType": "org.apache.streams.cassandra.CassandraConfiguration",
   "javaInterfaces": ["java.io.Serializable"],
   "properties": {
-    "host": {
+    "hosts": {
       "type": "array",
       "items": {
         "type": "string"
       },
-      "description": "Cassandra host"
+      "description": "Cassandra hosts"
     },
     "port": {
       "type": "integer",
@@ -42,6 +42,30 @@
     "column": {
       "type": "string",
       "description": "Column name"
+    },
+    "ssl": {
+      "type": "object",
+      "description": "ssl details",
+      "javaInterfaces": ["java.io.Serializable"],
+      "properties": {
+        "enabled": {
+          "type": "boolean",
+          "description": "ssl enabled",
+          "default": "false"
+        },
+        "trustStore": {
+          "type": "string"
+        },
+        "trustStorePassword": {
+          "type": "string"
+        },
+        "keyStore": {
+          "type": "string"
+        },
+        "keyStorePassword": {
+          "type": "string"
+        }
+      }
     }
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/1797a707/streams-contrib/streams-persist-cassandra/src/test/java/org/apache/streams/cassandra/test/CassandraPersistIT.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-cassandra/src/test/java/org/apache/streams/cassandra/test/CassandraPersistIT.java b/streams-contrib/streams-persist-cassandra/src/test/java/org/apache/streams/cassandra/test/CassandraPersistIT.java
index ca675b9..aa2f16d 100644
--- a/streams-contrib/streams-persist-cassandra/src/test/java/org/apache/streams/cassandra/test/CassandraPersistIT.java
+++ b/streams-contrib/streams-persist-cassandra/src/test/java/org/apache/streams/cassandra/test/CassandraPersistIT.java
@@ -101,6 +101,6 @@ public class CassandraPersistIT {
     StreamsResultSet resultSet = reader.readAll();
 
     LOGGER.info("Total Read: {}", resultSet.size() );
-    Assert.assertEquals(89, resultSet.size());
+    Assert.assertEquals(resultSet.size(), 89);
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/1797a707/streams-contrib/streams-persist-cassandra/src/test/resources/CassandraPersistIT.conf
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-cassandra/src/test/resources/CassandraPersistIT.conf b/streams-contrib/streams-persist-cassandra/src/test/resources/CassandraPersistIT.conf
index 62fec7d..9db9da5 100644
--- a/streams-contrib/streams-persist-cassandra/src/test/resources/CassandraPersistIT.conf
+++ b/streams-contrib/streams-persist-cassandra/src/test/resources/CassandraPersistIT.conf
@@ -16,7 +16,7 @@
 # under the License.
 
 cassandra {
-  host = [${cassandra.tcp.host}]
+  hosts = [${cassandra.tcp.host}]
   port = ${cassandra.tcp.port}
   user = cassandra
   password = cassandra