You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by mo...@apache.org on 2017/07/15 00:25:00 UTC

zeppelin git commit: 0.7.1 with ssl

Repository: zeppelin
Updated Branches:
  refs/heads/branch-0.7 28310c2b9 -> 53ea135ce


0.7.1 with ssl

### What is this PR for?
The Cassandra Interpreter does not support talking to clusters that use SSL (client-to-node encryption) because it lacks the properties needed to configure the SSL context. This PR adds those properties to the driver config and sets up the SSL options when SSL is enabled.

### What type of PR is it?
Feature

### What is the Jira issue?
[ZEPPELIN-1501](https://issues.apache.org/jira/browse/ZEPPELIN-1501)

### How should this be tested?
Using the Cassandra/CQL interpreter, connect to a Cassandra cluster that uses client-to-node encryption and run a query, e.g.:

```
%cassandra
describe keyspaces;
```
### Questions:
* Do the license files need an update? No
* Are there breaking changes for older versions? No
* Does this need documentation? Yes.
    The new SSL-related properties should be added to the list (cassandra.ssl.enabled, cassandra.ssl.truststore.path and cassandra.ssl.truststore.password).

Author: Alex Lourie <dj...@gmail.com>
Author: Robert Marshall <ro...@Instaclustrs-MacBook-Pro.local>

Closes #2238 from alourie/0.7.1-withSSL and squashes the following commits:

2a6eee6 [Alex Lourie] Handle potential NPE in properties parsing
e9b4769 [Alex Lourie] Fix formatting for CI to pass
521b2d9 [Robert Marshall] Add SSL support to Cassandra interpreter


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/53ea135c
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/53ea135c
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/53ea135c

Branch: refs/heads/branch-0.7
Commit: 53ea135ce2de6be792aba6ebd9f67fb4d0996f7b
Parents: 28310c2
Author: Alex Lourie <dj...@gmail.com>
Authored: Thu Jul 13 19:43:21 2017 +0930
Committer: Lee moon soo <mo...@apache.org>
Committed: Sat Jul 15 09:24:56 2017 +0900

----------------------------------------------------------------------
 .../cassandra/CassandraInterpreter.java         | 100 +++++++++++++------
 .../src/main/resources/interpreter-setting.json |  18 ++++
 2 files changed, 89 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/53ea135c/cassandra/src/main/java/org/apache/zeppelin/cassandra/CassandraInterpreter.java
----------------------------------------------------------------------
diff --git a/cassandra/src/main/java/org/apache/zeppelin/cassandra/CassandraInterpreter.java b/cassandra/src/main/java/org/apache/zeppelin/cassandra/CassandraInterpreter.java
index a4984ad..166bb12 100644
--- a/cassandra/src/main/java/org/apache/zeppelin/cassandra/CassandraInterpreter.java
+++ b/cassandra/src/main/java/org/apache/zeppelin/cassandra/CassandraInterpreter.java
@@ -17,11 +17,11 @@
 package org.apache.zeppelin.cassandra;
 
 import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.JdkSSLOptions;
 import com.datastax.driver.core.ProtocolOptions.Compression;
 import com.datastax.driver.core.Session;
 import org.apache.zeppelin.interpreter.Interpreter;
 import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
 import org.apache.zeppelin.interpreter.InterpreterResult;
 import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
 import org.apache.zeppelin.scheduler.Scheduler;
@@ -29,11 +29,16 @@ import org.apache.zeppelin.scheduler.SchedulerFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManagerFactory;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.security.KeyStore;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Properties;
 
-import static com.datastax.driver.core.ProtocolOptions.DEFAULT_MAX_SCHEMA_AGREEMENT_WAIT_SECONDS;
 import static java.lang.Integer.parseInt;
 
 /**
@@ -43,8 +48,8 @@ public class CassandraInterpreter extends Interpreter {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(CassandraInterpreter.class);
 
-  public static final String CASSANDRA_INTERPRETER_PARALLELISM = "cassandra.interpreter" +
-      ".parallelism";
+  public static final String CASSANDRA_INTERPRETER_PARALLELISM =
+          "cassandra.interpreter.parallelism";
   public static final String CASSANDRA_HOSTS = "cassandra.hosts";
   public static final String CASSANDRA_PORT = "cassandra.native.port";
   public static final String CASSANDRA_PROTOCOL_VERSION = "cassandra.protocol.version";
@@ -59,21 +64,21 @@ public class CassandraInterpreter extends Interpreter {
   public static final String CASSANDRA_SPECULATIVE_EXECUTION_POLICY =
           "cassandra.speculative.execution.policy";
   public static final String CASSANDRA_MAX_SCHEMA_AGREEMENT_WAIT_SECONDS =
-        "cassandra.max.schema.agreement.wait.second";
+          "cassandra.max.schema.agreement.wait.second";
   public static final String CASSANDRA_POOLING_NEW_CONNECTION_THRESHOLD_LOCAL =
-        "cassandra.pooling.new.connection.threshold.local";
+          "cassandra.pooling.new.connection.threshold.local";
   public static final String CASSANDRA_POOLING_NEW_CONNECTION_THRESHOLD_REMOTE =
-        "cassandra.pooling.new.connection.threshold.remote";
+          "cassandra.pooling.new.connection.threshold.remote";
   public static final String CASSANDRA_POOLING_MAX_CONNECTION_PER_HOST_LOCAL =
-        "cassandra.pooling.max.connection.per.host.local";
+          "cassandra.pooling.max.connection.per.host.local";
   public static final String CASSANDRA_POOLING_MAX_CONNECTION_PER_HOST_REMOTE =
-        "cassandra.pooling.max.connection.per.host.remote";
+          "cassandra.pooling.max.connection.per.host.remote";
   public static final String CASSANDRA_POOLING_CORE_CONNECTION_PER_HOST_LOCAL =
           "cassandra.pooling.core.connection.per.host.local";
   public static final String CASSANDRA_POOLING_CORE_CONNECTION_PER_HOST_REMOTE =
           "cassandra.pooling.core.connection.per.host.remote";
   public static final String CASSANDRA_POOLING_MAX_REQUESTS_PER_CONNECTION_LOCAL =
-        "cassandra.pooling.max.request.per.connection.local";
+          "cassandra.pooling.max.request.per.connection.local";
   public static final String CASSANDRA_POOLING_MAX_REQUESTS_PER_CONNECTION_REMOTE =
           "cassandra.pooling.max.request.per.connection.remote";
   public static final String CASSANDRA_POOLING_IDLE_TIMEOUT_SECONDS =
@@ -106,6 +111,13 @@ public class CassandraInterpreter extends Interpreter {
           "cassandra.socket.soLinger";
   public static final String CASSANDRA_SOCKET_TCP_NO_DELAY =
           "cassandra.socket.tcp.no_delay";
+  public static final String CASSANDRA_WITH_SSL =
+          "cassandra.ssl.enabled";
+  public static final String CASSANDRA_TRUSTSTORE_PATH =
+          "cassandra.ssl.truststore.path";
+  public static final String CASSANDRA_TRUSTSTORE_PASSWORD =
+          "cassandra.ssl.truststore.password";
+
 
   public static final String DEFAULT_HOST = "localhost";
   public static final String DEFAULT_PORT = "9042";
@@ -143,6 +155,7 @@ public class CassandraInterpreter extends Interpreter {
   public static final List NO_COMPLETION = new ArrayList<>();
 
   InterpreterLogic helper;
+  Cluster.Builder clusterBuilder;
   Cluster cluster;
   Session session;
   private JavaDriverConfig driverConfig = new JavaDriverConfig();
@@ -162,29 +175,58 @@ public class CassandraInterpreter extends Interpreter {
     }
 
     LOGGER.info("Bootstrapping Cassandra Java Driver to connect to " + hosts.toString() +
-                  "on port " + port);
+            "on port " + port);
 
     Compression compression = driverConfig.getCompressionProtocol(this);
 
-    cluster  = Cluster.builder()
-      .addContactPoints(addresses)
-      .withPort(port)
-      .withProtocolVersion(driverConfig.getProtocolVersion(this))
-      .withClusterName(getProperty(CASSANDRA_CLUSTER_NAME))
-      .withCompression(compression)
-      .withCredentials(getProperty(CASSANDRA_CREDENTIALS_USERNAME),
-              getProperty(CASSANDRA_CREDENTIALS_PASSWORD))
-      .withLoadBalancingPolicy(driverConfig.getLoadBalancingPolicy(this))
-      .withRetryPolicy(driverConfig.getRetryPolicy(this))
-      .withReconnectionPolicy(driverConfig.getReconnectionPolicy(this))
-      .withSpeculativeExecutionPolicy(driverConfig.getSpeculativeExecutionPolicy(this))
-      .withMaxSchemaAgreementWaitSeconds(
-              parseInt(getProperty(CASSANDRA_MAX_SCHEMA_AGREEMENT_WAIT_SECONDS)))
-      .withPoolingOptions(driverConfig.getPoolingOptions(this))
-      .withQueryOptions(driverConfig.getQueryOptions(this))
-      .withSocketOptions(driverConfig.getSocketOptions(this))
-      .build();
+    clusterBuilder = Cluster.builder()
+            .addContactPoints(addresses)
+            .withPort(port)
+            .withProtocolVersion(driverConfig.getProtocolVersion(this))
+            .withClusterName(getProperty(CASSANDRA_CLUSTER_NAME))
+            .withCompression(compression)
+            .withCredentials(getProperty(CASSANDRA_CREDENTIALS_USERNAME),
+                    getProperty(CASSANDRA_CREDENTIALS_PASSWORD))
+            .withLoadBalancingPolicy(driverConfig.getLoadBalancingPolicy(this))
+            .withRetryPolicy(driverConfig.getRetryPolicy(this))
+            .withReconnectionPolicy(driverConfig.getReconnectionPolicy(this))
+            .withSpeculativeExecutionPolicy(driverConfig.getSpeculativeExecutionPolicy(this))
+            .withMaxSchemaAgreementWaitSeconds(
+                    parseInt(getProperty(CASSANDRA_MAX_SCHEMA_AGREEMENT_WAIT_SECONDS)))
+            .withPoolingOptions(driverConfig.getPoolingOptions(this))
+            .withQueryOptions(driverConfig.getQueryOptions(this))
+            .withSocketOptions(driverConfig.getSocketOptions(this));
+
+    final String runWithSSL = getProperty(CASSANDRA_WITH_SSL);
+    if (runWithSSL != null && runWithSSL.equals("true")) {
+      LOGGER.debug("Cassandra Interpreter: Using SSL");
+
+      try {
+        final SSLContext sslContext;
+        {
+          final KeyStore trustStore = KeyStore.getInstance("JKS");
+          final InputStream stream = Files.newInputStream(Paths.get(
+                  getProperty(CASSANDRA_TRUSTSTORE_PATH)));
+          trustStore.load(stream, getProperty(CASSANDRA_TRUSTSTORE_PASSWORD).toCharArray());
+
+          final TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(
+                  TrustManagerFactory.getDefaultAlgorithm());
+          trustManagerFactory.init(trustStore);
+
+          sslContext = SSLContext.getInstance("TLS");
+          sslContext.init(null, trustManagerFactory.getTrustManagers(), null);
+        }
+        clusterBuilder = clusterBuilder.withSSL(JdkSSLOptions.builder()
+                .withSSLContext(sslContext)
+                .build());
+      } catch (Exception e) {
+        LOGGER.error(e.toString());
+      }
+    } else {
+      LOGGER.debug("Cassandra Interpreter: Not using SSL");
+    }
 
+    cluster = clusterBuilder.build();
     session = cluster.connect();
     helper = new InterpreterLogic(session);
   }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/53ea135c/cassandra/src/main/resources/interpreter-setting.json
----------------------------------------------------------------------
diff --git a/cassandra/src/main/resources/interpreter-setting.json b/cassandra/src/main/resources/interpreter-setting.json
index 3df120d..c878b92 100644
--- a/cassandra/src/main/resources/interpreter-setting.json
+++ b/cassandra/src/main/resources/interpreter-setting.json
@@ -189,6 +189,24 @@
         "propertyName": "cassandra.socket.tcp.no_delay",
         "defaultValue": "true",
         "description": "Cassandra socket TCP no delay. Default = true"
+      },
+      "cassandra.ssl.enabled": {
+        "envName": null,
+        "propertyName": "cassandra.ssl.enabled",
+        "defaultValue": "false",
+        "description": "Cassandra SSL"
+      },
+      "cassandra.ssl.truststore.path": {
+        "envName": null,
+        "propertyName": "cassandra.ssl.truststore.path",
+        "defaultValue": "none",
+        "description": "Cassandra truststore path. Default = none"
+      },
+      "cassandra.ssl.truststore.password": {
+        "envName": null,
+        "propertyName": "cassandra.ssl.truststore.password",
+        "defaultValue": "none",
+        "description": "Cassandra truststore password. Default = none"
       }
     },
     "editor": {