You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by si...@apache.org on 2023/01/25 07:55:54 UTC
[hudi] branch master updated: [HUDI-4991] Allow kafka-like configs to set truststore and keystore for the SchemaProvider
This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new f49b5d34342 [HUDI-4991] Allow kafka-like configs to set truststore and keystore for the SchemaProvider
f49b5d34342 is described below
commit f49b5d3434285f6670f1d1c8d6204d5e975bd73f
Author: Jon Vexler <jb...@gmail.com>
AuthorDate: Wed Jan 25 02:55:47 2023 -0500
[HUDI-4991] Allow kafka-like configs to set truststore and keystore for the SchemaProvider
Update the SchemaRegistryProvider to take in kafka-like configs for truststore and keystore.
Co-authored-by: Jonathan Vexler <=>
---
.../utilities/schema/SchemaRegistryProvider.java | 64 ++++++++++++++++++++--
1 file changed, 59 insertions(+), 5 deletions(-)
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java
index 8303fc2260b..9ade80ecce4 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java
@@ -26,13 +26,25 @@ import org.apache.hudi.exception.HoodieIOException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.avro.Schema;
+import org.apache.http.conn.ssl.TrustSelfSignedStrategy;
+import org.apache.http.ssl.SSLContextBuilder;
+import org.apache.http.ssl.SSLContexts;
import org.apache.spark.api.java.JavaSparkContext;
+import javax.net.ssl.HttpsURLConnection;
+import javax.net.ssl.SSLSocketFactory;
+
+import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
+import java.security.KeyManagementException;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.security.UnrecoverableKeyException;
+import java.security.cert.CertificateException;
import java.util.Base64;
import java.util.Collections;
import java.util.regex.Matcher;
@@ -54,6 +66,11 @@ public class SchemaRegistryProvider extends SchemaProvider {
public static final String TARGET_SCHEMA_REGISTRY_URL_PROP =
"hoodie.deltastreamer.schemaprovider.registry.targetUrl";
public static final String SCHEMA_CONVERTER_PROP = "hoodie.deltastreamer.schemaprovider.registry.schemaconverter";
+ public static final String SSL_KEYSTORE_LOCATION_PROP = "schema.registry.ssl.keystore.location";
+ public static final String SSL_TRUSTSTORE_LOCATION_PROP = "schema.registry.ssl.truststore.location";
+ public static final String SSL_KEYSTORE_PASSWORD_PROP = "schema.registry.ssl.keystore.password";
+ public static final String SSL_TRUSTSTORE_PASSWORD_PROP = "schema.registry.ssl.truststore.password";
+ public static final String SSL_KEY_PASSWORD_PROP = "schema.registry.ssl.key.password";
}
@FunctionalInterface
@@ -86,24 +103,34 @@ public class SchemaRegistryProvider extends SchemaProvider {
* @throws IOException
*/
public String fetchSchemaFromRegistry(String registryUrl) throws IOException {
- URL registry;
HttpURLConnection connection;
Matcher matcher = Pattern.compile("://(.*?)@").matcher(registryUrl);
if (matcher.find()) {
String creds = matcher.group(1);
String urlWithoutCreds = registryUrl.replace(creds + "@", "");
- registry = new URL(urlWithoutCreds);
- connection = (HttpURLConnection) registry.openConnection();
+ connection = getConnection(urlWithoutCreds);
setAuthorizationHeader(matcher.group(1), connection);
} else {
- registry = new URL(registryUrl);
- connection = (HttpURLConnection) registry.openConnection();
+ connection = getConnection(registryUrl);
}
ObjectMapper mapper = new ObjectMapper();
JsonNode node = mapper.readTree(getStream(connection));
return node.get("schema").asText();
}
+ private SSLSocketFactory sslSocketFactory;
+
+ protected HttpURLConnection getConnection(String url) throws IOException {
+ URL registry = new URL(url);
+ if (sslSocketFactory != null) {
+ // we cannot cast to HttpsURLConnection if url is http so only cast when sslSocketFactory is set
+ HttpsURLConnection connection = (HttpsURLConnection) registry.openConnection();
+ connection.setSSLSocketFactory(sslSocketFactory);
+ return connection;
+ }
+ return (HttpURLConnection) registry.openConnection();
+ }
+
protected void setAuthorizationHeader(String creds, HttpURLConnection connection) {
String encodedAuth = Base64.getEncoder().encodeToString(creds.getBytes(StandardCharsets.UTF_8));
connection.setRequestProperty("Authorization", "Basic " + encodedAuth);
@@ -116,6 +143,33 @@ public class SchemaRegistryProvider extends SchemaProvider {
public SchemaRegistryProvider(TypedProperties props, JavaSparkContext jssc) {
super(props, jssc);
DataSourceUtils.checkRequiredProperties(props, Collections.singletonList(Config.SRC_SCHEMA_REGISTRY_URL_PROP));
+ if (config.containsKey(Config.SSL_KEYSTORE_LOCATION_PROP)
+ || config.containsKey(Config.SSL_TRUSTSTORE_LOCATION_PROP)) {
+ setUpSSLStores();
+ }
+ }
+
+ private void setUpSSLStores() {
+ SSLContextBuilder sslContextBuilder = SSLContexts.custom();
+ try {
+ if (config.containsKey(Config.SSL_TRUSTSTORE_LOCATION_PROP)) {
+ sslContextBuilder.loadTrustMaterial(
+ new File(config.getString(Config.SSL_TRUSTSTORE_LOCATION_PROP)),
+ config.getString(Config.SSL_TRUSTSTORE_PASSWORD_PROP).toCharArray(),
+ new TrustSelfSignedStrategy());
+ }
+ if (config.containsKey(Config.SSL_KEYSTORE_LOCATION_PROP)) {
+ sslContextBuilder.loadKeyMaterial(
+ new File(config.getString(Config.SSL_KEYSTORE_LOCATION_PROP)),
+ config.getString(Config.SSL_KEYSTORE_PASSWORD_PROP).toCharArray(),
+ config.getString(Config.SSL_KEY_PASSWORD_PROP).toCharArray()
+ );
+ }
+ sslSocketFactory = sslContextBuilder.build().getSocketFactory();
+ } catch (UnrecoverableKeyException | IOException | KeyStoreException | NoSuchAlgorithmException | CertificateException | KeyManagementException e) {
+ throw new RuntimeException(e);
+ }
+
}
@Override