You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jb...@apache.org on 2017/08/09 13:34:38 UTC

[1/3] beam git commit: [BEAM-1274] Add SSL mutual authentication support

Repository: beam
Updated Branches:
  refs/heads/master db4b0939a -> 04f5bc6f8


[BEAM-1274] Add SSL mutual authentication support


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f48bb4be
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f48bb4be
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f48bb4be

Branch: refs/heads/master
Commit: f48bb4be1d3bb23f3cc978c4c25cf43842639296
Parents: aadbe36
Author: Jean-Baptiste Onofré <jb...@apache.org>
Authored: Mon Jul 24 17:53:15 2017 +0200
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Mon Aug 7 07:28:14 2017 +0200

----------------------------------------------------------------------
 .../sdk/io/elasticsearch/ElasticsearchIO.java   | 69 +++++++++++++++++++-
 1 file changed, 67 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/f48bb4be/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
index 5046888..2cd3bcd 100644
--- a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
+++ b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
@@ -25,10 +25,14 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.auto.value.AutoValue;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Strings;
+
+import java.io.File;
+import java.io.FileInputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.Serializable;
-import java.net.MalformedURLException;
 import java.net.URL;
+import java.security.KeyStore;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -39,6 +43,8 @@ import java.util.ListIterator;
 import java.util.Map;
 import java.util.NoSuchElementException;
 import javax.annotation.Nullable;
+import javax.net.ssl.SSLContext;
+
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -60,7 +66,9 @@ import org.apache.http.entity.ContentType;
 import org.apache.http.impl.client.BasicCredentialsProvider;
 import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
 import org.apache.http.message.BasicHeader;
+import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy;
 import org.apache.http.nio.entity.NStringEntity;
+import org.apache.http.ssl.SSLContexts;
 import org.elasticsearch.client.Response;
 import org.elasticsearch.client.RestClient;
 import org.elasticsearch.client.RestClientBuilder;
