You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by rg...@apache.org on 2022/10/08 04:00:14 UTC

[flume] 01/14: FLUME-3315 fix kafka ssl https verification (#382)

This is an automated email from the ASF dual-hosted git repository.

rgoers pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/flume.git

commit 93fbd74336e573b5add3b81760d57a459f8bc440
Author: zhou zhuohan <84...@qq.com>
AuthorDate: Sun Sep 18 04:14:26 2022 +0800

    FLUME-3315 fix kafka ssl https verification (#382)
    
    Authored-by: ninjazhou <ni...@tencent.com>
---
 .../apache/flume/channel/kafka/KafkaChannel.java   |  47 +++++++++++-
 flume-ng-doc/sphinx/FlumeUserGuide.rst             |  33 +++++----
 .../org/apache/flume/sink/kafka/KafkaSink.java     |  12 +++-
 .../org/apache/flume/sink/kafka/TestKafkaSink.java |  41 +++++++++++
 .../org/apache/flume/sink/kafka/util/TestUtil.java |  30 +++++++-
 .../src/test/resources/keystorefile.jks            | Bin 0 -> 1294 bytes
 .../src/test/resources/truststorefile.jks          | Bin 0 -> 887 bytes
 .../org/apache/flume/source/kafka/KafkaSource.java |  80 +++++++++++++++------
 .../flume/source/kafka/KafkaSourceConstants.java   |   2 +
 .../source/kafka/KafkaSourceEmbeddedKafka.java     |  31 +++++++-
 .../apache/flume/source/kafka/TestKafkaSource.java |  65 ++++++++++++++++-
 .../src/test/resources/keystorefile.jks            | Bin 0 -> 1294 bytes
 .../src/test/resources/truststorefile.jks          | Bin 0 -> 887 bytes
 .../apache/flume/shared/kafka/KafkaSSLUtil.java    |   4 +-
 14 files changed, 295 insertions(+), 50 deletions(-)

diff --git a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
index 275a614e3..65cad9937 100644
--- a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
+++ b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
@@ -52,6 +52,7 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.SslConfigs;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.security.JaasUtils;
 import org.apache.kafka.common.utils.Time;
@@ -78,7 +79,33 @@ import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
-import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.*;
+import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.BROKER_LIST_FLUME_KEY;
+import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.DEFAULT_ACKS;
+import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.DEFAULT_AUTO_OFFSET_RESET;
+import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.DEFAULT_GROUP_ID;
+import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.DEFAULT_KEY_DESERIALIZER;
+import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.DEFAULT_KEY_SERIALIZER;
+import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.DEFAULT_MIGRATE_ZOOKEEPER_OFFSETS;
+import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.DEFAULT_PARSE_AS_FLUME_EVENT;
+import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.DEFAULT_POLL_TIMEOUT;
+import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.DEFAULT_TOPIC;
+import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.DEFAULT_VALUE_DESERIAIZER;
+import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.DEFAULT_VALUE_SERIAIZER;
+import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.GROUP_ID_FLUME;
+import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.KAFKA_CONSUMER_PREFIX;
+import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.KAFKA_PRODUCER_PREFIX;
+import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.KEY_HEADER;
+import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.MIGRATE_ZOOKEEPER_OFFSETS;
+import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.PARSE_AS_FLUME_EVENT;
+import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.PARTITION_HEADER_NAME;
+import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.POLL_TIMEOUT;
+import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.READ_SMALLEST_OFFSET;
+import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.STATIC_PARTITION_CONF;
+import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.TOPIC_CONFIG;
+import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.ZOOKEEPER_CONNECT_FLUME_KEY;
+import static org.apache.flume.shared.kafka.KafkaSSLUtil.SSL_DISABLE_FQDN_CHECK;
+import static org.apache.flume.shared.kafka.KafkaSSLUtil.isSSLEnabled;
 
 public class KafkaChannel extends BasicChannelSemantics {
 
@@ -269,7 +296,14 @@ public class KafkaChannel extends BasicChannelSemantics {
     // Defaults overridden based on config
     producerProps.putAll(ctx.getSubProperties(KAFKA_PRODUCER_PREFIX));
     producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);
-
+    //  The default value of `ssl.endpoint.identification.algorithm`
+    //  is changed to `https`, since kafka client 2.0+
+    //  And because flume does not accept an empty string as property value,
+    //  so we need to use an alternative custom property
+    //  `ssl.disableTLSHostnameVerification` to check if enable fqdn check.
+    if (isSSLEnabled(producerProps) && "true".equalsIgnoreCase(producerProps.getProperty(SSL_DISABLE_FQDN_CHECK))) {
+      producerProps.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
+    }
     KafkaSSLUtil.addGlobalSSLParameters(producerProps);
   }
 
@@ -288,7 +322,14 @@ public class KafkaChannel extends BasicChannelSemantics {
     consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);
     consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
     consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
-
+    //  The default value of `ssl.endpoint.identification.algorithm`
+    //  is changed to `https`, since kafka client 2.0+
+    //  And because flume does not accept an empty string as property value,
+    //  so we need to use an alternative custom property
+    //  `ssl.disableTLSHostnameVerification` to check if enable fqdn check.
+    if (isSSLEnabled(consumerProps) && "true".equalsIgnoreCase(consumerProps.getProperty(SSL_DISABLE_FQDN_CHECK))) {
+      consumerProps.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
+    }
     KafkaSSLUtil.addGlobalSSLParameters(consumerProps);
   }
 
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index 2f8938c95..9355fad16 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -1665,14 +1665,15 @@ Specifying the truststore is optional here, the global truststore can be used in
 For more details about the global SSL setup, see the `SSL/TLS support`_ section.
 
 Note: By default the property ``ssl.endpoint.identification.algorithm``
-is not defined, so hostname verification is not performed.
-In order to enable hostname verification, set the following properties
+is not defined, so hostname verification is performed.
+Since flume does not accept an empty string as property value, in order to disable hostname verification,
+we need to set the following properties instead of setting ``ssl.endpoint.identification.algorithm`` to an empty string.
 
 .. code-block:: properties
 
-    a1.sources.source1.kafka.consumer.ssl.endpoint.identification.algorithm=HTTPS
+    a1.sources.source1.kafka.consumer.ssl.disableTLSHostnameVerification = true
 
-Once enabled, clients will verify the server's fully qualified domain name (FQDN)
+If not set to true, clients will verify the server's fully qualified domain name (FQDN)
 against one of the following two fields:
 
 #) Common Name (CN) https://tools.ietf.org/html/rfc6125#section-2.3
@@ -3279,18 +3280,19 @@ Example configuration with server side authentication and data encryption.
     a1.sinks.sink1.kafka.producer.ssl.truststore.location = /path/to/truststore.jks
     a1.sinks.sink1.kafka.producer.ssl.truststore.password = <password to access the truststore>
 
-Specyfing the truststore is optional here, the global truststore can be used instead.
+Specifying the truststore is optional here, the global truststore can be used instead.
 For more details about the global SSL setup, see the `SSL/TLS support`_ section.
 
 Note: By default the property ``ssl.endpoint.identification.algorithm``
-is not defined, so hostname verification is not performed.
-In order to enable hostname verification, set the following properties
+is not defined, so hostname verification is performed.
+Since flume does not allow an empty string as configuration value, in order to disable hostname verification,
+we need to set the following properties instead of setting ``ssl.endpoint.identification.algorithm`` to an empty string.
 
 .. code-block:: properties
 
-    a1.sinks.sink1.kafka.producer.ssl.endpoint.identification.algorithm = HTTPS
+    a1.sinks.sink1.kafka.producer.ssl.disableTLSHostnameVerification = true
 
-Once enabled, clients will verify the server's fully qualified domain name (FQDN)
+If not set to true, clients will verify the server's fully qualified domain name (FQDN)
 against one of the following two fields:
 
 #) Common Name (CN) https://tools.ietf.org/html/rfc6125#section-2.3
@@ -3685,19 +3687,20 @@ Example configuration with server side authentication and data encryption.
     a1.channels.channel1.kafka.consumer.ssl.truststore.location = /path/to/truststore.jks
     a1.channels.channel1.kafka.consumer.ssl.truststore.password = <password to access the truststore>
 
-Specyfing the truststore is optional here, the global truststore can be used instead.
+Specifying the truststore is optional here, the global truststore can be used instead.
 For more details about the global SSL setup, see the `SSL/TLS support`_ section.
 
 Note: By default the property ``ssl.endpoint.identification.algorithm``
-is not defined, so hostname verification is not performed.
-In order to enable hostname verification, set the following properties
+is not defined, so hostname verification is performed.
+Since flume does not accept an empty string as property value, in order to disable hostname verification,
+we need to set the following properties instead of setting ``ssl.endpoint.identification.algorithm`` to an empty string.
 
 .. code-block:: properties
 
-    a1.channels.channel1.kafka.producer.ssl.endpoint.identification.algorithm = HTTPS
-    a1.channels.channel1.kafka.consumer.ssl.endpoint.identification.algorithm = HTTPS
+    a1.channels.channel1.kafka.producer.ssl.disableTLSHostnameVerification = true
+    a1.channels.channel1.kafka.consumer.ssl.disableTLSHostnameVerification = true
 
-Once enabled, clients will verify the server's fully qualified domain name (FQDN)
+If not set to true, clients will verify the server's fully qualified domain name (FQDN)
 against one of the following two fields:
 
 #) Common Name (CN) https://tools.ietf.org/html/rfc6125#section-2.3
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
index 3b1a86697..e4c9ff7e2 100644
--- a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
+++ b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
@@ -43,6 +43,7 @@ import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.config.SslConfigs;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -56,6 +57,8 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.Future;
 
+import static org.apache.flume.shared.kafka.KafkaSSLUtil.SSL_DISABLE_FQDN_CHECK;
+import static org.apache.flume.shared.kafka.KafkaSSLUtil.isSSLEnabled;
 import static org.apache.flume.sink.kafka.KafkaSinkConstants.BOOTSTRAP_SERVERS_CONFIG;
 import static org.apache.flume.sink.kafka.KafkaSinkConstants.BATCH_SIZE;
 import static org.apache.flume.sink.kafka.KafkaSinkConstants.DEFAULT_BATCH_SIZE;
@@ -421,7 +424,14 @@ public class KafkaSink extends AbstractSink implements Configurable, BatchSizeSu
     kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, DEFAULT_VALUE_SERIAIZER);
     kafkaProps.putAll(context.getSubProperties(KAFKA_PRODUCER_PREFIX));
     kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);
