You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by rg...@apache.org on 2022/10/08 04:00:14 UTC
[flume] 01/14: FLUME-3315 fix kafka ssl https verification (#382)
This is an automated email from the ASF dual-hosted git repository.
rgoers pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/flume.git
commit 93fbd74336e573b5add3b81760d57a459f8bc440
Author: zhou zhuohan <84...@qq.com>
AuthorDate: Sun Sep 18 04:14:26 2022 +0800
FLUME-3315 fix kafka ssl https verification (#382)
Authored-by: ninjazhou <ni...@tencent.com>
---
.../apache/flume/channel/kafka/KafkaChannel.java | 47 +++++++++++-
flume-ng-doc/sphinx/FlumeUserGuide.rst | 33 +++++----
.../org/apache/flume/sink/kafka/KafkaSink.java | 12 +++-
.../org/apache/flume/sink/kafka/TestKafkaSink.java | 41 +++++++++++
.../org/apache/flume/sink/kafka/util/TestUtil.java | 30 +++++++-
.../src/test/resources/keystorefile.jks | Bin 0 -> 1294 bytes
.../src/test/resources/truststorefile.jks | Bin 0 -> 887 bytes
.../org/apache/flume/source/kafka/KafkaSource.java | 80 +++++++++++++++------
.../flume/source/kafka/KafkaSourceConstants.java | 2 +
.../source/kafka/KafkaSourceEmbeddedKafka.java | 31 +++++++-
.../apache/flume/source/kafka/TestKafkaSource.java | 65 ++++++++++++++++-
.../src/test/resources/keystorefile.jks | Bin 0 -> 1294 bytes
.../src/test/resources/truststorefile.jks | Bin 0 -> 887 bytes
.../apache/flume/shared/kafka/KafkaSSLUtil.java | 4 +-
14 files changed, 295 insertions(+), 50 deletions(-)
diff --git a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
index 275a614e3..65cad9937 100644
--- a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
+++ b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
@@ -52,6 +52,7 @@ import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.security.JaasUtils;
import org.apache.kafka.common.utils.Time;
@@ -78,7 +79,33 @@ import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
-import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.*;
+import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.BROKER_LIST_FLUME_KEY;
+import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.DEFAULT_ACKS;
+import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.DEFAULT_AUTO_OFFSET_RESET;
+import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.DEFAULT_GROUP_ID;
+import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.DEFAULT_KEY_DESERIALIZER;
+import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.DEFAULT_KEY_SERIALIZER;
+import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.DEFAULT_MIGRATE_ZOOKEEPER_OFFSETS;
+import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.DEFAULT_PARSE_AS_FLUME_EVENT;
+import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.DEFAULT_POLL_TIMEOUT;
+import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.DEFAULT_TOPIC;
+import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.DEFAULT_VALUE_DESERIAIZER;
+import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.DEFAULT_VALUE_SERIAIZER;
+import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.GROUP_ID_FLUME;
+import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.KAFKA_CONSUMER_PREFIX;
+import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.KAFKA_PRODUCER_PREFIX;
+import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.KEY_HEADER;
+import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.MIGRATE_ZOOKEEPER_OFFSETS;
+import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.PARSE_AS_FLUME_EVENT;
+import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.PARTITION_HEADER_NAME;
+import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.POLL_TIMEOUT;
+import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.READ_SMALLEST_OFFSET;
+import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.STATIC_PARTITION_CONF;
+import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.TOPIC_CONFIG;
+import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.ZOOKEEPER_CONNECT_FLUME_KEY;
+import static org.apache.flume.shared.kafka.KafkaSSLUtil.SSL_DISABLE_FQDN_CHECK;
+import static org.apache.flume.shared.kafka.KafkaSSLUtil.isSSLEnabled;
public class KafkaChannel extends BasicChannelSemantics {
@@ -269,7 +296,14 @@ public class KafkaChannel extends BasicChannelSemantics {
// Defaults overridden based on config
producerProps.putAll(ctx.getSubProperties(KAFKA_PRODUCER_PREFIX));
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);
-
+ // The default value of `ssl.endpoint.identification.algorithm`
+ // is changed to `https`, since kafka client 2.0+
+ // And because flume does not accept an empty string as property value,
+ // so we need to use an alternative custom property
+ // `ssl.disableTLSHostnameVerification` to check if enable fqdn check.
+ if (isSSLEnabled(producerProps) && "true".equalsIgnoreCase(producerProps.getProperty(SSL_DISABLE_FQDN_CHECK))) {
+ producerProps.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
+ }
KafkaSSLUtil.addGlobalSSLParameters(producerProps);
}
@@ -288,7 +322,14 @@ public class KafkaChannel extends BasicChannelSemantics {
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
-
+ // The default value of `ssl.endpoint.identification.algorithm`
+ // is changed to `https`, since kafka client 2.0+
+ // And because flume does not accept an empty string as property value,
+ // so we need to use an alternative custom property
+ // `ssl.disableTLSHostnameVerification` to check if enable fqdn check.
+ if (isSSLEnabled(consumerProps) && "true".equalsIgnoreCase(consumerProps.getProperty(SSL_DISABLE_FQDN_CHECK))) {
+ consumerProps.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
+ }
KafkaSSLUtil.addGlobalSSLParameters(consumerProps);
}
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index 2f8938c95..9355fad16 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -1665,14 +1665,15 @@ Specifying the truststore is optional here, the global truststore can be used in
For more details about the global SSL setup, see the `SSL/TLS support`_ section.
Note: By default the property ``ssl.endpoint.identification.algorithm``
-is not defined, so hostname verification is not performed.
-In order to enable hostname verification, set the following properties
+is not defined, so hostname verification is performed.
+Since flume does not accept an empty string as property value, in order to disable hostname verification,
+we need to set the following properties instead of setting ``ssl.endpoint.identification.algorithm`` to an empty string.
.. code-block:: properties
- a1.sources.source1.kafka.consumer.ssl.endpoint.identification.algorithm=HTTPS
+ a1.sources.source1.kafka.consumer.ssl.disableTLSHostnameVerification = true
-Once enabled, clients will verify the server's fully qualified domain name (FQDN)
+If not set to true, clients will verify the server's fully qualified domain name (FQDN)
against one of the following two fields:
#) Common Name (CN) https://tools.ietf.org/html/rfc6125#section-2.3
@@ -3279,18 +3280,19 @@ Example configuration with server side authentication and data encryption.
a1.sinks.sink1.kafka.producer.ssl.truststore.location = /path/to/truststore.jks
a1.sinks.sink1.kafka.producer.ssl.truststore.password = <password to access the truststore>
-Specyfing the truststore is optional here, the global truststore can be used instead.
+Specifying the truststore is optional here, the global truststore can be used instead.
For more details about the global SSL setup, see the `SSL/TLS support`_ section.
Note: By default the property ``ssl.endpoint.identification.algorithm``
-is not defined, so hostname verification is not performed.
-In order to enable hostname verification, set the following properties
+is not defined, so hostname verification is performed.
+Since flume does not allow an empty string as configuration value, in order to disable hostname verification,
+we need to set the following properties instead of setting ``ssl.endpoint.identification.algorithm`` to an empty string.
.. code-block:: properties
- a1.sinks.sink1.kafka.producer.ssl.endpoint.identification.algorithm = HTTPS
+ a1.sinks.sink1.kafka.producer.ssl.disableTLSHostnameVerification = true
-Once enabled, clients will verify the server's fully qualified domain name (FQDN)
+If not set to true, clients will verify the server's fully qualified domain name (FQDN)
against one of the following two fields:
#) Common Name (CN) https://tools.ietf.org/html/rfc6125#section-2.3
@@ -3685,19 +3687,20 @@ Example configuration with server side authentication and data encryption.
a1.channels.channel1.kafka.consumer.ssl.truststore.location = /path/to/truststore.jks
a1.channels.channel1.kafka.consumer.ssl.truststore.password = <password to access the truststore>
-Specyfing the truststore is optional here, the global truststore can be used instead.
+Specifying the truststore is optional here, the global truststore can be used instead.
For more details about the global SSL setup, see the `SSL/TLS support`_ section.
Note: By default the property ``ssl.endpoint.identification.algorithm``
-is not defined, so hostname verification is not performed.
-In order to enable hostname verification, set the following properties
+is not defined, so hostname verification is performed.
+Since flume does not accept an empty string as property value, in order to disable hostname verification,
+we need to set the following properties instead of setting ``ssl.endpoint.identification.algorithm`` to an empty string.
.. code-block:: properties
- a1.channels.channel1.kafka.producer.ssl.endpoint.identification.algorithm = HTTPS
- a1.channels.channel1.kafka.consumer.ssl.endpoint.identification.algorithm = HTTPS
+ a1.channels.channel1.kafka.producer.ssl.disableTLSHostnameVerification = true
+ a1.channels.channel1.kafka.consumer.ssl.disableTLSHostnameVerification = true
-Once enabled, clients will verify the server's fully qualified domain name (FQDN)
+If not set to true, clients will verify the server's fully qualified domain name (FQDN)
against one of the following two fields:
#) Common Name (CN) https://tools.ietf.org/html/rfc6125#section-2.3
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
index 3b1a86697..e4c9ff7e2 100644
--- a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
+++ b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
@@ -43,6 +43,7 @@ import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.config.SslConfigs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -56,6 +57,8 @@ import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Future;
+import static org.apache.flume.shared.kafka.KafkaSSLUtil.SSL_DISABLE_FQDN_CHECK;
+import static org.apache.flume.shared.kafka.KafkaSSLUtil.isSSLEnabled;
import static org.apache.flume.sink.kafka.KafkaSinkConstants.BOOTSTRAP_SERVERS_CONFIG;
import static org.apache.flume.sink.kafka.KafkaSinkConstants.BATCH_SIZE;
import static org.apache.flume.sink.kafka.KafkaSinkConstants.DEFAULT_BATCH_SIZE;
@@ -421,7 +424,14 @@ public class KafkaSink extends AbstractSink implements Configurable, BatchSizeSu
kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, DEFAULT_VALUE_SERIAIZER);
kafkaProps.putAll(context.getSubProperties(KAFKA_PRODUCER_PREFIX));
kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);
-
+ // The default value of `ssl.endpoint.identification.algorithm`
+ // is changed to `https`, since kafka client 2.0+
+ // And because flume does not accept an empty string as property value,
+ // so we need to use an alternative custom property
+ // `ssl.disableTLSHostnameVerification` to check if enable fqdn check.
+ if (isSSLEnabled(kafkaProps) && "true".equalsIgnoreCase(kafkaProps.getProperty(SSL_DISABLE_FQDN_CHECK))) {
+ kafkaProps.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
+ }
KafkaSSLUtil.addGlobalSSLParameters(kafkaProps);
}
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
index 97dc0bdab..5a69b16fb 100644
--- a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
+++ b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
@@ -61,6 +61,7 @@ import java.util.Map;
import java.util.Properties;
import java.util.Set;
+import static org.apache.flume.shared.kafka.KafkaSSLUtil.SSL_DISABLE_FQDN_CHECK;
import static org.apache.flume.sink.kafka.KafkaSinkConstants.AVRO_EVENT;
import static org.apache.flume.sink.kafka.KafkaSinkConstants.BATCH_SIZE;
import static org.apache.flume.sink.kafka.KafkaSinkConstants.BOOTSTRAP_SERVERS_CONFIG;
@@ -72,6 +73,9 @@ import static org.apache.flume.sink.kafka.KafkaSinkConstants.KAFKA_PRODUCER_PREF
import static org.apache.flume.sink.kafka.KafkaSinkConstants.OLD_BATCH_SIZE;
import static org.apache.flume.sink.kafka.KafkaSinkConstants.REQUIRED_ACKS_FLUME_KEY;
import static org.apache.flume.sink.kafka.KafkaSinkConstants.TOPIC_CONFIG;
+import static org.apache.kafka.clients.CommonClientConfigs.SECURITY_PROTOCOL_CONFIG;
+import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG;
+import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
@@ -715,4 +719,41 @@ public class TestKafkaSink {
return newTopic;
}
+ @Test
+ public void testSslTopic() {
+ Sink kafkaSink = new KafkaSink();
+ Context context = prepareDefaultContext();
+ context.put(BOOTSTRAP_SERVERS_CONFIG, testUtil.getKafkaServerSslUrl());
+ context.put(KAFKA_PRODUCER_PREFIX + SECURITY_PROTOCOL_CONFIG, "SSL");
+ context.put(KAFKA_PRODUCER_PREFIX + SSL_TRUSTSTORE_LOCATION_CONFIG, "src/test/resources/truststorefile.jks");
+ context.put(KAFKA_PRODUCER_PREFIX + SSL_TRUSTSTORE_PASSWORD_CONFIG, "password");
+ context.put(KAFKA_PRODUCER_PREFIX + SSL_DISABLE_FQDN_CHECK, "true");
+ Configurables.configure(kafkaSink, context);
+
+ Channel memoryChannel = new MemoryChannel();
+ context = prepareDefaultContext();
+ Configurables.configure(memoryChannel, context);
+ kafkaSink.setChannel(memoryChannel);
+ kafkaSink.start();
+
+ String msg = "default-topic-test";
+ Transaction tx = memoryChannel.getTransaction();
+ tx.begin();
+ Event event = EventBuilder.withBody(msg.getBytes());
+ memoryChannel.put(event);
+ tx.commit();
+ tx.close();
+
+ try {
+ Sink.Status status = kafkaSink.process();
+ if (status == Sink.Status.BACKOFF) {
+ fail("Error Occurred");
+ }
+ } catch (EventDeliveryException ex) {
+ // ignore
+ }
+
+ checkMessageArrived(msg, DEFAULT_TOPIC);
+ }
+
}
\ No newline at end of file
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/TestUtil.java b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/TestUtil.java
index 1a87dc5ab..cfd57fc1c 100644
--- a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/TestUtil.java
+++ b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/TestUtil.java
@@ -39,6 +39,11 @@ import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
+import static org.apache.kafka.common.config.SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG;
+import static org.apache.kafka.common.config.SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG;
+import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG;
+import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG;
+
/**
* A utility class for starting/stopping Kafka Server.
*/
@@ -50,8 +55,10 @@ public class TestUtil {
private KafkaLocal kafkaServer;
private boolean externalServers = true;
private String kafkaServerUrl;
+ private String kafkaServerSslUrl;
private String zkServerUrl;
private int kafkaLocalPort;
+ private int kafkaLocalSslPort;
private Properties clientProps;
private int zkLocalPort;
private KafkaConsumer<String, String> consumer;
@@ -80,7 +87,9 @@ public class TestUtil {
String hostname = InetAddress.getLocalHost().getHostName();
zkLocalPort = getNextPort();
kafkaLocalPort = getNextPort();
+ kafkaLocalSslPort = getNextPort();
kafkaServerUrl = hostname + ":" + kafkaLocalPort;
+ kafkaServerSslUrl = hostname + ":" + kafkaLocalSslPort;
zkServerUrl = hostname + ":" + zkLocalPort;
}
clientProps = createClientProperties();
@@ -112,12 +121,23 @@ public class TestUtil {
"/kafka-server.properties"));
// override the Zookeeper url.
kafkaProperties.setProperty("zookeeper.connect", getZkUrl());
- // override the Kafka server port
- kafkaProperties.setProperty("port", Integer.toString(kafkaLocalPort));
+ // to enable ssl feature,
+ // we need to use listeners instead of using port property
+ // kafkaProperties.setProperty("port", Integer.toString(kafkaLocalPort));
+ kafkaProperties.put("listeners",
+ String.format("PLAINTEXT://%s,SSL://%s",
+ getKafkaServerUrl(),
+ getKafkaServerSslUrl()
+ )
+ );
+ // ssl configuration
+ kafkaProperties.put(SSL_TRUSTSTORE_LOCATION_CONFIG, "src/test/resources/truststorefile.jks");
+ kafkaProperties.put(SSL_TRUSTSTORE_PASSWORD_CONFIG, "password");
+ kafkaProperties.put(SSL_KEYSTORE_LOCATION_CONFIG, "src/test/resources/keystorefile.jks");
+ kafkaProperties.put(SSL_KEYSTORE_PASSWORD_CONFIG, "password");
kafkaServer = new KafkaLocal(kafkaProperties);
kafkaServer.start();
logger.info("Kafka Server is successfully started on port " + kafkaLocalPort);
-
return true;
} catch (Exception e) {
@@ -241,4 +261,8 @@ public class TestUtil {
public String getKafkaServerUrl() {
return kafkaServerUrl;
}
+
+ public String getKafkaServerSslUrl() {
+ return kafkaServerSslUrl;
+ }
}
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/keystorefile.jks b/flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/keystorefile.jks
new file mode 100644
index 000000000..20ac6a816
Binary files /dev/null and b/flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/keystorefile.jks differ
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/truststorefile.jks b/flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/truststorefile.jks
new file mode 100644
index 000000000..a98c4907e
Binary files /dev/null and b/flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/truststorefile.jks differ
diff --git a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
index 0c65302b2..7a64ed742 100644
--- a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
+++ b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
@@ -16,22 +16,8 @@
*/
package org.apache.flume.source.kafka;
-import java.io.ByteArrayInputStream;
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.UUID;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.regex.Pattern;
-import java.util.stream.Collectors;
-
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
import kafka.cluster.Broker;
import kafka.cluster.BrokerEndPoint;
import kafka.zk.KafkaZkClient;
@@ -61,20 +47,61 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.network.ListenerName;
-import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.security.JaasUtils;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Optional;
-
-import static org.apache.flume.source.kafka.KafkaSourceConstants.*;
-
import scala.Option;
import scala.collection.JavaConverters;
+import java.io.ByteArrayInputStream;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import static org.apache.flume.shared.kafka.KafkaSSLUtil.SSL_DISABLE_FQDN_CHECK;
+import static org.apache.flume.shared.kafka.KafkaSSLUtil.isSSLEnabled;
+import static org.apache.flume.source.kafka.KafkaSourceConstants.AVRO_EVENT;
+import static org.apache.flume.source.kafka.KafkaSourceConstants.BATCH_DURATION_MS;
+import static org.apache.flume.source.kafka.KafkaSourceConstants.BATCH_SIZE;
+import static org.apache.flume.source.kafka.KafkaSourceConstants.BOOTSTRAP_SERVERS;
+import static org.apache.flume.source.kafka.KafkaSourceConstants.DEFAULT_AUTO_COMMIT;
+import static org.apache.flume.source.kafka.KafkaSourceConstants.DEFAULT_AVRO_EVENT;
+import static org.apache.flume.source.kafka.KafkaSourceConstants.DEFAULT_BATCH_DURATION;
+import static org.apache.flume.source.kafka.KafkaSourceConstants.DEFAULT_BATCH_SIZE;
+import static org.apache.flume.source.kafka.KafkaSourceConstants.DEFAULT_GROUP_ID;
+import static org.apache.flume.source.kafka.KafkaSourceConstants.DEFAULT_KEY_DESERIALIZER;
+import static org.apache.flume.source.kafka.KafkaSourceConstants.DEFAULT_MIGRATE_ZOOKEEPER_OFFSETS;
+import static org.apache.flume.source.kafka.KafkaSourceConstants.DEFAULT_SET_TOPIC_HEADER;
+import static org.apache.flume.source.kafka.KafkaSourceConstants.DEFAULT_TOPIC_HEADER;
+import static org.apache.flume.source.kafka.KafkaSourceConstants.DEFAULT_VALUE_DESERIALIZER;
+import static org.apache.flume.source.kafka.KafkaSourceConstants.KAFKA_CONSUMER_PREFIX;
+import static org.apache.flume.source.kafka.KafkaSourceConstants.KEY_HEADER;
+import static org.apache.flume.source.kafka.KafkaSourceConstants.MIGRATE_ZOOKEEPER_OFFSETS;
+import static org.apache.flume.source.kafka.KafkaSourceConstants.OFFSET_HEADER;
+import static org.apache.flume.source.kafka.KafkaSourceConstants.OLD_GROUP_ID;
+import static org.apache.flume.source.kafka.KafkaSourceConstants.PARTITION_HEADER;
+import static org.apache.flume.source.kafka.KafkaSourceConstants.SET_TOPIC_HEADER;
+import static org.apache.flume.source.kafka.KafkaSourceConstants.TIMESTAMP_HEADER;
+import static org.apache.flume.source.kafka.KafkaSourceConstants.TOPIC;
+import static org.apache.flume.source.kafka.KafkaSourceConstants.TOPICS;
+import static org.apache.flume.source.kafka.KafkaSourceConstants.TOPICS_REGEX;
+import static org.apache.flume.source.kafka.KafkaSourceConstants.TOPIC_HEADER;
+import static org.apache.flume.source.kafka.KafkaSourceConstants.ZOOKEEPER_CONNECT_FLUME_KEY;
+
/**
* A Source for Kafka which reads messages from kafka topics.
*
@@ -386,7 +413,7 @@ public class KafkaSource extends AbstractPollableSource
// For backwards compatibility look up the bootstrap from zookeeper
log.warn("{} is deprecated. Please use the parameter {}", ZOOKEEPER_CONNECT_FLUME_KEY, BOOTSTRAP_SERVERS);
- // Lookup configured security protocol, just in case its not default
+ // Lookup configured security protocol, just in case it's not default
String securityProtocolStr =
context.getSubProperties(KAFKA_CONSUMER_PREFIX)
.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG);
@@ -453,7 +480,14 @@ public class KafkaSource extends AbstractPollableSource
kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
}
kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, DEFAULT_AUTO_COMMIT);
-
+ // The default value of `ssl.endpoint.identification.algorithm`
+ // is changed to `https`, since kafka client 2.0+
+ // And because flume does not accept an empty string as property value,
+ // so we need to use an alternative custom property
+ // `ssl.disableTLSHostnameVerification` to check if enable fqdn check.
+ if (isSSLEnabled(kafkaProps) && "true".equalsIgnoreCase(kafkaProps.getProperty(SSL_DISABLE_FQDN_CHECK))) {
+ kafkaProps.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
+ }
KafkaSSLUtil.addGlobalSSLParameters(kafkaProps);
}
diff --git a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java
index 0e15e7380..8ac437add 100644
--- a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java
+++ b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java
@@ -59,4 +59,6 @@ public class KafkaSourceConstants {
public static final boolean DEFAULT_SET_TOPIC_HEADER = true;
public static final String TOPIC_HEADER = "topicHeader";
+
+
}
diff --git a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java
index 0799664d6..397af64cf 100644
--- a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java
+++ b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java
@@ -39,6 +39,11 @@ import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import static org.apache.kafka.common.config.SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG;
+import static org.apache.kafka.common.config.SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG;
+import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG;
+import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG;
+
public class KafkaSourceEmbeddedKafka {
public static String HOST = InetAddress.getLoopbackAddress().getCanonicalHostName();
@@ -57,6 +62,7 @@ public class KafkaSourceEmbeddedKafka {
private int zkPort = findFreePort(); // none-standard
private int serverPort = findFreePort();
+ private int serverSslPort = findFreePort();
KafkaProducer<String, byte[]> producer;
File dir;
@@ -73,10 +79,25 @@ public class KafkaSourceEmbeddedKafka {
props.put("zookeeper.connect",zookeeper.getConnectString());
props.put("broker.id","1");
props.put("host.name", "localhost");
- props.put("port", String.valueOf(serverPort));
+ // to enable ssl feature,
+ // we need to use listeners instead of using port property
+ // props.put("port", String.valueOf(serverPort));
+ props.put("listeners",
+ String.format("PLAINTEXT://%s:%d,SSL://%s:%d",
+ HOST,
+ serverPort,
+ HOST,
+ serverSslPort
+ )
+ );
props.put("log.dir", dir.getAbsolutePath());
props.put("offsets.topic.replication.factor", "1");
props.put("auto.create.topics.enable", "false");
+ // ssl configuration
+ props.put(SSL_TRUSTSTORE_LOCATION_CONFIG, "src/test/resources/truststorefile.jks");
+ props.put(SSL_TRUSTSTORE_PASSWORD_CONFIG, "password");
+ props.put(SSL_KEYSTORE_LOCATION_CONFIG, "src/test/resources/keystorefile.jks");
+ props.put(SSL_KEYSTORE_PASSWORD_CONFIG, "password");
if (properties != null) {
props.putAll(properties);
}
@@ -100,6 +121,14 @@ public class KafkaSourceEmbeddedKafka {
return HOST + ":" + serverPort;
}
+ public String getBootstrapSslServers() {
+ return String.format("%s:%s", HOST, serverSslPort);
+ }
+
+ public String getBootstrapSslIpPortServers() {
+ return String.format("%s:%s", "127.0.0.1", serverSslPort);
+ }
+
private void initProducer() {
Properties props = new Properties();
props.put("bootstrap.servers", HOST + ":" + serverPort);
diff --git a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java
index 39e0e1951..ee913c9e6 100644
--- a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java
+++ b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java
@@ -19,7 +19,6 @@ package org.apache.flume.source.kafka;
import com.google.common.base.Charsets;
import com.google.common.collect.Lists;
-
import kafka.zk.KafkaZkClient;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;
@@ -37,7 +36,6 @@ import org.apache.flume.lifecycle.LifecycleState;
import org.apache.flume.source.avro.AvroFlumeEvent;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
@@ -45,6 +43,7 @@ import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.security.JaasUtils;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.Time;
@@ -74,11 +73,13 @@ import java.util.Map;
import java.util.Properties;
import java.util.regex.Pattern;
+import static org.apache.flume.shared.kafka.KafkaSSLUtil.SSL_DISABLE_FQDN_CHECK;
import static org.apache.flume.source.kafka.KafkaSourceConstants.AVRO_EVENT;
import static org.apache.flume.source.kafka.KafkaSourceConstants.BATCH_DURATION_MS;
import static org.apache.flume.source.kafka.KafkaSourceConstants.BATCH_SIZE;
import static org.apache.flume.source.kafka.KafkaSourceConstants.BOOTSTRAP_SERVERS;
import static org.apache.flume.source.kafka.KafkaSourceConstants.DEFAULT_AUTO_COMMIT;
+import static org.apache.flume.source.kafka.KafkaSourceConstants.DEFAULT_TOPIC_HEADER;
import static org.apache.flume.source.kafka.KafkaSourceConstants.KAFKA_CONSUMER_PREFIX;
import static org.apache.flume.source.kafka.KafkaSourceConstants.OLD_GROUP_ID;
import static org.apache.flume.source.kafka.KafkaSourceConstants.PARTITION_HEADER;
@@ -86,9 +87,13 @@ import static org.apache.flume.source.kafka.KafkaSourceConstants.TIMESTAMP_HEADE
import static org.apache.flume.source.kafka.KafkaSourceConstants.TOPIC;
import static org.apache.flume.source.kafka.KafkaSourceConstants.TOPICS;
import static org.apache.flume.source.kafka.KafkaSourceConstants.TOPICS_REGEX;
-import static org.apache.flume.source.kafka.KafkaSourceConstants.DEFAULT_TOPIC_HEADER;
import static org.apache.flume.source.kafka.KafkaSourceConstants.ZOOKEEPER_CONNECT_FLUME_KEY;
+import static org.apache.kafka.clients.CommonClientConfigs.SECURITY_PROTOCOL_CONFIG;
+import static org.apache.kafka.common.config.SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG;
+import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG;
+import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
@@ -1016,4 +1021,58 @@ public class TestKafkaSource {
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);
return props;
}
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testSslSource() throws EventDeliveryException,
+ SecurityException,
+ IllegalArgumentException,
+ InterruptedException {
+ context.put(TOPICS, topic0);
+ context.put(BATCH_SIZE, "1");
+ context.put(BOOTSTRAP_SERVERS, kafkaServer.getBootstrapSslServers());
+ context.put(KAFKA_CONSUMER_PREFIX + SECURITY_PROTOCOL_CONFIG, "SSL");
+ context.put(KAFKA_CONSUMER_PREFIX + SSL_TRUSTSTORE_LOCATION_CONFIG, "src/test/resources/truststorefile.jks");
+ context.put(KAFKA_CONSUMER_PREFIX + SSL_TRUSTSTORE_PASSWORD_CONFIG, "password");
+ context.put(KAFKA_CONSUMER_PREFIX + SSL_DISABLE_FQDN_CHECK, "true");
+ kafkaSource.configure(context);
+ startKafkaSource();
+
+ Thread.sleep(500L);
+ kafkaServer.produce(topic0, "", "hello, world");
+ Thread.sleep(500L);
+
+ Assert.assertEquals("", kafkaSource.getConsumerProps().get(SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG));
+ Assert.assertEquals(Status.READY, kafkaSource.process());
+ Assert.assertEquals(1, events.size());
+ Assert.assertEquals("hello, world",
+ new String(events.get(0).getBody(), Charsets.UTF_8)
+ );
+ }
+
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testSslWithFqdnValidationFailedSource() throws EventDeliveryException,
+ SecurityException,
+ IllegalArgumentException,
+ InterruptedException {
+ context.put(TOPICS, topic0);
+ context.put(BATCH_SIZE, "1");
+ context.put(BOOTSTRAP_SERVERS, kafkaServer.getBootstrapSslIpPortServers());
+ context.put(KAFKA_CONSUMER_PREFIX + SECURITY_PROTOCOL_CONFIG, "SSL");
+ context.put(KAFKA_CONSUMER_PREFIX + SSL_TRUSTSTORE_LOCATION_CONFIG, "src/test/resources/truststorefile.jks");
+ context.put(KAFKA_CONSUMER_PREFIX + SSL_TRUSTSTORE_PASSWORD_CONFIG, "password");
+ kafkaSource.configure(context);
+ startKafkaSource();
+
+ Thread.sleep(500L);
+ kafkaServer.produce(topic0, "", "hello, world");
+ Thread.sleep(500L);
+
+ assertNull(kafkaSource.getConsumerProps().get(SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG));
+ Assert.assertEquals(Status.BACKOFF, kafkaSource.process());
+ }
+
+
}
diff --git a/flume-ng-sources/flume-kafka-source/src/test/resources/keystorefile.jks b/flume-ng-sources/flume-kafka-source/src/test/resources/keystorefile.jks
new file mode 100644
index 000000000..20ac6a816
Binary files /dev/null and b/flume-ng-sources/flume-kafka-source/src/test/resources/keystorefile.jks differ
diff --git a/flume-ng-sources/flume-kafka-source/src/test/resources/truststorefile.jks b/flume-ng-sources/flume-kafka-source/src/test/resources/truststorefile.jks
new file mode 100644
index 000000000..a98c4907e
Binary files /dev/null and b/flume-ng-sources/flume-kafka-source/src/test/resources/truststorefile.jks differ
diff --git a/flume-shared/flume-shared-kafka/src/main/java/org/apache/flume/shared/kafka/KafkaSSLUtil.java b/flume-shared/flume-shared-kafka/src/main/java/org/apache/flume/shared/kafka/KafkaSSLUtil.java
index 78e6f639d..4f0300859 100644
--- a/flume-shared/flume-shared-kafka/src/main/java/org/apache/flume/shared/kafka/KafkaSSLUtil.java
+++ b/flume-shared/flume-shared-kafka/src/main/java/org/apache/flume/shared/kafka/KafkaSSLUtil.java
@@ -27,6 +27,8 @@ import java.util.Properties;
public class KafkaSSLUtil {
+ public static final String SSL_DISABLE_FQDN_CHECK = "ssl.disableTLSHostnameVerification";
+
private KafkaSSLUtil() {
}
@@ -61,7 +63,7 @@ public class KafkaSSLUtil {
}
}
- private static boolean isSSLEnabled(Properties kafkaProps) {
+ public static boolean isSSLEnabled(Properties kafkaProps) {
String securityProtocol =
kafkaProps.getProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG);