You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jb...@apache.org on 2017/08/09 13:34:38 UTC
[1/3] beam git commit: [BEAM-1274] Add SSL mutual authentication
support
Repository: beam
Updated Branches:
refs/heads/master db4b0939a -> 04f5bc6f8
[BEAM-1274] Add SSL mutual authentication support
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f48bb4be
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f48bb4be
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f48bb4be
Branch: refs/heads/master
Commit: f48bb4be1d3bb23f3cc978c4c25cf43842639296
Parents: aadbe36
Author: Jean-Baptiste Onofré <jb...@apache.org>
Authored: Mon Jul 24 17:53:15 2017 +0200
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Mon Aug 7 07:28:14 2017 +0200
----------------------------------------------------------------------
.../sdk/io/elasticsearch/ElasticsearchIO.java | 69 +++++++++++++++++++-
1 file changed, 67 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/f48bb4be/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
index 5046888..2cd3bcd 100644
--- a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
+++ b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
@@ -25,10 +25,14 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.auto.value.AutoValue;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
+
+import java.io.File;
+import java.io.FileInputStream;
import java.io.IOException;
+import java.io.InputStream;
import java.io.Serializable;
-import java.net.MalformedURLException;
import java.net.URL;
+import java.security.KeyStore;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -39,6 +43,8 @@ import java.util.ListIterator;
import java.util.Map;
import java.util.NoSuchElementException;
import javax.annotation.Nullable;
+import javax.net.ssl.SSLContext;
+
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -60,7 +66,9 @@ import org.apache.http.entity.ContentType;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.http.message.BasicHeader;
+import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy;
import org.apache.http.nio.entity.NStringEntity;
+import org.apache.http.ssl.SSLContexts;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
@@ -155,6 +163,12 @@ public class ElasticsearchIO {
@Nullable
abstract String getPassword();
+ @Nullable
+ abstract String getKeystorePath();
+
+ @Nullable
+ abstract String getKeystorePassword();
+
abstract String getIndex();
abstract String getType();
@@ -169,6 +183,10 @@ public class ElasticsearchIO {
abstract Builder setPassword(String password);
+ abstract Builder setKeystorePath(String keystorePath);
+
+ abstract Builder setKeystorePassword(String password);
+
abstract Builder setIndex(String index);
abstract Builder setType(String type);
@@ -239,6 +257,32 @@ public class ElasticsearchIO {
return builder().setPassword(password).build();
}
+ /**
+ * If Elasticsearch uses SSL with mutual authentication (via shield),
+ * provide the keystore containing the client key.
+ *
+ * @param keystorePath the location of the keystore containing the client key.
+ * @return the {@link ConnectionConfiguration} object with keystore path set.
+ */
+ public ConnectionConfiguration withKeystorePath(String keystorePath) {
+ checkArgument(keystorePath != null, "ConnectionConfiguration.create()"
+ + ".withKeystorePath(keystorePath) called with null keystorePath");
+ return builder().setKeystorePath(keystorePath).build();
+ }
+
+ /**
+ * If Elasticsearch uses SSL with mutual authentication (via shield),
+ * provide the password to open the client keystore.
+ *
+ * @param keystorePassword the password of the client keystore.
+ * @return the {@link ConnectionConfiguration} object with keystore password set.
+ */
+ public ConnectionConfiguration withKeystorePassword(String keystorePassword) {
+ checkArgument(keystorePassword != null, "ConnectionConfiguration.create()"
+ + ".withKeystorePassword(keystorePassword) called with null keystorePassword");
+ return builder().setKeystorePassword(keystorePassword).build();
+ }
+
private void populateDisplayData(DisplayData.Builder builder) {
builder.add(DisplayData.item("address", getAddresses().toString()));
builder.add(DisplayData.item("index", getIndex()));
@@ -246,7 +290,7 @@ public class ElasticsearchIO {
builder.addIfNotNull(DisplayData.item("username", getUsername()));
}
- RestClient createClient() throws MalformedURLException {
+ RestClient createClient() throws IOException {
HttpHost[] hosts = new HttpHost[getAddresses().size()];
int i = 0;
for (String address : getAddresses()) {
@@ -267,6 +311,27 @@ public class ElasticsearchIO {
}
});
}
+ if (getKeystorePath() != null) {
+ try {
+ KeyStore keyStore = KeyStore.getInstance("jks");
+ try (InputStream is = new FileInputStream(new File(getKeystorePath()))) {
+ keyStore.load(is, getKeystorePassword().toCharArray());
+ }
+ final SSLContext sslContext = SSLContexts.custom()
+ .loadTrustMaterial(keyStore, null).build();
+ final SSLIOSessionStrategy sessionStrategy = new SSLIOSessionStrategy(sslContext);
+ restClientBuilder.setHttpClientConfigCallback(
+ new RestClientBuilder.HttpClientConfigCallback() {
+ @Override
+ public HttpAsyncClientBuilder customizeHttpClient(
+ HttpAsyncClientBuilder httpClientBuilder) {
+ return httpClientBuilder.setSSLContext(sslContext).setSSLStrategy(sessionStrategy);
+ }
+ });
+ } catch (Exception e) {
+ throw new IOException("Can't load the client certificate from the keystore", e);
+ }
+ }
return restClientBuilder.build();
}
}
[2/3] beam git commit: [BEAM-1274] Add SSL/TLS in the comments, add the self signed policy support for the SSL context
Posted by jb...@apache.org.
[BEAM-1274] Add SSL/TLS in the comments, add the self signed policy support for the SSL context
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/02f11d3d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/02f11d3d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/02f11d3d
Branch: refs/heads/master
Commit: 02f11d3db98f33475ff1152d33e36161d59fd400
Parents: f48bb4b
Author: Jean-Baptiste Onofré <jb...@apache.org>
Authored: Mon Aug 7 07:49:36 2017 +0200
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Mon Aug 7 07:49:36 2017 +0200
----------------------------------------------------------------------
.../beam/sdk/io/elasticsearch/ElasticsearchIO.java | 14 +++++++++-----
1 file changed, 9 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/02f11d3d/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
index 2cd3bcd..e6a6a9f 100644
--- a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
+++ b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
@@ -62,6 +62,7 @@ import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
+import org.apache.http.conn.ssl.TrustSelfSignedStrategy;
import org.apache.http.entity.ContentType;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
@@ -258,7 +259,7 @@ public class ElasticsearchIO {
}
/**
- * If Elasticsearch uses SSL with mutual authentication (via shield),
+ * If Elasticsearch uses SSL/TLS with mutual authentication (via shield),
* provide the keystore containing the client key.
*
* @param keystorePath the location of the keystore containing the client key.
@@ -267,15 +268,17 @@ public class ElasticsearchIO {
public ConnectionConfiguration withKeystorePath(String keystorePath) {
checkArgument(keystorePath != null, "ConnectionConfiguration.create()"
+ ".withKeystorePath(keystorePath) called with null keystorePath");
+ checkArgument(!keystorePath.isEmpty(), "ConnectionConfiguration.create()"
+ + ".withKeystorePath(keystorePath) called with empty keystorePath");
return builder().setKeystorePath(keystorePath).build();
}
/**
- * If Elasticsearch uses SSL with mutual authentication (via shield),
+ * If Elasticsearch uses SSL/TLS with mutual authentication (via shield),
* provide the password to open the client keystore.
*
* @param keystorePassword the password of the client keystore.
- * @return the {@link ConnectionConfiguration} object with keystore password set.
+ * @return the {@link ConnectionConfiguration} object with keystore passwo:rd set.
*/
public ConnectionConfiguration withKeystorePassword(String keystorePassword) {
checkArgument(keystorePassword != null, "ConnectionConfiguration.create()"
@@ -288,6 +291,7 @@ public class ElasticsearchIO {
builder.add(DisplayData.item("index", getIndex()));
builder.add(DisplayData.item("type", getType()));
builder.addIfNotNull(DisplayData.item("username", getUsername()));
+ builder.addIfNotNull(DisplayData.item("keystore.path", getKeystorePath()));
}
RestClient createClient() throws IOException {
@@ -311,14 +315,14 @@ public class ElasticsearchIO {
}
});
}
- if (getKeystorePath() != null) {
+ if (getKeystorePath() != null && !getKeystorePath().isEmpty()) {
try {
KeyStore keyStore = KeyStore.getInstance("jks");
try (InputStream is = new FileInputStream(new File(getKeystorePath()))) {
keyStore.load(is, getKeystorePassword().toCharArray());
}
final SSLContext sslContext = SSLContexts.custom()
- .loadTrustMaterial(keyStore, null).build();
+ .loadTrustMaterial(keyStore, new TrustSelfSignedStrategy()).build();
final SSLIOSessionStrategy sessionStrategy = new SSLIOSessionStrategy(sslContext);
restClientBuilder.setHttpClientConfigCallback(
new RestClientBuilder.HttpClientConfigCallback() {
[3/3] beam git commit: [BEAM-1274] This closes #3626
Posted by jb...@apache.org.
[BEAM-1274] This closes #3626
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/04f5bc6f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/04f5bc6f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/04f5bc6f
Branch: refs/heads/master
Commit: 04f5bc6f805868f9ed1494128b90d33bb2fe0e66
Parents: db4b093 02f11d3
Author: Jean-Baptiste Onofré <jb...@apache.org>
Authored: Wed Aug 9 15:34:32 2017 +0200
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Wed Aug 9 15:34:32 2017 +0200
----------------------------------------------------------------------
.../sdk/io/elasticsearch/ElasticsearchIO.java | 73 +++++++++++++++++++-
1 file changed, 71 insertions(+), 2 deletions(-)
----------------------------------------------------------------------