-
+    //  The default value of `ssl.endpoint.identification.algorithm`
+    //  is changed to `https`, since kafka client 2.0+
+    //  And because flume does not accept an empty string as property value,
+    //  so we need to use an alternative custom property
+    //  `ssl.disableTLSHostnameVerification` to check if enable fqdn check.
+    if (isSSLEnabled(kafkaProps) && "true".equalsIgnoreCase(kafkaProps.getProperty(SSL_DISABLE_FQDN_CHECK))) {
+      kafkaProps.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
+    }
     KafkaSSLUtil.addGlobalSSLParameters(kafkaProps);
   }
 
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
index 97dc0bdab..5a69b16fb 100644
--- a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
+++ b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
@@ -61,6 +61,7 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 
+import static org.apache.flume.shared.kafka.KafkaSSLUtil.SSL_DISABLE_FQDN_CHECK;
 import static org.apache.flume.sink.kafka.KafkaSinkConstants.AVRO_EVENT;
 import static org.apache.flume.sink.kafka.KafkaSinkConstants.BATCH_SIZE;
 import static org.apache.flume.sink.kafka.KafkaSinkConstants.BOOTSTRAP_SERVERS_CONFIG;
@@ -72,6 +73,9 @@ import static org.apache.flume.sink.kafka.KafkaSinkConstants.KAFKA_PRODUCER_PREF
 import static org.apache.flume.sink.kafka.KafkaSinkConstants.OLD_BATCH_SIZE;
 import static org.apache.flume.sink.kafka.KafkaSinkConstants.REQUIRED_ACKS_FLUME_KEY;
 import static org.apache.flume.sink.kafka.KafkaSinkConstants.TOPIC_CONFIG;
