You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by steveblackmon <gi...@git.apache.org> on 2017/01/26 00:14:55 UTC

[GitHub] incubator-streams pull request #353: STREAMS-483: add support for SSL connec...

GitHub user steveblackmon opened a pull request:

    https://github.com/apache/incubator-streams/pull/353

    STREAMS-483: add support for SSL connections to streams-persist-cassandra

    refactor client object to be shared by reader and writer
    also ensures the ITs can run & pass repeatedly

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/steveblackmon/incubator-streams STREAMS-483

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-streams/pull/353.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #353
    
----
commit cc80a98947bbde19183924c3b98686cd291cdc1b
Author: Steve Blackmon @steveblackmon <sb...@apache.org>
Date:   2017-01-26T00:12:42Z

    STREAMS-483: add support for SSL connections to streams-persist-cassandra
    
    also ensures the ITs can run & pass repeatedly

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-streams pull request #353: STREAMS-483: add support for SSL connec...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/incubator-streams/pull/353


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-streams pull request #353: STREAMS-483: add support for SSL connec...

Posted by steveblackmon <gi...@git.apache.org>.
Github user steveblackmon commented on a diff in the pull request:

    https://github.com/apache/incubator-streams/pull/353#discussion_r98104694
  
    --- Diff: streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/CassandraClient.java ---
    @@ -0,0 +1,160 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package org.apache.streams.cassandra;
    +
    +import com.datastax.driver.core.Cluster;
    +import com.datastax.driver.core.Host;
    +import com.datastax.driver.core.JdkSSLOptions;
    +import com.datastax.driver.core.Metadata;
    +import com.datastax.driver.core.SSLOptions;
    +import com.datastax.driver.core.Session;
    +import com.datastax.driver.core.SocketOptions;
    +import com.google.common.base.Preconditions;
    +import com.google.common.base.Strings;
    +import com.google.common.collect.Lists;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.FileInputStream;
    +import java.io.InputStream;
    +import java.net.InetSocketAddress;
    +import java.security.KeyStore;
    +import java.util.Collection;
    +import javax.net.ssl.KeyManagerFactory;
    +import javax.net.ssl.SSLContext;
    +import javax.net.ssl.TrustManagerFactory;
    +
    +import static com.datastax.driver.core.SocketOptions.DEFAULT_CONNECT_TIMEOUT_MILLIS;
    +import static com.datastax.driver.core.SocketOptions.DEFAULT_READ_TIMEOUT_MILLIS;
    +
    +public class CassandraClient {
    +
    +  private static final Logger LOGGER = LoggerFactory
    +      .getLogger(CassandraClient.class);
    +
    +  private Cluster cluster;
    +  private Session session;
    +
    +  public CassandraConfiguration config;
    +
    +  public CassandraClient(CassandraConfiguration config) throws Exception {
    +    this.config = config;
    +    org.apache.cassandra.config.Config.setClientMode(true);
    +  }
    +
    +  public void start() throws Exception {
    +
    +    Preconditions.checkNotNull(config);
    +
    +    LOGGER.info("CassandraClient.start {}", config);
    +
    +    Cluster.Builder builder = Cluster.builder()
    +        .withPort(config.getPort().intValue())
    +        .withoutJMXReporting()
    +        .withoutMetrics()
    +        .withSocketOptions(
    +            new SocketOptions()
    +                .setConnectTimeoutMillis(DEFAULT_CONNECT_TIMEOUT_MILLIS*10)
    +                .setReadTimeoutMillis(DEFAULT_READ_TIMEOUT_MILLIS*10)
    +        );
    +
    +    if( config.getSsl() != null && config.getSsl().getEnabled() == true) {
    +
    +      Ssl ssl = config.getSsl();
    +
    +      KeyStore ks = KeyStore.getInstance("JKS");
    +
    +      InputStream trustStore = new FileInputStream(ssl.getTrustStore());
    +      ks.load(trustStore, ssl.getTrustStorePassword().toCharArray());
    +      InputStream keyStore = new FileInputStream(ssl.getKeyStore());
    +      ks.load(keyStore, ssl.getKeyStorePassword().toCharArray());
    +
    +      TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
    +      tmf.init(ks);
    +
    +      KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
    +      kmf.init(ks, ssl.getKeyStorePassword().toCharArray());
    +
    +      SSLContext sslContext = SSLContext.getInstance("SSLv3");
    +      sslContext.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);
    +
    +      SSLOptions sslOptions = JdkSSLOptions.builder()
    +          .withSSLContext(sslContext)
    +          .build();
    +
    +      builder = builder.withSSL(sslOptions);
    +    }
    +
    +    Collection<InetSocketAddress> addresses = Lists.newArrayList();
    +    for (String h : config.getHosts()) {
    +      LOGGER.info("Adding Host: {}", h);
    +      InetSocketAddress socketAddress = new InetSocketAddress(h, config.getPort().intValue());
    +      addresses.add(socketAddress);
    +    }
    +    builder.addContactPointsWithPorts(addresses);
    +
    +    if( !Strings.isNullOrEmpty(config.getUser()) &&
    +        !Strings.isNullOrEmpty(config.getPassword())) {
    +      builder.withCredentials(config.getUser(), config.getPassword());
    +    }
    +    cluster = builder.build();
    +
    +    Preconditions.checkNotNull(cluster);
    --- End diff --
    
    i'll change it, to make it easier to remove once we have a substitute for Queues


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-streams pull request #353: STREAMS-483: add support for SSL connec...

Posted by steveblackmon <gi...@git.apache.org>.
Github user steveblackmon commented on a diff in the pull request:

    https://github.com/apache/incubator-streams/pull/353#discussion_r98104675
  
    --- Diff: streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/CassandraClient.java ---
    @@ -0,0 +1,160 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package org.apache.streams.cassandra;
    +
    +import com.datastax.driver.core.Cluster;
    +import com.datastax.driver.core.Host;
    +import com.datastax.driver.core.JdkSSLOptions;
    +import com.datastax.driver.core.Metadata;
    +import com.datastax.driver.core.SSLOptions;
    +import com.datastax.driver.core.Session;
    +import com.datastax.driver.core.SocketOptions;
    +import com.google.common.base.Preconditions;
    +import com.google.common.base.Strings;
    +import com.google.common.collect.Lists;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.FileInputStream;
    +import java.io.InputStream;
    +import java.net.InetSocketAddress;
    +import java.security.KeyStore;
    +import java.util.Collection;
    +import javax.net.ssl.KeyManagerFactory;
    +import javax.net.ssl.SSLContext;
    +import javax.net.ssl.TrustManagerFactory;
    +
    +import static com.datastax.driver.core.SocketOptions.DEFAULT_CONNECT_TIMEOUT_MILLIS;
    +import static com.datastax.driver.core.SocketOptions.DEFAULT_READ_TIMEOUT_MILLIS;
    +
    +public class CassandraClient {
    +
    +  private static final Logger LOGGER = LoggerFactory
    +      .getLogger(CassandraClient.class);
    +
    +  private Cluster cluster;
    +  private Session session;
    +
    +  public CassandraConfiguration config;
    +
    +  public CassandraClient(CassandraConfiguration config) throws Exception {
    +    this.config = config;
    +    org.apache.cassandra.config.Config.setClientMode(true);
    +  }
    +
    +  public void start() throws Exception {
    +
    +    Preconditions.checkNotNull(config);
    +
    +    LOGGER.info("CassandraClient.start {}", config);
    +
    +    Cluster.Builder builder = Cluster.builder()
    +        .withPort(config.getPort().intValue())
    +        .withoutJMXReporting()
    +        .withoutMetrics()
    +        .withSocketOptions(
    +            new SocketOptions()
    +                .setConnectTimeoutMillis(DEFAULT_CONNECT_TIMEOUT_MILLIS*10)
    +                .setReadTimeoutMillis(DEFAULT_READ_TIMEOUT_MILLIS*10)
    +        );
    +
    +    if( config.getSsl() != null && config.getSsl().getEnabled() == true) {
    +
    +      Ssl ssl = config.getSsl();
    +
    +      KeyStore ks = KeyStore.getInstance("JKS");
    +
    +      InputStream trustStore = new FileInputStream(ssl.getTrustStore());
    +      ks.load(trustStore, ssl.getTrustStorePassword().toCharArray());
    +      InputStream keyStore = new FileInputStream(ssl.getKeyStore());
    +      ks.load(keyStore, ssl.getKeyStorePassword().toCharArray());
    +
    +      TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
    +      tmf.init(ks);
    +
    +      KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
    +      kmf.init(ks, ssl.getKeyStorePassword().toCharArray());
    +
    +      SSLContext sslContext = SSLContext.getInstance("SSLv3");
    +      sslContext.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);
    +
    +      SSLOptions sslOptions = JdkSSLOptions.builder()
    +          .withSSLContext(sslContext)
    +          .build();
    +
    +      builder = builder.withSSL(sslOptions);
    +    }
    +
    +    Collection<InetSocketAddress> addresses = Lists.newArrayList();
    --- End diff --
    
    i'll change it, to make it easier to remove once we have a substitute for Queues


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-streams pull request #353: STREAMS-483: add support for SSL connec...

Posted by smarthi <gi...@git.apache.org>.
Github user smarthi commented on a diff in the pull request:

    https://github.com/apache/incubator-streams/pull/353#discussion_r98064071
  
    --- Diff: streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/CassandraClient.java ---
    @@ -0,0 +1,160 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package org.apache.streams.cassandra;
    +
    +import com.datastax.driver.core.Cluster;
    +import com.datastax.driver.core.Host;
    +import com.datastax.driver.core.JdkSSLOptions;
    +import com.datastax.driver.core.Metadata;
    +import com.datastax.driver.core.SSLOptions;
    +import com.datastax.driver.core.Session;
    +import com.datastax.driver.core.SocketOptions;
    +import com.google.common.base.Preconditions;
    +import com.google.common.base.Strings;
    +import com.google.common.collect.Lists;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.FileInputStream;
    +import java.io.InputStream;
    +import java.net.InetSocketAddress;
    +import java.security.KeyStore;
    +import java.util.Collection;
    +import javax.net.ssl.KeyManagerFactory;
    +import javax.net.ssl.SSLContext;
    +import javax.net.ssl.TrustManagerFactory;
    +
    +import static com.datastax.driver.core.SocketOptions.DEFAULT_CONNECT_TIMEOUT_MILLIS;
    +import static com.datastax.driver.core.SocketOptions.DEFAULT_READ_TIMEOUT_MILLIS;
    +
    +public class CassandraClient {
    +
    +  private static final Logger LOGGER = LoggerFactory
    +      .getLogger(CassandraClient.class);
    +
    +  private Cluster cluster;
    +  private Session session;
    +
    +  public CassandraConfiguration config;
    +
    +  public CassandraClient(CassandraConfiguration config) throws Exception {
    +    this.config = config;
    +    org.apache.cassandra.config.Config.setClientMode(true);
    +  }
    +
    +  public void start() throws Exception {
    +
    +    Preconditions.checkNotNull(config);
    +
    +    LOGGER.info("CassandraClient.start {}", config);
    +
    +    Cluster.Builder builder = Cluster.builder()
    +        .withPort(config.getPort().intValue())
    +        .withoutJMXReporting()
    +        .withoutMetrics()
    +        .withSocketOptions(
    +            new SocketOptions()
    +                .setConnectTimeoutMillis(DEFAULT_CONNECT_TIMEOUT_MILLIS*10)
    +                .setReadTimeoutMillis(DEFAULT_READ_TIMEOUT_MILLIS*10)
    +        );
    +
    +    if( config.getSsl() != null && config.getSsl().getEnabled() == true) {
    +
    +      Ssl ssl = config.getSsl();
    +
    +      KeyStore ks = KeyStore.getInstance("JKS");
    +
    +      InputStream trustStore = new FileInputStream(ssl.getTrustStore());
    +      ks.load(trustStore, ssl.getTrustStorePassword().toCharArray());
    +      InputStream keyStore = new FileInputStream(ssl.getKeyStore());
    +      ks.load(keyStore, ssl.getKeyStorePassword().toCharArray());
    +
    +      TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
    +      tmf.init(ks);
    +
    +      KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
    +      kmf.init(ks, ssl.getKeyStorePassword().toCharArray());
    +
    +      SSLContext sslContext = SSLContext.getInstance("SSLv3");
    +      sslContext.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);
    +
    +      SSLOptions sslOptions = JdkSSLOptions.builder()
    +          .withSSLContext(sslContext)
    +          .build();
    +
    +      builder = builder.withSSL(sslOptions);
    +    }
    +
    +    Collection<InetSocketAddress> addresses = Lists.newArrayList();
    --- End diff --
    
    Guava API??? Jeez!!!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-streams pull request #353: STREAMS-483: add support for SSL connec...

Posted by smarthi <gi...@git.apache.org>.
Github user smarthi commented on a diff in the pull request:

    https://github.com/apache/incubator-streams/pull/353#discussion_r98064145
  
    --- Diff: streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/CassandraClient.java ---
    @@ -0,0 +1,160 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package org.apache.streams.cassandra;
    +
    +import com.datastax.driver.core.Cluster;
    +import com.datastax.driver.core.Host;
    +import com.datastax.driver.core.JdkSSLOptions;
    +import com.datastax.driver.core.Metadata;
    +import com.datastax.driver.core.SSLOptions;
    +import com.datastax.driver.core.Session;
    +import com.datastax.driver.core.SocketOptions;
    +import com.google.common.base.Preconditions;
    +import com.google.common.base.Strings;
    +import com.google.common.collect.Lists;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.FileInputStream;
    +import java.io.InputStream;
    +import java.net.InetSocketAddress;
    +import java.security.KeyStore;
    +import java.util.Collection;
    +import javax.net.ssl.KeyManagerFactory;
    +import javax.net.ssl.SSLContext;
    +import javax.net.ssl.TrustManagerFactory;
    +
    +import static com.datastax.driver.core.SocketOptions.DEFAULT_CONNECT_TIMEOUT_MILLIS;
    +import static com.datastax.driver.core.SocketOptions.DEFAULT_READ_TIMEOUT_MILLIS;
    +
    +public class CassandraClient {
    +
    +  private static final Logger LOGGER = LoggerFactory
    +      .getLogger(CassandraClient.class);
    +
    +  private Cluster cluster;
    +  private Session session;
    +
    +  public CassandraConfiguration config;
    +
    +  public CassandraClient(CassandraConfiguration config) throws Exception {
    +    this.config = config;
    +    org.apache.cassandra.config.Config.setClientMode(true);
    +  }
    +
    +  public void start() throws Exception {
    +
    +    Preconditions.checkNotNull(config);
    +
    +    LOGGER.info("CassandraClient.start {}", config);
    +
    +    Cluster.Builder builder = Cluster.builder()
    +        .withPort(config.getPort().intValue())
    +        .withoutJMXReporting()
    +        .withoutMetrics()
    +        .withSocketOptions(
    +            new SocketOptions()
    +                .setConnectTimeoutMillis(DEFAULT_CONNECT_TIMEOUT_MILLIS*10)
    +                .setReadTimeoutMillis(DEFAULT_READ_TIMEOUT_MILLIS*10)
    +        );
    +
    +    if( config.getSsl() != null && config.getSsl().getEnabled() == true) {
    +
    +      Ssl ssl = config.getSsl();
    +
    +      KeyStore ks = KeyStore.getInstance("JKS");
    +
    +      InputStream trustStore = new FileInputStream(ssl.getTrustStore());
    +      ks.load(trustStore, ssl.getTrustStorePassword().toCharArray());
    +      InputStream keyStore = new FileInputStream(ssl.getKeyStore());
    +      ks.load(keyStore, ssl.getKeyStorePassword().toCharArray());
    +
    +      TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
    +      tmf.init(ks);
    +
    +      KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
    +      kmf.init(ks, ssl.getKeyStorePassword().toCharArray());
    +
    +      SSLContext sslContext = SSLContext.getInstance("SSLv3");
    +      sslContext.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);
    +
    +      SSLOptions sslOptions = JdkSSLOptions.builder()
    +          .withSSLContext(sslContext)
    +          .build();
    +
    +      builder = builder.withSSL(sslOptions);
    +    }
    +
    +    Collection<InetSocketAddress> addresses = Lists.newArrayList();
    +    for (String h : config.getHosts()) {
    +      LOGGER.info("Adding Host: {}", h);
    +      InetSocketAddress socketAddress = new InetSocketAddress(h, config.getPort().intValue());
    +      addresses.add(socketAddress);
    +    }
    +    builder.addContactPointsWithPorts(addresses);
    +
    +    if( !Strings.isNullOrEmpty(config.getUser()) &&
    +        !Strings.isNullOrEmpty(config.getPassword())) {
    +      builder.withCredentials(config.getUser(), config.getPassword());
    +    }
    +    cluster = builder.build();
    +
    +    Preconditions.checkNotNull(cluster);
    --- End diff --
    
    Guava Again??? 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---