@@ -155,6 +163,12 @@ public class ElasticsearchIO {
     @Nullable
     abstract String getPassword();
 
+    @Nullable
+    abstract String getKeystorePath();
+
+    @Nullable
+    abstract String getKeystorePassword();
+
     abstract String getIndex();
 
     abstract String getType();
@@ -169,6 +183,10 @@ public class ElasticsearchIO {
 
       abstract Builder setPassword(String password);
 
+      abstract Builder setKeystorePath(String keystorePath);
+
+      abstract Builder setKeystorePassword(String password);
+
       abstract Builder setIndex(String index);
 
       abstract Builder setType(String type);
@@ -239,6 +257,32 @@ public class ElasticsearchIO {
       return builder().setPassword(password).build();
     }
 
+    /**
+     * If Elasticsearch uses SSL with mutual authentication (via shield),
+     * provide the keystore containing the client key.
+     *
+     * @param keystorePath the location of the keystore containing the client key.
+     * @return the {@link ConnectionConfiguration} object with keystore path set.
+     */
+    public ConnectionConfiguration withKeystorePath(String keystorePath) {
+      checkArgument(keystorePath != null, "ConnectionConfiguration.create()"
+          + ".withKeystorePath(keystorePath) called with null keystorePath");
+      return builder().setKeystorePath(keystorePath).build();
+    }
+
+    /**
+     * If Elasticsearch uses SSL with mutual authentication (via shield),
+     * provide the password to open the client keystore.
+     *
+     * @param keystorePassword the password of the client keystore.
+     * @return the {@link ConnectionConfiguration} object with keystore password set.
+     */
+    public ConnectionConfiguration withKeystorePassword(String keystorePassword) {
+        checkArgument(keystorePassword != null, "ConnectionConfiguration.create()"
+            + ".withKeystorePassword(keystorePassword) called with null keystorePassword");
+        return builder().setKeystorePassword(keystorePassword).build();
+    }
+
     private void populateDisplayData(DisplayData.Builder builder) {
       builder.add(DisplayData.item("address", getAddresses().toString()));
       builder.add(DisplayData.item("index", getIndex()));
@@ -246,7 +290,7 @@ public class ElasticsearchIO {
       builder.addIfNotNull(DisplayData.item("username", getUsername()));
     }
 
-    RestClient createClient() throws MalformedURLException {
+    RestClient createClient() throws IOException {
       HttpHost[] hosts = new HttpHost[getAddresses().size()];
       int i = 0;
       for (String address : getAddresses()) {
@@ -267,6 +311,27 @@ public class ElasticsearchIO {
               }
             });
       }
+      if (getKeystorePath() != null) {
+        try {
+          KeyStore keyStore = KeyStore.getInstance("jks");
+          try (InputStream is = new FileInputStream(new File(getKeystorePath()))) {
+            keyStore.load(is, getKeystorePassword().toCharArray());
+          }
+          final SSLContext sslContext = SSLContexts.custom()
+              .loadTrustMaterial(keyStore, null).build();
+          final SSLIOSessionStrategy sessionStrategy = new SSLIOSessionStrategy(sslContext);
+          restClientBuilder.setHttpClientConfigCallback(
+              new RestClientBuilder.HttpClientConfigCallback() {
+            @Override
+            public HttpAsyncClientBuilder customizeHttpClient(
+                HttpAsyncClientBuilder httpClientBuilder) {
+              return httpClientBuilder.setSSLContext(sslContext).setSSLStrategy(sessionStrategy);
+            }
+          });
+        } catch (Exception e) {
+          throw new IOException("Can't load the client certificate from the keystore", e);
+        }
+      }
       return restClientBuilder.build();
     }
   }


[2/3] beam git commit: [BEAM-1274] Add SSL/TLS in the comments, add the self signed policy support for the SSL context

Posted by jb...@apache.org.
[BEAM-1274] Add SSL/TLS in the comments, add the self signed policy support for the SSL context


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/02f11d3d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/02f11d3d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/02f11d3d

Branch: refs/heads/master
Commit: 02f11d3db98f33475ff1152d33e36161d59fd400
Parents: f48bb4b
Author: Jean-Baptiste Onofré <jb...@apache.org>
Authored: Mon Aug 7 07:49:36 2017 +0200
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Mon Aug 7 07:49:36 2017 +0200

----------------------------------------------------------------------
 .../beam/sdk/io/elasticsearch/ElasticsearchIO.java    | 14 +++++++++-----
 1 file changed, 9 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/02f11d3d/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
index 2cd3bcd..e6a6a9f 100644
--- a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
+++ b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
@@ -62,6 +62,7 @@ import org.apache.http.HttpHost;
 import org.apache.http.auth.AuthScope;
 import org.apache.http.auth.UsernamePasswordCredentials;
 import org.apache.http.client.CredentialsProvider;
+import org.apache.http.conn.ssl.TrustSelfSignedStrategy;
 import org.apache.http.entity.ContentType;
 import org.apache.http.impl.client.BasicCredentialsProvider;
 import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
@@ -258,7 +259,7 @@ public class ElasticsearchIO {
     }
 
     /**
-     * If Elasticsearch uses SSL with mutual authentication (via shield),
+     * If Elasticsearch uses SSL/TLS with mutual authentication (via shield),
      * provide the keystore containing the client key.
      *
      * @param keystorePath the location of the keystore containing the client key.
@@ -267,15 +268,17 @@ public class ElasticsearchIO {
     public ConnectionConfiguration withKeystorePath(String keystorePath) {
       checkArgument(keystorePath != null, "ConnectionConfiguration.create()"
           + ".withKeystorePath(keystorePath) called with null keystorePath");
+      checkArgument(!keystorePath.isEmpty(), "ConnectionConfiguration.create()"
+          + ".withKeystorePath(keystorePath) called with empty keystorePath");
       return builder().setKeystorePath(keystorePath).build();
     }
 
     /**
-     * If Elasticsearch uses SSL with mutual authentication (via shield),
+     * If Elasticsearch uses SSL/TLS with mutual authentication (via shield),
      * provide the password to open the client keystore.
      *
      * @param keystorePassword the password of the client keystore.
-     * @return the {@link ConnectionConfiguration} object with keystore password set.
+     * @return the {@link ConnectionConfiguration} object with keystore passwo:rd set.
      */
     public ConnectionConfiguration withKeystorePassword(String keystorePassword) {
         checkArgument(keystorePassword != null, "ConnectionConfiguration.create()"
@@ -288,6 +291,7 @@ public class ElasticsearchIO {
       builder.add(DisplayData.item("index", getIndex()));
       builder.add(DisplayData.item("type", getType()));
       builder.addIfNotNull(DisplayData.item("username", getUsername()));
+      builder.addIfNotNull(DisplayData.item("keystore.path", getKeystorePath()));
     }
 
     RestClient createClient() throws IOException {
@@ -311,14 +315,14 @@ public class ElasticsearchIO {
               }
             });
       }
-      if (getKeystorePath() != null) {
+      if (getKeystorePath() != null && !getKeystorePath().isEmpty()) {
         try {
           KeyStore keyStore = KeyStore.getInstance("jks");
           try (InputStream is = new FileInputStream(new File(getKeystorePath()))) {
             keyStore.load(is, getKeystorePassword().toCharArray());
           }
           final SSLContext sslContext = SSLContexts.custom()
-              .loadTrustMaterial(keyStore, null).build();
+              .loadTrustMaterial(keyStore, new TrustSelfSignedStrategy()).build();
           final SSLIOSessionStrategy sessionStrategy = new SSLIOSessionStrategy(sslContext);
           restClientBuilder.setHttpClientConfigCallback(
               new RestClientBuilder.HttpClientConfigCallback() {


[3/3] beam git commit: [BEAM-1274] This closes #3626

Posted by jb...@apache.org.
[BEAM-1274] This closes #3626


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/04f5bc6f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/04f5bc6f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/04f5bc6f

Branch: refs/heads/master
Commit: 04f5bc6f805868f9ed1494128b90d33bb2fe0e66
Parents: db4b093 02f11d3
Author: Jean-Baptiste Onofré <jb...@apache.org>
Authored: Wed Aug 9 15:34:32 2017 +0200
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Wed Aug 9 15:34:32 2017 +0200

----------------------------------------------------------------------
 .../sdk/io/elasticsearch/ElasticsearchIO.java   | 73 +++++++++++++++++++-
 1 file changed, 71 insertions(+), 2 deletions(-)
----------------------------------------------------------------------