+import static org.apache.kafka.clients.CommonClientConfigs.SECURITY_PROTOCOL_CONFIG;
+import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG;
+import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
@@ -715,4 +719,41 @@ public class TestKafkaSink {
     return newTopic;
   }
 
+  @Test
+  public void testSslTopic() {
+    Sink kafkaSink = new KafkaSink();
+    Context context = prepareDefaultContext();
+    context.put(BOOTSTRAP_SERVERS_CONFIG, testUtil.getKafkaServerSslUrl());
+    context.put(KAFKA_PRODUCER_PREFIX + SECURITY_PROTOCOL_CONFIG, "SSL");
+    context.put(KAFKA_PRODUCER_PREFIX + SSL_TRUSTSTORE_LOCATION_CONFIG, "src/test/resources/truststorefile.jks");
+    context.put(KAFKA_PRODUCER_PREFIX + SSL_TRUSTSTORE_PASSWORD_CONFIG, "password");
+    context.put(KAFKA_PRODUCER_PREFIX + SSL_DISABLE_FQDN_CHECK, "true");
+    Configurables.configure(kafkaSink, context);
+
+    Channel memoryChannel = new MemoryChannel();
+    context = prepareDefaultContext();
+    Configurables.configure(memoryChannel, context);
+    kafkaSink.setChannel(memoryChannel);
+    kafkaSink.start();
+
+    String msg = "default-topic-test";
+    Transaction tx = memoryChannel.getTransaction();
+    tx.begin();
+    Event event = EventBuilder.withBody(msg.getBytes());
+    memoryChannel.put(event);
+    tx.commit();
+    tx.close();
+
+    try {
+      Sink.Status status = kafkaSink.process();
+      if (status == Sink.Status.BACKOFF) {
+        fail("Error Occurred");
+      }
+    } catch (EventDeliveryException ex) {
+      // ignore
+    }
+
+    checkMessageArrived(msg, DEFAULT_TOPIC);
+  }
+
 }
