You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by si...@apache.org on 2023/01/25 07:55:54 UTC

[hudi] branch master updated: [HUDI-4991] Allow kafka-like configs to set truststore and keystore for the SchemaProvider

This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new f49b5d34342 [HUDI-4991] Allow kafka-like configs to set truststore and keystore for the SchemaProvider
f49b5d34342 is described below

commit f49b5d3434285f6670f1d1c8d6204d5e975bd73f
Author: Jon Vexler <jb...@gmail.com>
AuthorDate: Wed Jan 25 02:55:47 2023 -0500

    [HUDI-4991] Allow kafka-like configs to set truststore and keystore for the SchemaProvider
    
    Update the SchemaRegistryProvider to take in kafka-like configs for truststore and keystore.
    
    Co-authored-by: Jonathan Vexler <=>
---
 .../utilities/schema/SchemaRegistryProvider.java   | 64 ++++++++++++++++++++--
 1 file changed, 59 insertions(+), 5 deletions(-)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java
index 8303fc2260b..9ade80ecce4 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java
@@ -26,13 +26,25 @@ import org.apache.hudi.exception.HoodieIOException;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.avro.Schema;
+import org.apache.http.conn.ssl.TrustSelfSignedStrategy;
+import org.apache.http.ssl.SSLContextBuilder;
+import org.apache.http.ssl.SSLContexts;
 import org.apache.spark.api.java.JavaSparkContext;
 
+import javax.net.ssl.HttpsURLConnection;
+import javax.net.ssl.SSLSocketFactory;
+
+import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.HttpURLConnection;
 import java.net.URL;
 import java.nio.charset.StandardCharsets;
+import java.security.KeyManagementException;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.security.UnrecoverableKeyException;
+import java.security.cert.CertificateException;
 import java.util.Base64;
 import java.util.Collections;
 import java.util.regex.Matcher;
@@ -54,6 +66,11 @@ public class SchemaRegistryProvider extends SchemaProvider {
     public static final String TARGET_SCHEMA_REGISTRY_URL_PROP =
         "hoodie.deltastreamer.schemaprovider.registry.targetUrl";
     public static final String SCHEMA_CONVERTER_PROP = "hoodie.deltastreamer.schemaprovider.registry.schemaconverter";
+    public static final String SSL_KEYSTORE_LOCATION_PROP = "schema.registry.ssl.keystore.location";
+    public static final String SSL_TRUSTSTORE_LOCATION_PROP = "schema.registry.ssl.truststore.location";
+    public static final String SSL_KEYSTORE_PASSWORD_PROP = "schema.registry.ssl.keystore.password";
+    public static final String SSL_TRUSTSTORE_PASSWORD_PROP = "schema.registry.ssl.truststore.password";
+    public static final String SSL_KEY_PASSWORD_PROP = "schema.registry.ssl.key.password";
   }
 
   @FunctionalInterface
@@ -86,24 +103,34 @@ public class SchemaRegistryProvider extends SchemaProvider {
    * @throws IOException
    */
   public String fetchSchemaFromRegistry(String registryUrl) throws IOException {
-    URL registry;
     HttpURLConnection connection;
     Matcher matcher = Pattern.compile("://(.*?)@").matcher(registryUrl);
     if (matcher.find()) {
       String creds = matcher.group(1);
       String urlWithoutCreds = registryUrl.replace(creds + "@", "");
-      registry = new URL(urlWithoutCreds);
-      connection = (HttpURLConnection) registry.openConnection();
+      connection = getConnection(urlWithoutCreds); // credentials are stripped from the URL and sent as a header below
       setAuthorizationHeader(matcher.group(1), connection);
     } else {
-      registry = new URL(registryUrl);
-      connection = (HttpURLConnection) registry.openConnection();
+      connection = getConnection(registryUrl); // no "user:pass@" section matched, so the URL is used as given
     }
     ObjectMapper mapper = new ObjectMapper();
     JsonNode node = mapper.readTree(getStream(connection));
     return node.get("schema").asText();
   }
 
+  private SSLSocketFactory sslSocketFactory; // assigned in setUpSSLStores(); getConnection() treats null as "no custom SSL material"
+
+  /** Opens a connection to {@code url}, applying the custom SSL socket factory only to https connections. */
+  protected HttpURLConnection getConnection(String url) throws IOException {
+    HttpURLConnection connection = (HttpURLConnection) new URL(url).openConnection();
+    // An http:// URL yields a plain HttpURLConnection; casting it to HttpsURLConnection would throw
+    // ClassCastException, so check the runtime type instead of relying on sslSocketFactory alone.
+    if (sslSocketFactory != null && connection instanceof HttpsURLConnection) {
+      ((HttpsURLConnection) connection).setSSLSocketFactory(sslSocketFactory);
+    }
+    return connection;
+  }
+
   protected void setAuthorizationHeader(String creds, HttpURLConnection connection) {
     String encodedAuth = Base64.getEncoder().encodeToString(creds.getBytes(StandardCharsets.UTF_8));
     connection.setRequestProperty("Authorization", "Basic " + encodedAuth);
@@ -116,6 +143,33 @@ public class SchemaRegistryProvider extends SchemaProvider {
   public SchemaRegistryProvider(TypedProperties props, JavaSparkContext jssc) {
     super(props, jssc);
     DataSourceUtils.checkRequiredProperties(props, Collections.singletonList(Config.SRC_SCHEMA_REGISTRY_URL_PROP));
+    if (config.containsKey(Config.SSL_KEYSTORE_LOCATION_PROP)
+        || config.containsKey(Config.SSL_TRUSTSTORE_LOCATION_PROP)) {
+      setUpSSLStores(); // custom SSL material is loaded only when a keystore or truststore location is configured
+    }
+  }
+
+  /** Builds {@code sslSocketFactory} from the configured truststore and/or keystore; every password property is optional. */
+  private void setUpSSLStores() {
+    SSLContextBuilder sslContextBuilder = SSLContexts.custom();
+    try {
+      if (config.containsKey(Config.SSL_TRUSTSTORE_LOCATION_PROP)) {
+        sslContextBuilder.loadTrustMaterial(new File(config.getString(Config.SSL_TRUSTSTORE_LOCATION_PROP)),
+            readSslPassword(Config.SSL_TRUSTSTORE_PASSWORD_PROP, null), new TrustSelfSignedStrategy());
+      }
+      if (config.containsKey(Config.SSL_KEYSTORE_LOCATION_PROP)) {
+        char[] storePassword = readSslPassword(Config.SSL_KEYSTORE_PASSWORD_PROP, null); // doubles as the key password when none is set
+        sslContextBuilder.loadKeyMaterial(new File(config.getString(Config.SSL_KEYSTORE_LOCATION_PROP)),
+            storePassword, readSslPassword(Config.SSL_KEY_PASSWORD_PROP, storePassword));
+      }
+      sslSocketFactory = sslContextBuilder.build().getSocketFactory();
+    } catch (UnrecoverableKeyException | IOException | KeyStoreException | NoSuchAlgorithmException | CertificateException | KeyManagementException e) {
+      throw new RuntimeException("Failed to build the schema registry SSL context from the configured keystore/truststore", e);
+    }
+  }
+
+  private char[] readSslPassword(String prop, char[] fallback) {
+    return config.containsKey(prop) ? config.getString(prop).toCharArray() : fallback; // unset property -> fallback
   }
 
   @Override