You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ni...@apache.org on 2022/06/08 12:37:59 UTC

[pulsar] branch master updated: [feat][pulsar-io] ElasticSearch Sink: option to disable SSL certificate validation (#14997)

This is an automated email from the ASF dual-hosted git repository.

nicoloboschi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new b35c97d607d [feat][pulsar-io] ElasticSearch Sink: option to disable SSL certificate validation (#14997)
b35c97d607d is described below

commit b35c97d607dbb3f02dc4dbc0ad5b8be73a5cc4f6
Author: Nicolò Boschi <bo...@gmail.com>
AuthorDate: Wed Jun 8 14:37:50 2022 +0200

    [feat][pulsar-io] ElasticSearch Sink: option to disable SSL certificate validation (#14997)
    
    * [feat][pulsar-io] ElasticSearch Sink: option to disable SSL certificate validation
---
 .../io/elasticsearch/ElasticSearchSslConfig.java   | 11 +++++++-
 .../pulsar/io/elasticsearch/client/RestClient.java | 18 ++++++++++--
 .../elasticsearch/ElasticSearchClientSslTests.java | 33 ++++++++++++++++++++++
 site2/docs/io-elasticsearch-sink.md                |  1 +
 4 files changed, 59 insertions(+), 4 deletions(-)

diff --git a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSslConfig.java b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSslConfig.java
index 23c93eb3178..1c8fb64bc8d 100644
--- a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSslConfig.java
+++ b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSslConfig.java
@@ -46,10 +46,19 @@ public class ElasticSearchSslConfig implements Serializable {
     @FieldDoc(
             required = false,
             defaultValue = "true",
-            help = "Whether or not to validate node hostnames when using SSL"
+            help = "Whether or not to validate node hostnames when using SSL. "
+                    + "Changing this value is high insecure and you should not use it in production environment."
     )
     private boolean hostnameVerification = true;
 
+    @FieldDoc(
+            required = false,
+            defaultValue = "false",
+            help = "Whether or not to disable the node certificate validation. "
+                    + "Changing this value is high insecure and you should not use it in production environment."
+    )
+    private boolean disableCertificateValidation;
+
     @FieldDoc(
             required = false,
             defaultValue = "",
diff --git a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/RestClient.java b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/RestClient.java
index a80964ffc68..61aa86213eb 100644
--- a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/RestClient.java
+++ b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/RestClient.java
@@ -38,6 +38,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import javax.net.ssl.HostnameVerifier;
 import javax.net.ssl.SSLContext;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.http.Header;
 import org.apache.http.HttpHeaders;
@@ -49,6 +50,7 @@ import org.apache.http.config.Registry;
 import org.apache.http.config.RegistryBuilder;
 import org.apache.http.conn.ssl.NoopHostnameVerifier;
 import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
+import org.apache.http.conn.ssl.TrustAllStrategy;
 import org.apache.http.impl.client.BasicCredentialsProvider;
 import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
 import org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager;
@@ -67,6 +69,7 @@ import org.apache.pulsar.io.elasticsearch.ElasticSearchConnectionException;
 import org.apache.pulsar.io.elasticsearch.ElasticSearchSslConfig;
 import org.elasticsearch.client.RestClientBuilder;
 
+@Slf4j
 public abstract class RestClient implements Closeable {
 
     protected final ElasticSearchConfig config;
@@ -138,9 +141,14 @@ public abstract class RestClient implements Closeable {
                 PoolingNHttpClientConnectionManager connManager;
                 if (config.getSsl().isEnabled()) {
                     ElasticSearchSslConfig sslConfig = config.getSsl();
-                    HostnameVerifier hostnameVerifier = config.getSsl().isHostnameVerification()
-                            ? SSLConnectionSocketFactory.getDefaultHostnameVerifier()
-                            : new NoopHostnameVerifier();
+                    final boolean hostnameVerification = config.getSsl().isHostnameVerification();
+                    HostnameVerifier hostnameVerifier;
+                    if (hostnameVerification) {
+                        hostnameVerifier = SSLConnectionSocketFactory.getDefaultHostnameVerifier();
+                    } else {
+                        hostnameVerifier = NoopHostnameVerifier.INSTANCE;
+                        log.warn("Hostname verification is disabled.");
+                    }
                     String[] cipherSuites = null;
                     if (!Strings.isNullOrEmpty(sslConfig.getCipherSuites())) {
                         cipherSuites = sslConfig.getCipherSuites().split(",");
@@ -183,6 +191,10 @@ public abstract class RestClient implements Closeable {
                 sslContextBuilder.loadTrustMaterial(new File(sslConfig.getTruststorePath()),
                         sslConfig.getTruststorePassword().toCharArray());
             }
+            if (sslConfig.isDisableCertificateValidation()) {
+                sslContextBuilder.loadTrustMaterial(null, TrustAllStrategy.INSTANCE);
+                log.warn("Certificate validation is disabled, the identity of the target server will not be verified.");
+            }
             if (!Strings.isNullOrEmpty(sslConfig.getKeystorePath())
                     && !Strings.isNullOrEmpty(sslConfig.getKeystorePassword())) {
                 sslContextBuilder.loadKeyMaterial(new File(sslConfig.getKeystorePath()),
diff --git a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientSslTests.java b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientSslTests.java
index 24ec487a03d..7781b6724a3 100644
--- a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientSslTests.java
+++ b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientSslTests.java
@@ -149,6 +149,39 @@ public abstract class ElasticSearchClientSslTests extends ElasticSearchTestBase
         }
     }
 
+    @Test
+    public void testSslDisableCertificateValidation() throws IOException {
+        try (ElasticsearchContainer container = createElasticsearchContainer()
+                .withFileSystemBind(sslResourceDir, configDir + "/ssl")
+                .withPassword("elastic")
+                .withEnv("xpack.license.self_generated.type", "trial")
+                .withEnv("xpack.security.enabled", "true")
+                .withEnv("xpack.security.http.ssl.enabled", "true")
+                .withEnv("xpack.security.http.ssl.client_authentication", "optional")
+                .withEnv("xpack.security.http.ssl.key", configDir + "/ssl/elasticsearch.key")
+                .withEnv("xpack.security.http.ssl.certificate", configDir + "/ssl/elasticsearch.crt")
+                .withEnv("xpack.security.http.ssl.certificate_authorities", configDir + "/ssl/cacert.crt")
+                .withEnv("xpack.security.transport.ssl.enabled", "true")
+                .withEnv("xpack.security.transport.ssl.verification_mode", "certificate")
+                .withEnv("xpack.security.transport.ssl.key", configDir + "/ssl/elasticsearch.key")
+                .withEnv("xpack.security.transport.ssl.certificate", configDir + "/ssl/elasticsearch.crt")
+                .withEnv("xpack.security.transport.ssl.certificate_authorities", configDir + "/ssl/cacert.crt")
+                .waitingFor(Wait.forLogMessage(".*(Security is enabled|Active license).*", 1)
+                        .withStartupTimeout(Duration.ofMinutes(2)))) {
+            container.start();
+
+            ElasticSearchConfig config = new ElasticSearchConfig()
+                    .setElasticSearchUrl("https://" + container.getHttpHostAddress())
+                    .setIndexName(INDEX)
+                    .setUsername("elastic")
+                    .setPassword("elastic")
+                    .setSsl(new ElasticSearchSslConfig()
+                            .setEnabled(true)
+                            .setDisableCertificateValidation(true));
+            testClientWithConfig(config);
+        }
+    }
+
     private void testClientWithConfig(ElasticSearchConfig config) throws IOException {
         try (ElasticSearchClient client = new ElasticSearchClient(config);) {
             testIndexExists(client);
diff --git a/site2/docs/io-elasticsearch-sink.md b/site2/docs/io-elasticsearch-sink.md
index 26ea81f6026..9cd6d0020e1 100644
--- a/site2/docs/io-elasticsearch-sink.md
+++ b/site2/docs/io-elasticsearch-sink.md
@@ -94,6 +94,7 @@ The configuration of the Elasticsearch sink connector has the following properti
 |------|----------|----------|---------|-------------|
 | `enabled` | Boolean| false | false | Enable SSL/TLS. |
 | `hostnameVerification` | Boolean| false | true | Whether or not to validate node hostnames when using SSL. |
+| `disableCertificateValidation` | Boolean| false | true | Whether or not to disable the node certificate validation. Changing this value is high insecure and you should not use it in production environment. |
 | `truststorePath` | String| false |" " (empty string)| The path to the truststore file. |
 | `truststorePassword` | String| false |" " (empty string)| Truststore password. |
 | `keystorePath` | String| false |" " (empty string)| The path to the keystore file. |