\ No newline at end of file
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/TestUtil.java b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/TestUtil.java
index 1a87dc5ab..cfd57fc1c 100644
--- a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/TestUtil.java
+++ b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/TestUtil.java
@@ -39,6 +39,11 @@ import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.kafka.common.config.SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG;
+import static org.apache.kafka.common.config.SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG;
+import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG;
+import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG;
+
 /**
  * A utility class for starting/stopping Kafka Server.
  */
@@ -50,8 +55,10 @@ public class TestUtil {
   private KafkaLocal kafkaServer;
   private boolean externalServers = true;
   private String kafkaServerUrl;
+  private String kafkaServerSslUrl;
   private String zkServerUrl;
   private int kafkaLocalPort;
+  private int kafkaLocalSslPort;
   private Properties clientProps;
   private int zkLocalPort;
   private KafkaConsumer<String, String> consumer;
@@ -80,7 +87,9 @@ public class TestUtil {
         String hostname = InetAddress.getLocalHost().getHostName();
         zkLocalPort = getNextPort();
         kafkaLocalPort = getNextPort();
+        kafkaLocalSslPort = getNextPort();
         kafkaServerUrl = hostname + ":" + kafkaLocalPort;
+        kafkaServerSslUrl = hostname + ":" + kafkaLocalSslPort;
         zkServerUrl = hostname + ":" + zkLocalPort;
       }
       clientProps = createClientProperties();
@@ -112,12 +121,23 @@ public class TestUtil {
           "/kafka-server.properties"));
       // override the Zookeeper url.
       kafkaProperties.setProperty("zookeeper.connect", getZkUrl());
-      // override the Kafka server port
-      kafkaProperties.setProperty("port", Integer.toString(kafkaLocalPort));
+      //  to enable ssl feature,
+      //  we need to use listeners instead of using port property
+      //  kafkaProperties.setProperty("port", Integer.toString(kafkaLocalPort));
+      kafkaProperties.put("listeners",
+              String.format("PLAINTEXT://%s,SSL://%s",
+                      getKafkaServerUrl(),
+                      getKafkaServerSslUrl()
+              )
+      );
+      //  ssl configuration
+      kafkaProperties.put(SSL_TRUSTSTORE_LOCATION_CONFIG, "src/test/resources/truststorefile.jks");
+      kafkaProperties.put(SSL_TRUSTSTORE_PASSWORD_CONFIG, "password");
+      kafkaProperties.put(SSL_KEYSTORE_LOCATION_CONFIG, "src/test/resources/keystorefile.jks");
+      kafkaProperties.put(SSL_KEYSTORE_PASSWORD_CONFIG, "password");
       kafkaServer = new KafkaLocal(kafkaProperties);
       kafkaServer.start();
       logger.info("Kafka Server is successfully started on port " + kafkaLocalPort);
