You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2017/01/13 13:43:20 UTC
camel git commit: CAMEL-10705 - Allow to use an SSLContextParameters
object for Kafka
Repository: camel
Updated Branches:
refs/heads/master 9584f3851 -> 39742f911
CAMEL-10705 - Allow to use an SSLContextParameters object for Kafka
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/39742f91
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/39742f91
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/39742f91
Branch: refs/heads/master
Commit: 39742f911c7cf7aeec71f880712ccd808e51ccbe
Parents: 9584f38
Author: Antoine DESSAIGNE <an...@gmail.com>
Authored: Fri Jan 13 14:35:42 2017 +0100
Committer: Antoine DESSAIGNE <an...@gmail.com>
Committed: Fri Jan 13 14:35:42 2017 +0100
----------------------------------------------------------------------
.../src/main/docs/kafka-component.adoc | 50 ++++++++++++-
.../component/kafka/KafkaConfiguration.java | 78 ++++++++++++++++++++
2 files changed, 127 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/39742f91/components/camel-kafka/src/main/docs/kafka-component.adoc
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/docs/kafka-component.adoc b/components/camel-kafka/src/main/docs/kafka-component.adoc
index d100a32..8eacb10 100644
--- a/components/camel-kafka/src/main/docs/kafka-component.adoc
+++ b/components/camel-kafka/src/main/docs/kafka-component.adoc
@@ -49,7 +49,7 @@ The Kafka component supports 1 options which are listed below.
// endpoint options: START
-The Kafka component supports 78 endpoint options which are listed below:
+The Kafka component supports 79 endpoint options which are listed below:
{% raw %}
[width="100%",cols="2,1,1m,1m,5",options="header"]
@@ -120,6 +120,7 @@ The Kafka component supports 78 endpoint options which are listed below:
| saslMechanism | security | GSSAPI | String | The Simple Authentication and Security Layer (SASL) Mechanism used. For the valid values see http://www.iana.org/assignments/sasl-mechanisms/sasl-mechanisms.xhtml
| securityProtocol | security | PLAINTEXT | String | Protocol used to communicate with brokers. Currently only PLAINTEXT and SSL are supported.
| sslCipherSuites | security | | String | A list of cipher suites. This is a named combination of authentication encryption MAC and key exchange algorithm used to negotiate the security settings for a network connection using TLS or SSL network protocol.By default all the available cipher suites are supported.
+| sslContextParameters | security | | SSLContextParameters | SSL configuration using a Camel SSLContextParameters object. If configured it's applied before the other SSL endpoint parameters.
| sslEnabledProtocols | security | TLSv1.2,TLSv1.1,TLSv1 | String | The list of protocols enabled for SSL connections. TLSv1.2 TLSv1.1 and TLSv1 are enabled by default.
| sslEndpointAlgorithm | security | | String | The endpoint identification algorithm to validate server hostname using server certificate.
| sslKeymanagerAlgorithm | security | SunX509 | String | The algorithm used by key manager factory for SSL connections. Default value is the key manager factory algorithm configured for the Java Virtual Machine.
@@ -229,6 +230,53 @@ from("direct:start")
.to("kafka:localhost:9092?topic=test");
----------------------------------------------------------------------------
+
+#### SSL configuration
+
+You have 2 different ways to configure the SSL communication on the Kafka` component.
+
+The first way is through the many SSL endpoint parameters
+[source,java]
+-------------------------------------------------------------
+from("kafka:localhost:{{kafkaPort}}?topic=" + TOPIC +
+ "&groupId=A" +
+ "&sslKeystoreLocation=/path/to/keystore.jks" +
+ "&sslKeystorePassword=changeit" +
+ "&sslKeyPassword=changeit")
+ .to("mock:result");
+-------------------------------------------------------------
+
+The second way is to use the `sslContextParameters` endpoint parameter.
+[source,java]
+--------------------------------------------------------------------------------------------------
+// Configure the SSLContextParameters object
+KeyStoreParameters ksp = new KeyStoreParameters();
+ksp.setResource("/path/to/keystore.jks");
+ksp.setPassword("changeit");
+KeyManagersParameters kmp = new KeyManagersParameters();
+kmp.setKeyStore(ksp);
+kmp.setKeyPassword("changeit");
+SSLContextParameters scp = new SSLContextParameters();
+scp.setKeyManagers(kmp);
+
+// Bind this SSLContextParameters into the Camel registry
+JndiRegistry registry = new JndiRegistry();
+registry.bind("ssl", scp);
+
+// Configure the camel context
+DefaultCamelContext camelContext = new DefaultCamelContext(registry);
+camelContext.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("kafka:localhost:{{kafkaPort}}?topic=" + TOPIC + //
+ "&groupId=A" + //
+ "&sslContextParameters=#ssl") // Reference the SSL configuration
+ .to("mock:result");
+ }
+});
+--------------------------------------------------------------------------------------------------
+
+
### Endpoints
Camel supports the link:message-endpoint.html[Message Endpoint] pattern
http://git-wip-us.apache.org/repos/asf/camel/blob/39742f91/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
index 214fd2f..a10ace4 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
@@ -20,6 +20,7 @@ import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
import org.apache.camel.Exchange;
import org.apache.camel.spi.Metadata;
@@ -27,6 +28,12 @@ import org.apache.camel.spi.StateRepository;
import org.apache.camel.spi.UriParam;
import org.apache.camel.spi.UriParams;
import org.apache.camel.spi.UriPath;
+import org.apache.camel.util.jsse.CipherSuitesParameters;
+import org.apache.camel.util.jsse.KeyManagersParameters;
+import org.apache.camel.util.jsse.KeyStoreParameters;
+import org.apache.camel.util.jsse.SSLContextParameters;
+import org.apache.camel.util.jsse.SecureSocketProtocolsParameters;
+import org.apache.camel.util.jsse.TrustManagersParameters;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
@@ -180,6 +187,10 @@ public class KafkaConfiguration {
//reconnect.backoff.ms
@UriParam(label = "producer", defaultValue = "50")
private Integer reconnectBackoffMs = 50;
+
+ // SSL
+ @UriParam(label = "common,security")
+ private SSLContextParameters sslContextParameters;
// SSL
// ssl.key.password
@UriParam(label = "producer,security", secret = true)
@@ -264,6 +275,7 @@ public class KafkaConfiguration {
addPropertyIfNotNull(props, ProducerConfig.COMPRESSION_TYPE_CONFIG, getCompressionCodec());
addPropertyIfNotNull(props, ProducerConfig.RETRIES_CONFIG, getRetries());
// SSL
+ applySslConfiguration(props, getSslContextParameters());
addPropertyIfNotNull(props, SslConfigs.SSL_KEY_PASSWORD_CONFIG, getSslKeyPassword());
addPropertyIfNotNull(props, SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, getSslKeystoreLocation());
addPropertyIfNotNull(props, SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, getSslKeystorePassword());
@@ -322,6 +334,7 @@ public class KafkaConfiguration {
addPropertyIfNotNull(props, ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, getSessionTimeoutMs());
addPropertyIfNotNull(props, ConsumerConfig.MAX_POLL_RECORDS_CONFIG, getMaxPollRecords());
// SSL
+ applySslConfiguration(props, getSslContextParameters());
addPropertyIfNotNull(props, SslConfigs.SSL_KEY_PASSWORD_CONFIG, getSslKeyPassword());
addPropertyIfNotNull(props, SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, getSslKeystoreLocation());
addPropertyIfNotNull(props, SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, getSslKeystorePassword());
@@ -368,6 +381,54 @@ public class KafkaConfiguration {
return props;
}
+ /**
+ * Uses the standard camel {@link SSLContextParameters} object to fill the Kafka SSL properties
+ *
+ * @param props Kafka properties
+ * @param sslContextParameters SSL configuration
+ */
+ private void applySslConfiguration(Properties props, SSLContextParameters sslContextParameters) {
+ if (sslContextParameters != null) {
+ addPropertyIfNotNull(props, SslConfigs.SSL_PROTOCOL_CONFIG, sslContextParameters.getSecureSocketProtocol());
+ addPropertyIfNotNull(props, SslConfigs.SSL_PROVIDER_CONFIG, sslContextParameters.getProvider());
+
+ CipherSuitesParameters cipherSuites = sslContextParameters.getCipherSuites();
+ if (cipherSuites != null) {
+ addCommaSeparatedList(props, SslConfigs.SSL_CIPHER_SUITES_CONFIG, cipherSuites.getCipherSuite());
+ }
+
+ SecureSocketProtocolsParameters secureSocketProtocols = sslContextParameters.getSecureSocketProtocols();
+ if (secureSocketProtocols != null) {
+ addCommaSeparatedList(props, SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, secureSocketProtocols.getSecureSocketProtocol());
+ }
+
+ KeyManagersParameters keyManagers = sslContextParameters.getKeyManagers();
+ if (keyManagers != null) {
+ addPropertyIfNotNull(props, SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, keyManagers.getAlgorithm());
+ addPropertyIfNotNull(props, SslConfigs.SSL_KEY_PASSWORD_CONFIG, keyManagers.getKeyPassword());
+
+ KeyStoreParameters keyStore = keyManagers.getKeyStore();
+ if (keyStore != null) {
+ addPropertyIfNotNull(props, SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, keyStore.getType());
+ addPropertyIfNotNull(props, SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, keyStore.getResource());
+ addPropertyIfNotNull(props, SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, keyStore.getPassword());
+ }
+ }
+
+ TrustManagersParameters trustManagers = sslContextParameters.getTrustManagers();
+ if (trustManagers != null) {
+ addPropertyIfNotNull(props, SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, trustManagers.getAlgorithm());
+
+ KeyStoreParameters keyStore = trustManagers.getKeyStore();
+ if (keyStore != null) {
+ addPropertyIfNotNull(props, SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, keyStore.getType());
+ addPropertyIfNotNull(props, SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, keyStore.getResource());
+ addPropertyIfNotNull(props, SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, keyStore.getPassword());
+ }
+ }
+ }
+ }
+
private static <T> void addPropertyIfNotNull(Properties props, String key, T value) {
if (value != null) {
// Kafka expects all properties as String
@@ -384,6 +445,12 @@ public class KafkaConfiguration {
}
}
+ private static void addCommaSeparatedList(Properties props, String key, List<String> values) {
+ if (values != null && !values.isEmpty()) {
+ props.put(key, values.stream().collect(Collectors.joining(",")));
+ }
+ }
+
public String getGroupId() {
return groupId;
}
@@ -837,6 +904,17 @@ public class KafkaConfiguration {
this.securityProtocol = securityProtocol;
}
+ public SSLContextParameters getSslContextParameters() {
+ return sslContextParameters;
+ }
+
+ /**
+ * SSL configuration using a Camel {@link SSLContextParameters} object. If configured it's applied before the other SSL endpoint parameters.
+ */
+ public void setSslContextParameters(SSLContextParameters sslContextParameters) {
+ this.sslContextParameters = sslContextParameters;
+ }
+
public String getSslKeyPassword() {
return sslKeyPassword;
}