-
       return true;
 
     } catch (Exception e) {
@@ -241,4 +261,8 @@ public class TestUtil {
   public String getKafkaServerUrl() {
     return kafkaServerUrl;
   }
+
+  public String getKafkaServerSslUrl() {
+    return kafkaServerSslUrl;
+  }
 }
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/keystorefile.jks b/flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/keystorefile.jks
new file mode 100644
index 000000000..20ac6a816
Binary files /dev/null and b/flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/keystorefile.jks differ
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/truststorefile.jks b/flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/truststorefile.jks
new file mode 100644
index 000000000..a98c4907e
Binary files /dev/null and b/flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/truststorefile.jks differ
diff --git a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
index 0c65302b2..7a64ed742 100644
--- a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
+++ b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
@@ -16,22 +16,8 @@
  */
 package org.apache.flume.source.kafka;
 
-import java.io.ByteArrayInputStream;
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.UUID;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.regex.Pattern;
-import java.util.stream.Collectors;
-
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
 import kafka.cluster.Broker;
 import kafka.cluster.BrokerEndPoint;
 import kafka.zk.KafkaZkClient;
@@ -61,20 +47,61 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.SslConfigs;
 import org.apache.kafka.common.network.ListenerName;
-import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.apache.kafka.common.security.JaasUtils;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.apache.kafka.common.utils.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Optional;
-
-import static org.apache.flume.source.kafka.KafkaSourceConstants.*;
-
 import scala.Option;
 import scala.collection.JavaConverters;
 
+import java.io.ByteArrayInputStream;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import static org.apache.flume.shared.kafka.KafkaSSLUtil.SSL_DISABLE_FQDN_CHECK;
+import static org.apache.flume.shared.kafka.KafkaSSLUtil.isSSLEnabled;
+import static org.apache.flume.source.kafka.KafkaSourceConstants.AVRO_EVENT;
+import static org.apache.flume.source.kafka.KafkaSourceConstants.BATCH_DURATION_MS;
+import static org.apache.flume.source.kafka.KafkaSourceConstants.BATCH_SIZE;
+import static org.apache.flume.source.kafka.KafkaSourceConstants.BOOTSTRAP_SERVERS;
+import static org.apache.flume.source.kafka.KafkaSourceConstants.DEFAULT_AUTO_COMMIT;
+import static org.apache.flume.source.kafka.KafkaSourceConstants.DEFAULT_AVRO_EVENT;
+import static org.apache.flume.source.kafka.KafkaSourceConstants.DEFAULT_BATCH_DURATION;
+import static org.apache.flume.source.kafka.KafkaSourceConstants.DEFAULT_BATCH_SIZE;
+import static org.apache.flume.source.kafka.KafkaSourceConstants.DEFAULT_GROUP_ID;
+import static org.apache.flume.source.kafka.KafkaSourceConstants.DEFAULT_KEY_DESERIALIZER;
+import static org.apache.flume.source.kafka.KafkaSourceConstants.DEFAULT_MIGRATE_ZOOKEEPER_OFFSETS;
+import static org.apache.flume.source.kafka.KafkaSourceConstants.DEFAULT_SET_TOPIC_HEADER;
+import static org.apache.flume.source.kafka.KafkaSourceConstants.DEFAULT_TOPIC_HEADER;
+import static org.apache.flume.source.kafka.KafkaSourceConstants.DEFAULT_VALUE_DESERIALIZER;
+import static org.apache.flume.source.kafka.KafkaSourceConstants.KAFKA_CONSUMER_PREFIX;
+import static org.apache.flume.source.kafka.KafkaSourceConstants.KEY_HEADER;
+import static org.apache.flume.source.kafka.KafkaSourceConstants.MIGRATE_ZOOKEEPER_OFFSETS;
+import static org.apache.flume.source.kafka.KafkaSourceConstants.OFFSET_HEADER;
+import static org.apache.flume.source.kafka.KafkaSourceConstants.OLD_GROUP_ID;
+import static org.apache.flume.source.kafka.KafkaSourceConstants.PARTITION_HEADER;
+import static org.apache.flume.source.kafka.KafkaSourceConstants.SET_TOPIC_HEADER;
+import static org.apache.flume.source.kafka.KafkaSourceConstants.TIMESTAMP_HEADER;
+import static org.apache.flume.source.kafka.KafkaSourceConstants.TOPIC;
+import static org.apache.flume.source.kafka.KafkaSourceConstants.TOPICS;
+import static org.apache.flume.source.kafka.KafkaSourceConstants.TOPICS_REGEX;
+import static org.apache.flume.source.kafka.KafkaSourceConstants.TOPIC_HEADER;
+import static org.apache.flume.source.kafka.KafkaSourceConstants.ZOOKEEPER_CONNECT_FLUME_KEY;
+
 /**
  * A Source for Kafka which reads messages from kafka topics.
  *
@@ -386,7 +413,7 @@ public class KafkaSource extends AbstractPollableSource
         // For backwards compatibility look up the bootstrap from zookeeper
         log.warn("{} is deprecated. Please use the parameter {}", ZOOKEEPER_CONNECT_FLUME_KEY, BOOTSTRAP_SERVERS);
 
-        // Lookup configured security protocol, just in case its not default
+        // Lookup configured security protocol, just in case it's not default
         String securityProtocolStr =
             context.getSubProperties(KAFKA_CONSUMER_PREFIX)
                 .get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG);
@@ -453,7 +480,14 @@ public class KafkaSource extends AbstractPollableSource
       kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
     }
     kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, DEFAULT_AUTO_COMMIT);
-
+    //  The default value of `ssl.endpoint.identification.algorithm`
+    //  is changed to `https`, since kafka client 2.0+
+    //  And because flume does not accept an empty string as property value,
+    //  so we need to use an alternative custom property
+    //  `ssl.disableTLSHostnameVerification` to check if enable fqdn check.
+    if (isSSLEnabled(kafkaProps) && "true".equalsIgnoreCase(kafkaProps.getProperty(SSL_DISABLE_FQDN_CHECK))) {
+      kafkaProps.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
+    }
     KafkaSSLUtil.addGlobalSSLParameters(kafkaProps);
   }
 
diff --git a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java
index 0e15e7380..8ac437add 100644
--- a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java
+++ b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java
@@ -59,4 +59,6 @@ public class KafkaSourceConstants {
   public static final boolean DEFAULT_SET_TOPIC_HEADER = true;
   public static final String TOPIC_HEADER = "topicHeader";
 
+
+
 }
diff --git a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java
index 0799664d6..397af64cf 100644
--- a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java
+++ b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java
@@ -39,6 +39,11 @@ import java.util.UUID;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.kafka.common.config.SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG;
+import static org.apache.kafka.common.config.SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG;
+import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG;
+import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG;
+
 public class KafkaSourceEmbeddedKafka {
 
   public static String HOST = InetAddress.getLoopbackAddress().getCanonicalHostName();
@@ -57,6 +62,7 @@ public class KafkaSourceEmbeddedKafka {
 
   private int zkPort = findFreePort(); // none-standard
   private int serverPort = findFreePort();
+  private int serverSslPort = findFreePort();
 
   KafkaProducer<String, byte[]> producer;
   File dir;
@@ -73,10 +79,25 @@ public class KafkaSourceEmbeddedKafka {
     props.put("zookeeper.connect",zookeeper.getConnectString());
     props.put("broker.id","1");
     props.put("host.name", "localhost");
-    props.put("port", String.valueOf(serverPort));
+    //  to enable ssl feature,
+    //  we need to use listeners instead of using port property
+    //  props.put("port", String.valueOf(serverPort));
+    props.put("listeners",
+            String.format("PLAINTEXT://%s:%d,SSL://%s:%d",
+                    HOST,
+                    serverPort,
+                    HOST,
+                    serverSslPort
+            )
+    );
     props.put("log.dir", dir.getAbsolutePath());
     props.put("offsets.topic.replication.factor", "1");
     props.put("auto.create.topics.enable", "false");
+    //  ssl configuration
+    props.put(SSL_TRUSTSTORE_LOCATION_CONFIG, "src/test/resources/truststorefile.jks");
+    props.put(SSL_TRUSTSTORE_PASSWORD_CONFIG, "password");
+    props.put(SSL_KEYSTORE_LOCATION_CONFIG, "src/test/resources/keystorefile.jks");
+    props.put(SSL_KEYSTORE_PASSWORD_CONFIG, "password");
     if (properties != null) {
       props.putAll(properties);
     }
@@ -100,6 +121,14 @@ public class KafkaSourceEmbeddedKafka {
     return HOST + ":" + serverPort;
   }
 
+  public String getBootstrapSslServers() {
+    return String.format("%s:%s", HOST, serverSslPort);
+  }
+
+  public String getBootstrapSslIpPortServers() {
+    return String.format("%s:%s", "127.0.0.1", serverSslPort);
+  }
+
   private void initProducer() {
     Properties props = new Properties();
     props.put("bootstrap.servers", HOST + ":" + serverPort);
diff --git a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java
index 39e0e1951..ee913c9e6 100644
--- a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java
+++ b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java
@@ -19,7 +19,6 @@ package org.apache.flume.source.kafka;
 
 import com.google.common.base.Charsets;
 import com.google.common.collect.Lists;
-
 import kafka.zk.KafkaZkClient;
 import org.apache.avro.io.BinaryEncoder;
 import org.apache.avro.io.EncoderFactory;
@@ -37,7 +36,6 @@ import org.apache.flume.lifecycle.LifecycleState;
 import org.apache.flume.source.avro.AvroFlumeEvent;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.common.errors.TopicExistsException;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.producer.KafkaProducer;
@@ -45,6 +43,7 @@ import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TopicExistsException;
 import org.apache.kafka.common.security.JaasUtils;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.utils.Time;
@@ -74,11 +73,13 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.regex.Pattern;
 
+import static org.apache.flume.shared.kafka.KafkaSSLUtil.SSL_DISABLE_FQDN_CHECK;
 import static org.apache.flume.source.kafka.KafkaSourceConstants.AVRO_EVENT;
 import static org.apache.flume.source.kafka.KafkaSourceConstants.BATCH_DURATION_MS;
 import static org.apache.flume.source.kafka.KafkaSourceConstants.BATCH_SIZE;
 import static org.apache.flume.source.kafka.KafkaSourceConstants.BOOTSTRAP_SERVERS;
 import static org.apache.flume.source.kafka.KafkaSourceConstants.DEFAULT_AUTO_COMMIT;
+import static org.apache.flume.source.kafka.KafkaSourceConstants.DEFAULT_TOPIC_HEADER;
 import static org.apache.flume.source.kafka.KafkaSourceConstants.KAFKA_CONSUMER_PREFIX;
 import static org.apache.flume.source.kafka.KafkaSourceConstants.OLD_GROUP_ID;
 import static org.apache.flume.source.kafka.KafkaSourceConstants.PARTITION_HEADER;
@@ -86,9 +87,13 @@ import static org.apache.flume.source.kafka.KafkaSourceConstants.TIMESTAMP_HEADE
 import static org.apache.flume.source.kafka.KafkaSourceConstants.TOPIC;
 import static org.apache.flume.source.kafka.KafkaSourceConstants.TOPICS;
 import static org.apache.flume.source.kafka.KafkaSourceConstants.TOPICS_REGEX;
-import static org.apache.flume.source.kafka.KafkaSourceConstants.DEFAULT_TOPIC_HEADER;
 import static org.apache.flume.source.kafka.KafkaSourceConstants.ZOOKEEPER_CONNECT_FLUME_KEY;
+import static org.apache.kafka.clients.CommonClientConfigs.SECURITY_PROTOCOL_CONFIG;
+import static org.apache.kafka.common.config.SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG;
+import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG;
+import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
@@ -1016,4 +1021,58 @@ public class TestKafkaSource {
     props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);
     return props;
   }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testSslSource() throws EventDeliveryException,
+          SecurityException,
+          IllegalArgumentException,
+          InterruptedException {
+    context.put(TOPICS, topic0);
+    context.put(BATCH_SIZE, "1");
+    context.put(BOOTSTRAP_SERVERS, kafkaServer.getBootstrapSslServers());
+    context.put(KAFKA_CONSUMER_PREFIX + SECURITY_PROTOCOL_CONFIG, "SSL");
+    context.put(KAFKA_CONSUMER_PREFIX + SSL_TRUSTSTORE_LOCATION_CONFIG, "src/test/resources/truststorefile.jks");
+    context.put(KAFKA_CONSUMER_PREFIX + SSL_TRUSTSTORE_PASSWORD_CONFIG, "password");
+    context.put(KAFKA_CONSUMER_PREFIX + SSL_DISABLE_FQDN_CHECK, "true");
+    kafkaSource.configure(context);
+    startKafkaSource();
+
+    Thread.sleep(500L);
+    kafkaServer.produce(topic0, "", "hello, world");
+    Thread.sleep(500L);
+
+    Assert.assertEquals("", kafkaSource.getConsumerProps().get(SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG));
+    Assert.assertEquals(Status.READY, kafkaSource.process());
+    Assert.assertEquals(1, events.size());
+    Assert.assertEquals("hello, world",
+            new String(events.get(0).getBody(), Charsets.UTF_8)
+    );
+  }
+
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testSslWithFqdnValidationFailedSource() throws EventDeliveryException,
+          SecurityException,
+          IllegalArgumentException,
+          InterruptedException {
+    context.put(TOPICS, topic0);
+    context.put(BATCH_SIZE, "1");
+    context.put(BOOTSTRAP_SERVERS, kafkaServer.getBootstrapSslIpPortServers());
+    context.put(KAFKA_CONSUMER_PREFIX + SECURITY_PROTOCOL_CONFIG, "SSL");
+    context.put(KAFKA_CONSUMER_PREFIX + SSL_TRUSTSTORE_LOCATION_CONFIG, "src/test/resources/truststorefile.jks");
+    context.put(KAFKA_CONSUMER_PREFIX + SSL_TRUSTSTORE_PASSWORD_CONFIG, "password");
+    kafkaSource.configure(context);
+    startKafkaSource();
+
+    Thread.sleep(500L);
+    kafkaServer.produce(topic0, "", "hello, world");
+    Thread.sleep(500L);
+
+    assertNull(kafkaSource.getConsumerProps().get(SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG));
+    Assert.assertEquals(Status.BACKOFF, kafkaSource.process());
+  }
+
+
 }
diff --git a/flume-ng-sources/flume-kafka-source/src/test/resources/keystorefile.jks b/flume-ng-sources/flume-kafka-source/src/test/resources/keystorefile.jks
new file mode 100644
index 000000000..20ac6a816
Binary files /dev/null and b/flume-ng-sources/flume-kafka-source/src/test/resources/keystorefile.jks differ
diff --git a/flume-ng-sources/flume-kafka-source/src/test/resources/truststorefile.jks b/flume-ng-sources/flume-kafka-source/src/test/resources/truststorefile.jks
new file mode 100644
index 000000000..a98c4907e
Binary files /dev/null and b/flume-ng-sources/flume-kafka-source/src/test/resources/truststorefile.jks differ
diff --git a/flume-shared/flume-shared-kafka/src/main/java/org/apache/flume/shared/kafka/KafkaSSLUtil.java b/flume-shared/flume-shared-kafka/src/main/java/org/apache/flume/shared/kafka/KafkaSSLUtil.java
index 78e6f639d..4f0300859 100644
--- a/flume-shared/flume-shared-kafka/src/main/java/org/apache/flume/shared/kafka/KafkaSSLUtil.java
+++ b/flume-shared/flume-shared-kafka/src/main/java/org/apache/flume/shared/kafka/KafkaSSLUtil.java
@@ -27,6 +27,8 @@ import java.util.Properties;
 
 public class KafkaSSLUtil {
 
+  public static final String SSL_DISABLE_FQDN_CHECK = "ssl.disableTLSHostnameVerification";
+
   private KafkaSSLUtil() {
   }
 
@@ -61,7 +63,7 @@ public class KafkaSSLUtil {
     }
   }
 
-  private static boolean isSSLEnabled(Properties kafkaProps) {
+  public static boolean isSSLEnabled(Properties kafkaProps) {
     String securityProtocol =
         kafkaProps.getProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG);