Posted to commits@flume.apache.org by tr...@apache.org on 2022/10/30 13:19:23 UTC

[flume] branch trunk updated: FLUME-3443 FLUME-2985 Update to Apache Kafka 3.3.1 and remove deprecated properties and Zookeeper offset migration (relating to 0.8 to 0.9 migration) (#389)

This is an automated email from the ASF dual-hosted git repository.

tristan pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/flume.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 3d8d59136  FLUME-3443 FLUME-2985 Update to Apache Kafka 3.3.1 and remove deprecated properties and Zookeeper offset migration (relating to 0.8 to 0.9 migration) (#389)
3d8d59136 is described below

commit 3d8d59136c8a0f2b5e1125a995836e39ecdfea08
Author: Tristan Stevens <tm...@users.noreply.github.com>
AuthorDate: Sun Oct 30 13:19:18 2022 +0000

     FLUME-3443 FLUME-2985 Update to Apache Kafka 3.3.1 and remove deprecated properties and Zookeeper offset migration (relating to 0.8 to 0.9 migration) (#389)
---
 dev-docs/UpdateLicenses.md                         |   6 +-
 flume-ng-channels/flume-kafka-channel/pom.xml      |  11 ++
 .../apache/flume/channel/kafka/KafkaChannel.java   | 154 ----------------
 .../channel/kafka/KafkaChannelConfiguration.java   |  18 --
 .../channel/kafka/TestBasicFunctionality.java      |  28 ---
 .../channel/kafka/TestOffsetsAndMigration.java     | 133 --------------
 flume-ng-sinks/flume-ng-kafka-sink/pom.xml         |  13 ++
 .../org/apache/flume/sink/kafka/KafkaSink.java     |  61 -------
 .../flume/sink/kafka/KafkaSinkConstants.java       |   9 -
 .../org/apache/flume/sink/kafka/TestKafkaSink.java |  24 ---
 .../apache/flume/sink/kafka/util/KafkaLocal.java   |   8 +-
 flume-ng-sources/flume-kafka-source/pom.xml        |   8 +
 .../org/apache/flume/source/kafka/KafkaSource.java | 157 +---------------
 .../flume/source/kafka/KafkaSourceConstants.java   |   8 -
 .../source/kafka/KafkaSourceEmbeddedKafka.java     |  13 +-
 .../apache/flume/source/kafka/TestKafkaSource.java | 199 +--------------------
 flume-shared/flume-shared-kafka-test/pom.xml       |   1 -
 pom.xml                                            |   2 +-
 18 files changed, 50 insertions(+), 803 deletions(-)
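
For anyone tracking the user-facing impact: with the translateOldProps/migrateOffsets code removed (see the diff below), the pre-Kafka-0.9 Flume keys -- topic, brokerList, zookeeperConnect, groupId, readSmallestOffset, batchSize, requiredAcks, migrateZookeeperOffsets -- are no longer recognized. A rough sketch of the equivalent configuration, using the property names that remain documented in the Flume user guide (agent/component names and values here are placeholders, not part of this commit):

    # Kafka Channel (replaces brokerList / topic / groupId / readSmallestOffset)
    a1.channels.c1.kafka.bootstrap.servers = localhost:9092
    a1.channels.c1.kafka.topic = flume-channel
    a1.channels.c1.kafka.consumer.group.id = flume
    a1.channels.c1.kafka.consumer.auto.offset.reset = earliest

    # Kafka Sink (replaces brokerList / topic / batchSize / requiredAcks)
    a1.sinks.k1.kafka.bootstrap.servers = localhost:9092
    a1.sinks.k1.kafka.topic = flume-sink
    a1.sinks.k1.flumeBatchSize = 100
    a1.sinks.k1.kafka.producer.acks = 1

    # Kafka Source (replaces topic / groupId / zookeeperConnect)
    a1.sources.r1.kafka.bootstrap.servers = localhost:9092
    a1.sources.r1.kafka.topics = flume-source
    a1.sources.r1.kafka.consumer.group.id = flume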

diff --git a/dev-docs/UpdateLicenses.md b/dev-docs/UpdateLicenses.md
index 6f6fdfb2d..4d74d7498 100644
--- a/dev-docs/UpdateLicenses.md
+++ b/dev-docs/UpdateLicenses.md
@@ -524,9 +524,9 @@ Findbugs.
 same as jar-with-dependencies
 
 ```
-   kafka-clients-2.7.2.jar
-   kafka-raft-2.7.2.jar
-   kafka_2.13-2.7.2.jar
+   kafka-clients-3.3.1.jar
+   kafka-raft-3.3.1.jar
+   kafka_2.13-3.3.1.jar
 ```
 
 ALv2. Entry in NOTICE.  Additional entry in NOTICE and LICENSE for
diff --git a/flume-ng-channels/flume-kafka-channel/pom.xml b/flume-ng-channels/flume-kafka-channel/pom.xml
index 61e55905d..0fd70bf3a 100644
--- a/flume-ng-channels/flume-kafka-channel/pom.xml
+++ b/flume-ng-channels/flume-kafka-channel/pom.xml
@@ -57,9 +57,20 @@ limitations under the License.
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka_${scala.version}</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka_${scala.version}</artifactId>
+      <classifier>test</classifier>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-clients</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka-clients</artifactId>
+      <version>${kafka.version}</version>
+      <classifier>test</classifier>
     </dependency>
     <dependency>
       <groupId>org.apache.flume.flume-ng-sinks</groupId>
diff --git a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
index 65cad9937..8336e94e2 100644
--- a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
+++ b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
@@ -20,7 +20,6 @@ package org.apache.flume.channel.kafka;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
-import kafka.zk.KafkaZkClient;
 import org.apache.avro.io.BinaryDecoder;
 import org.apache.avro.io.BinaryEncoder;
 import org.apache.avro.io.DecoderFactory;
@@ -50,15 +49,11 @@ import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.SslConfigs;
 import org.apache.kafka.common.errors.WakeupException;
-import org.apache.kafka.common.security.JaasUtils;
-import org.apache.kafka.common.utils.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.Option;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -80,30 +75,24 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.BOOTSTRAP_SERVERS_CONFIG;
-import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.BROKER_LIST_FLUME_KEY;
 import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.DEFAULT_ACKS;
 import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.DEFAULT_AUTO_OFFSET_RESET;
 import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.DEFAULT_GROUP_ID;
 import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.DEFAULT_KEY_DESERIALIZER;
 import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.DEFAULT_KEY_SERIALIZER;
-import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.DEFAULT_MIGRATE_ZOOKEEPER_OFFSETS;
 import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.DEFAULT_PARSE_AS_FLUME_EVENT;
 import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.DEFAULT_POLL_TIMEOUT;
 import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.DEFAULT_TOPIC;
 import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.DEFAULT_VALUE_DESERIAIZER;
 import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.DEFAULT_VALUE_SERIAIZER;
-import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.GROUP_ID_FLUME;
 import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.KAFKA_CONSUMER_PREFIX;
 import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.KAFKA_PRODUCER_PREFIX;
 import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.KEY_HEADER;
-import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.MIGRATE_ZOOKEEPER_OFFSETS;
 import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.PARSE_AS_FLUME_EVENT;
 import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.PARTITION_HEADER_NAME;
 import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.POLL_TIMEOUT;
-import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.READ_SMALLEST_OFFSET;
 import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.STATIC_PARTITION_CONF;
 import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.TOPIC_CONFIG;
-import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.ZOOKEEPER_CONNECT_FLUME_KEY;
 import static org.apache.flume.shared.kafka.KafkaSSLUtil.SSL_DISABLE_FQDN_CHECK;
 import static org.apache.flume.shared.kafka.KafkaSSLUtil.isSSLEnabled;
 
@@ -112,10 +101,6 @@ public class KafkaChannel extends BasicChannelSemantics {
   private static final Logger logger =
           LoggerFactory.getLogger(KafkaChannel.class);
 
-  // Constants used only for offset migration zookeeper connections
-  private static final int ZK_SESSION_TIMEOUT = 30000;
-  private static final int ZK_CONNECTION_TIMEOUT = 30000;
-
   private final Properties consumerProps = new Properties();
   private final Properties producerProps = new Properties();
 
@@ -124,13 +109,10 @@ public class KafkaChannel extends BasicChannelSemantics {
 
   private AtomicReference<String> topic = new AtomicReference<String>();
   private boolean parseAsFlumeEvent = DEFAULT_PARSE_AS_FLUME_EVENT;
-  private String zookeeperConnect = null;
   private String topicStr = DEFAULT_TOPIC;
   private String groupId = DEFAULT_GROUP_ID;
   private String partitionHeader = null;
   private Integer staticPartitionId;
-  @Deprecated
-  private boolean migrateZookeeperOffsets = DEFAULT_MIGRATE_ZOOKEEPER_OFFSETS;
 
   // used to indicate if a rebalance has occurred during the current transaction
   AtomicBoolean rebalanceFlag = new AtomicBoolean();
@@ -159,11 +141,6 @@ public class KafkaChannel extends BasicChannelSemantics {
   @Override
   public void start() {
     logger.info("Starting Kafka Channel: {}", getName());
-    // As a migration step check if there are any offsets from the group stored in kafka
-    // If not read them from Zookeeper and commit them to Kafka
-    if (migrateZookeeperOffsets && zookeeperConnect != null && !zookeeperConnect.isEmpty()) {
-      migrateOffsets();
-    }
     producer = new KafkaProducer<String, byte[]>(producerProps);
     // We always have just one topic being read by one thread
     logger.info("Topic = {}", topic.get());
@@ -193,10 +170,6 @@ public class KafkaChannel extends BasicChannelSemantics {
 
   @Override
   public void configure(Context ctx) {
-
-    // Can remove in the next release
-    translateOldProps(ctx);
-
     topicStr = ctx.getString(TOPIC_CONFIG);
     if (topicStr == null || topicStr.isEmpty()) {
       topicStr = DEFAULT_TOPIC;
@@ -224,10 +197,6 @@ public class KafkaChannel extends BasicChannelSemantics {
     staticPartitionId = ctx.getInteger(STATIC_PARTITION_CONF);
     partitionHeader = ctx.getString(PARTITION_HEADER_NAME);
 
-    migrateZookeeperOffsets = ctx.getBoolean(MIGRATE_ZOOKEEPER_OFFSETS,
-      DEFAULT_MIGRATE_ZOOKEEPER_OFFSETS);
-    zookeeperConnect = ctx.getString(ZOOKEEPER_CONNECT_FLUME_KEY);
-
     if (logger.isDebugEnabled() && LogPrivacyUtil.allowLogPrintConfig()) {
       logger.debug("Kafka properties: {}", ctx);
     }
@@ -237,57 +206,6 @@ public class KafkaChannel extends BasicChannelSemantics {
     }
   }
 
-  // We can remove this once the properties are officially deprecated
-  private void translateOldProps(Context ctx) {
-
-    if (!(ctx.containsKey(TOPIC_CONFIG))) {
-      ctx.put(TOPIC_CONFIG, ctx.getString("topic"));
-      logger.warn("{} is deprecated. Please use the parameter {}", "topic", TOPIC_CONFIG);
-    }
-
-    // Broker List
-    // If there is no value we need to check and set the old param and log a warning message
-    if (!(ctx.containsKey(BOOTSTRAP_SERVERS_CONFIG))) {
-      String brokerList = ctx.getString(BROKER_LIST_FLUME_KEY);
-      if (brokerList == null || brokerList.isEmpty()) {
-        throw new ConfigurationException("Bootstrap Servers must be specified");
-      } else {
-        ctx.put(BOOTSTRAP_SERVERS_CONFIG, brokerList);
-        logger.warn("{} is deprecated. Please use the parameter {}",
-                    BROKER_LIST_FLUME_KEY, BOOTSTRAP_SERVERS_CONFIG);
-      }
-    }
-
-    // GroupId
-    // If there is an old Group Id set, then use that if no groupId is set.
-    if (!(ctx.containsKey(KAFKA_CONSUMER_PREFIX + ConsumerConfig.GROUP_ID_CONFIG))) {
-      String oldGroupId = ctx.getString(GROUP_ID_FLUME);
-      if (oldGroupId != null  && !oldGroupId.isEmpty()) {
-        ctx.put(KAFKA_CONSUMER_PREFIX + ConsumerConfig.GROUP_ID_CONFIG, oldGroupId);
-        logger.warn("{} is deprecated. Please use the parameter {}",
-                    GROUP_ID_FLUME, KAFKA_CONSUMER_PREFIX + ConsumerConfig.GROUP_ID_CONFIG);
-      }
-    }
-
-    if (!(ctx.containsKey((KAFKA_CONSUMER_PREFIX + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)))) {
-      Boolean oldReadSmallest = ctx.getBoolean(READ_SMALLEST_OFFSET);
-      String auto;
-      if (oldReadSmallest != null) {
-        if (oldReadSmallest) {
-          auto = "earliest";
-        } else {
-          auto = "latest";
-        }
-        ctx.put(KAFKA_CONSUMER_PREFIX + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,auto);
-        logger.warn("{} is deprecated. Please use the parameter {}",
-                    READ_SMALLEST_OFFSET,
-                    KAFKA_CONSUMER_PREFIX + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
-      }
-
-    }
-  }
-
-
   private void setProducerProps(Context ctx, String bootStrapServers) {
     producerProps.clear();
     producerProps.put(ProducerConfig.ACKS_CONFIG, DEFAULT_ACKS);
@@ -352,78 +270,6 @@ public class KafkaChannel extends BasicChannelSemantics {
     }
   }
 
-  private void migrateOffsets() {
-    try (KafkaZkClient zkClient = KafkaZkClient.apply(zookeeperConnect,
-            JaasUtils.isZkSaslEnabled(), ZK_SESSION_TIMEOUT, ZK_CONNECTION_TIMEOUT, 10,
-            Time.SYSTEM, "kafka.server", "SessionExpireListener", scala.Option.empty(),
-            scala.Option.empty());
-         KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(consumerProps)) {
-      Map<TopicPartition, OffsetAndMetadata> kafkaOffsets = getKafkaOffsets(consumer);
-      if (kafkaOffsets == null) {
-        logger.warn("Topic " + topicStr + " not found in Kafka. Offset migration will be skipped.");
-        return;
-      }
-      if (!kafkaOffsets.isEmpty()) {
-        logger.info("Found Kafka offsets for topic {}. Will not migrate from zookeeper", topicStr);
-        logger.debug("Offsets found: {}", kafkaOffsets);
-        return;
-      }
-
-      logger.info("No Kafka offsets found. Migrating zookeeper offsets");
-      Map<TopicPartition, OffsetAndMetadata> zookeeperOffsets =
-              getZookeeperOffsets(zkClient, consumer);
-      if (zookeeperOffsets.isEmpty()) {
-        logger.warn("No offsets to migrate found in Zookeeper");
-        return;
-      }
-
-      logger.info("Committing Zookeeper offsets to Kafka");
-      logger.debug("Offsets to commit: {}", zookeeperOffsets);
-      consumer.commitSync(zookeeperOffsets);
-      // Read the offsets to verify they were committed
-      Map<TopicPartition, OffsetAndMetadata> newKafkaOffsets = getKafkaOffsets(consumer);
-      logger.debug("Offsets committed: {}", newKafkaOffsets);
-      if (newKafkaOffsets == null
-          || !newKafkaOffsets.keySet().containsAll(zookeeperOffsets.keySet())) {
-        throw new FlumeException("Offsets could not be committed");
-      }
-    }
-  }
-
-
-  private Map<TopicPartition, OffsetAndMetadata> getKafkaOffsets(
-      KafkaConsumer<String, byte[]> client) {
-    Map<TopicPartition, OffsetAndMetadata> offsets = null;
-    List<PartitionInfo> partitions = client.partitionsFor(topicStr);
-    if (partitions != null) {
-      offsets = new HashMap<>();
-      for (PartitionInfo partition : partitions) {
-        TopicPartition key = new TopicPartition(topicStr, partition.partition());
-        OffsetAndMetadata offsetAndMetadata = client.committed(key);
-        if (offsetAndMetadata != null) {
-          offsets.put(key, offsetAndMetadata);
-        }
-      }
-    }
-    return offsets;
-  }
-
-  private Map<TopicPartition, OffsetAndMetadata> getZookeeperOffsets(
-          KafkaZkClient zkClient, KafkaConsumer<String, byte[]> consumer) {
-    Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
-    List<PartitionInfo> partitions = consumer.partitionsFor(topicStr);
-    for (PartitionInfo partition : partitions) {
-      TopicPartition topicPartition = new TopicPartition(topicStr, partition.partition());
-      Option<Object> optionOffset = zkClient.getConsumerOffset(groupId, topicPartition);
-      if (optionOffset.nonEmpty()) {
-        Long offset = (Long) optionOffset.get();
-        OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(offset);
-        offsets.put(topicPartition, offsetAndMetadata);
-      }
-    }
-    return offsets;
-  }
-
   private void decommissionConsumerAndRecords(ConsumerAndRecords c) {
     c.consumer.wakeup();
     c.consumer.close();
diff --git a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannelConfiguration.java b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannelConfiguration.java
index ad5a15bf8..87fcbc1a3 100644
--- a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannelConfiguration.java
+++ b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannelConfiguration.java
@@ -52,22 +52,4 @@ public class KafkaChannelConfiguration {
   public static final String PARTITION_HEADER_NAME = "partitionIdHeader";
   public static final String STATIC_PARTITION_CONF = "defaultPartitionId";
 
-  public static final String MIGRATE_ZOOKEEPER_OFFSETS = "migrateZookeeperOffsets";
-  public static final boolean DEFAULT_MIGRATE_ZOOKEEPER_OFFSETS = true;
-
-  /*** Old Configuration Parameters ****/
-  public static final String BROKER_LIST_KEY = "metadata.broker.list";
-  public static final String REQUIRED_ACKS_KEY = "request.required.acks";
-  public static final String BROKER_LIST_FLUME_KEY = "brokerList";
-  //public static final String TOPIC = "topic";
-  public static final String GROUP_ID_FLUME = "groupId";
-  public static final String AUTO_COMMIT_ENABLED = "auto.commit.enable";
-  public static final String ZOOKEEPER_CONNECT = "zookeeper.connect";
-  public static final String ZOOKEEPER_CONNECT_FLUME_KEY = "zookeeperConnect";
-  public static final String TIMEOUT = "timeout";
-  public static final String DEFAULT_TIMEOUT = "100";
-  public static final String CONSUMER_TIMEOUT = "consumer.timeout.ms";
-
-  public static final String READ_SMALLEST_OFFSET = "readSmallestOffset";
-  public static final boolean DEFAULT_READ_SMALLEST_OFFSET = false;
 }
diff --git a/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestBasicFunctionality.java b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestBasicFunctionality.java
index d119b429f..953133ff0 100644
--- a/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestBasicFunctionality.java
+++ b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestBasicFunctionality.java
@@ -23,8 +23,6 @@ import org.apache.flume.Event;
 import org.apache.flume.Transaction;
 import org.apache.flume.conf.Configurables;
 import org.apache.flume.instrumentation.kafka.KafkaChannelCounter;
-import org.apache.kafka.clients.CommonClientConfigs;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.junit.Assert;
@@ -39,10 +37,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
 import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.BOOTSTRAP_SERVERS_CONFIG;
-import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.BROKER_LIST_FLUME_KEY;
-import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.GROUP_ID_FLUME;
 import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.KEY_HEADER;
-import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.READ_SMALLEST_OFFSET;
 import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.TOPIC_CONFIG;
 
 public class TestBasicFunctionality extends TestKafkaChannelBase {
@@ -65,29 +60,6 @@ public class TestBasicFunctionality extends TestKafkaChannelBase {
     Assert.assertEquals(consumerProps.getProperty("another-parameter"), "1");
   }
 
-  @Test
-  public void testOldConfig() throws Exception {
-    Context context = new Context();
-    context.put(BROKER_LIST_FLUME_KEY, testUtil.getKafkaServerUrl());
-    context.put(GROUP_ID_FLUME, "flume-something");
-    context.put(READ_SMALLEST_OFFSET, "true");
-    context.put("topic", topic);
-
-    final KafkaChannel channel = new KafkaChannel();
-    Configurables.configure(channel, context);
-
-    Properties consumerProps = channel.getConsumerProps();
-    Properties producerProps = channel.getProducerProps();
-
-    Assert.assertEquals(producerProps.getProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG),
-        testUtil.getKafkaServerUrl());
-    Assert.assertEquals(consumerProps.getProperty(ConsumerConfig.GROUP_ID_CONFIG),
-        "flume-something");
-    Assert.assertEquals(consumerProps.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG),
-        "earliest");
-  }
-
-
   @Test
   public void testStopAndStart() throws Exception {
     doTestStopAndStart(false, false);
diff --git a/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestOffsetsAndMigration.java b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestOffsetsAndMigration.java
index a399b543b..e58b5b158 100644
--- a/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestOffsetsAndMigration.java
+++ b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestOffsetsAndMigration.java
@@ -18,33 +18,14 @@
  */
 package org.apache.flume.channel.kafka;
 
-import kafka.zk.KafkaZkClient;
-import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.Transaction;
-import org.apache.flume.lifecycle.LifecycleState;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.security.JaasUtils;
-import org.apache.kafka.common.utils.Time;
 import org.junit.Assert;
 import org.junit.Test;
 
-import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.Executors;
-
-import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.GROUP_ID_FLUME;
-import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.ZOOKEEPER_CONNECT_FLUME_KEY;
 
 public class TestOffsetsAndMigration extends TestKafkaChannelBase {
 
@@ -77,26 +58,6 @@ public class TestOffsetsAndMigration extends TestKafkaChannelBase {
     Assert.assertTrue(Arrays.equals(message.getBytes(), event.getBody()));
   }
 
-  @Test
-  public void testMigrateOffsetsNone() throws Exception {
-    doTestMigrateZookeeperOffsets(false, false, "testMigrateOffsets-none");
-  }
-
-  @Test
-  public void testMigrateOffsetsZookeeper() throws Exception {
-    doTestMigrateZookeeperOffsets(true, false, "testMigrateOffsets-zookeeper");
-  }
-
-  @Test
-  public void testMigrateOffsetsKafka() throws Exception {
-    doTestMigrateZookeeperOffsets(false, true, "testMigrateOffsets-kafka");
-  }
-
-  @Test
-  public void testMigrateOffsetsBoth() throws Exception {
-    doTestMigrateZookeeperOffsets(true, true, "testMigrateOffsets-both");
-  }
-
   private Event takeEventWithoutCommittingTxn(KafkaChannel channel) {
     for (int i = 0; i < 10; i++) {
       Transaction txn = channel.getTransaction();
@@ -113,98 +74,4 @@ public class TestOffsetsAndMigration extends TestKafkaChannelBase {
     return null;
   }
 
-  private void doTestMigrateZookeeperOffsets(boolean hasZookeeperOffsets, boolean hasKafkaOffsets,
-                                             String group) throws Exception {
-    // create a topic with 1 partition for simplicity
-    topic = findUnusedTopic();
-    createTopic(topic, 1);
-
-    Context context = prepareDefaultContext(false);
-    context.put(ZOOKEEPER_CONNECT_FLUME_KEY, testUtil.getZkUrl());
-    context.put(GROUP_ID_FLUME, group);
-    final KafkaChannel channel = createChannel(context);
-
-    // Produce some data and save an offset
-    Long fifthOffset = 0L;
-    Long tenthOffset = 0L;
-    Properties props = channel.getProducerProps();
-    KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);
-    for (int i = 1; i <= 50; i++) {
-      ProducerRecord<String, byte[]> data =
-          new ProducerRecord<>(topic, null, String.valueOf(i).getBytes());
-      RecordMetadata recordMetadata = producer.send(data).get();
-      if (i == 5) {
-        fifthOffset = recordMetadata.offset();
-      }
-      if (i == 10) {
-        tenthOffset = recordMetadata.offset();
-      }
-    }
-
-    // Commit 10th offset to zookeeper
-    if (hasZookeeperOffsets) {
-      KafkaZkClient zkClient = KafkaZkClient.apply(testUtil.getZkUrl(),
-              JaasUtils.isZkSaslEnabled(), 30000, 30000, 10, Time.SYSTEM,
-              "kafka.server", "SessionExpireListener", scala.Option.empty(), scala.Option.empty());
-      zkClient.getConsumerOffset(group, new TopicPartition(topic, 0));
-      Long offset = tenthOffset + 1;
-      zkClient.setOrCreateConsumerOffset(group, new TopicPartition(topic, 0), offset);
-      zkClient.close();
-    }
-
-    // Commit 5th offset to kafka
-    if (hasKafkaOffsets) {
-      Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
-      offsets.put(new TopicPartition(topic, 0), new OffsetAndMetadata(fifthOffset + 1));
-      KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(channel.getConsumerProps());
-      consumer.commitSync(offsets);
-      consumer.close();
-    }
-
-    // Start the channel and read some data
-    channel.start();
-    ExecutorCompletionService<Void> submitterSvc = new
-        ExecutorCompletionService<>(Executors.newCachedThreadPool());
-    List<Event> events = pullEvents(channel, submitterSvc,
-        20, false, false);
-    wait(submitterSvc, 5);
-    List<Integer> finals = new ArrayList<>(40);
-    for (Event event : events) {
-      finals.add(Integer.parseInt(new String(event.getBody())));
-    }
-    channel.stop();
-
-    if (!hasKafkaOffsets && !hasZookeeperOffsets) {
-      // The default behavior is to read the entire log
-      Assert.assertTrue("Channel should read the the first message", finals.contains(1));
-    } else if (hasKafkaOffsets && hasZookeeperOffsets) {
-      // Respect Kafka offsets if they exist
-      Assert.assertFalse("Channel should not read the 5th message", finals.contains(5));
-      Assert.assertTrue("Channel should read the 6th message", finals.contains(6));
-    } else if (hasKafkaOffsets) {
-      // Respect Kafka offsets if they exist (don't fail if zookeeper offsets are missing)
-      Assert.assertFalse("Channel should not read the 5th message", finals.contains(5));
-      Assert.assertTrue("Channel should read the 6th message", finals.contains(6));
-    } else {
-      // Otherwise migrate the ZooKeeper offsets if they exist
-      Assert.assertFalse("Channel should not read the 10th message", finals.contains(10));
-      Assert.assertTrue("Channel should read the 11th message", finals.contains(11));
-    }
-  }
-
-  @Test
-  public void testMigrateZookeeperOffsetsWhenTopicNotExists() throws Exception {
-    topic = findUnusedTopic();
-
-    Context context = prepareDefaultContext(false);
-    context.put(ZOOKEEPER_CONNECT_FLUME_KEY, testUtil.getZkUrl());
-    context.put(GROUP_ID_FLUME, "testMigrateOffsets-nonExistingTopic");
-    KafkaChannel channel = createChannel(context);
-
-    channel.start();
-
-    Assert.assertEquals(LifecycleState.START, channel.getLifecycleState());
-
-    channel.stop();
-  }
 }
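
Note on offsets: with the ZooKeeper migration path and the tests above removed, a consumer group whose offsets were only ever committed to ZooKeeper (the pre-0.9 layout) is no longer copied into Kafka on startup. As a sketch of the fallback behaviour, such a group simply starts from the position chosen by the standard consumer reset property (placeholder component names):

    a1.channels.c1.kafka.consumer.auto.offset.reset = earliest   # or "latest"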
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/pom.xml b/flume-ng-sinks/flume-ng-kafka-sink/pom.xml
index d75265a9c..aa73b7559 100644
--- a/flume-ng-sinks/flume-ng-kafka-sink/pom.xml
+++ b/flume-ng-sinks/flume-ng-kafka-sink/pom.xml
@@ -93,10 +93,23 @@
       <artifactId>kafka_${scala.version}</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka_${scala.version}</artifactId>
+      <classifier>test</classifier>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka-clients</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-clients</artifactId>
+      <scope>test</scope>
+      <classifier>test</classifier>
+      <version>${kafka.version}</version>
+    </dependency>
 
     <dependency>
       <groupId>org.apache.logging.log4j</groupId>
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
index add6916df..b13d5b3b8 100644
--- a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
+++ b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
@@ -65,7 +65,6 @@ import static org.apache.flume.shared.kafka.KafkaSSLUtil.isSSLEnabled;
 import static org.apache.flume.sink.kafka.KafkaSinkConstants.BOOTSTRAP_SERVERS_CONFIG;
 import static org.apache.flume.sink.kafka.KafkaSinkConstants.BATCH_SIZE;
 import static org.apache.flume.sink.kafka.KafkaSinkConstants.DEFAULT_BATCH_SIZE;
-import static org.apache.flume.sink.kafka.KafkaSinkConstants.BROKER_LIST_FLUME_KEY;
 import static org.apache.flume.sink.kafka.KafkaSinkConstants.DEFAULT_ACKS;
 import static org.apache.flume.sink.kafka.KafkaSinkConstants.DEFAULT_KEY_SERIALIZER;
 import static org.apache.flume.sink.kafka.KafkaSinkConstants.DEFAULT_TOPIC;
@@ -73,12 +72,8 @@ import static org.apache.flume.sink.kafka.KafkaSinkConstants.DEFAULT_VALUE_SERIA
 import static org.apache.flume.sink.kafka.KafkaSinkConstants.KAFKA_HEADER;
 import static org.apache.flume.sink.kafka.KafkaSinkConstants.KAFKA_PRODUCER_PREFIX;
 import static org.apache.flume.sink.kafka.KafkaSinkConstants.KEY_HEADER;
-import static org.apache.flume.sink.kafka.KafkaSinkConstants.OLD_BATCH_SIZE;
-import static org.apache.flume.sink.kafka.KafkaSinkConstants.REQUIRED_ACKS_FLUME_KEY;
 import static org.apache.flume.sink.kafka.KafkaSinkConstants.TIMESTAMP_HEADER;
 import static org.apache.flume.sink.kafka.KafkaSinkConstants.TOPIC_CONFIG;
-import static org.apache.flume.sink.kafka.KafkaSinkConstants.KEY_SERIALIZER_KEY;
-import static org.apache.flume.sink.kafka.KafkaSinkConstants.MESSAGE_SERIALIZER_KEY;
 
 /**
  * A Flume Sink that can publish messages to Kafka.
@@ -338,8 +333,6 @@ public class KafkaSink extends AbstractSink implements Configurable, BatchSizeSu
   @Override
   public void configure(Context context) {
 
-    translateOldProps(context);
-
     String topicStr = context.getString(TOPIC_CONFIG);
     if (topicStr == null || topicStr.isEmpty()) {
       topicStr = DEFAULT_TOPIC;
@@ -394,60 +387,6 @@ public class KafkaSink extends AbstractSink implements Configurable, BatchSizeSu
     }
   }
 
-  private void translateOldProps(Context ctx) {
-
-    if (!(ctx.containsKey(TOPIC_CONFIG))) {
-      ctx.put(TOPIC_CONFIG, ctx.getString("topic"));
-      logger.warn("{} is deprecated. Please use the parameter {}", "topic", TOPIC_CONFIG);
-    }
-
-    //Broker List
-    // If there is no value we need to check and set the old param and log a warning message
-    if (!(ctx.containsKey(BOOTSTRAP_SERVERS_CONFIG))) {
-      String brokerList = ctx.getString(BROKER_LIST_FLUME_KEY);
-      if (brokerList == null || brokerList.isEmpty()) {
-        throw new ConfigurationException("Bootstrap Servers must be specified");
-      } else {
-        ctx.put(BOOTSTRAP_SERVERS_CONFIG, brokerList);
-        logger.warn("{} is deprecated. Please use the parameter {}",
-                    BROKER_LIST_FLUME_KEY, BOOTSTRAP_SERVERS_CONFIG);
-      }
-    }
-
-    //batch Size
-    if (!(ctx.containsKey(BATCH_SIZE))) {
-      String oldBatchSize = ctx.getString(OLD_BATCH_SIZE);
-      if ( oldBatchSize != null  && !oldBatchSize.isEmpty())  {
-        ctx.put(BATCH_SIZE, oldBatchSize);
-        logger.warn("{} is deprecated. Please use the parameter {}", OLD_BATCH_SIZE, BATCH_SIZE);
-      }
-    }
-
-    // Acks
-    if (!(ctx.containsKey(KAFKA_PRODUCER_PREFIX + ProducerConfig.ACKS_CONFIG))) {
-      String requiredKey = ctx.getString(
-              KafkaSinkConstants.REQUIRED_ACKS_FLUME_KEY);
-      if (!(requiredKey == null) && !(requiredKey.isEmpty())) {
-        ctx.put(KAFKA_PRODUCER_PREFIX + ProducerConfig.ACKS_CONFIG, requiredKey);
-        logger.warn("{} is deprecated. Please use the parameter {}", REQUIRED_ACKS_FLUME_KEY,
-                KAFKA_PRODUCER_PREFIX + ProducerConfig.ACKS_CONFIG);
-      }
-    }
-
-    if (ctx.containsKey(KEY_SERIALIZER_KEY )) {
-      logger.warn("{} is deprecated. Flume now uses the latest Kafka producer which implements " +
-          "a different interface for serializers. Please use the parameter {}",
-          KEY_SERIALIZER_KEY,KAFKA_PRODUCER_PREFIX + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
-    }
-
-    if (ctx.containsKey(MESSAGE_SERIALIZER_KEY)) {
-      logger.warn("{} is deprecated. Flume now uses the latest Kafka producer which implements " +
-                  "a different interface for serializers. Please use the parameter {}",
-                  MESSAGE_SERIALIZER_KEY,
-                  KAFKA_PRODUCER_PREFIX + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
-    }
-  }
-
   private void setProducerProps(Context context, String bootStrapServers) {
     kafkaProps.clear();
     kafkaProps.put(ProducerConfig.ACKS_CONFIG, DEFAULT_ACKS);
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java
index 3f1f279e0..df9000f18 100644
--- a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java
+++ b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java
@@ -55,14 +55,5 @@ public class KafkaSinkConstants {
   public static final String DEFAULT_TOPIC = "default-flume-topic";
   public static final String DEFAULT_ACKS = "1";
 
-  /* Old Properties */
-
-  /* Properties */
-
-  public static final String OLD_BATCH_SIZE = "batchSize";
-  public static final String MESSAGE_SERIALIZER_KEY = "serializer.class";
-  public static final String KEY_SERIALIZER_KEY = "key.serializer.class";
-  public static final String BROKER_LIST_FLUME_KEY = "brokerList";
-  public static final String REQUIRED_ACKS_FLUME_KEY = "requiredAcks";
 }
 
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
index fd5130ca9..3dbcd000f 100644
--- a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
+++ b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
@@ -39,7 +39,6 @@ import org.apache.flume.shared.kafka.test.PartitionOption;
 import org.apache.flume.shared.kafka.test.PartitionTestScenario;
 import org.apache.flume.sink.kafka.util.TestUtil;
 import org.apache.flume.source.avro.AvroFlumeEvent;
-import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.producer.ProducerConfig;
@@ -69,13 +68,10 @@ import static org.apache.flume.shared.kafka.KafkaSSLUtil.SSL_DISABLE_FQDN_CHECK;
 import static org.apache.flume.sink.kafka.KafkaSinkConstants.AVRO_EVENT;
 import static org.apache.flume.sink.kafka.KafkaSinkConstants.BATCH_SIZE;
 import static org.apache.flume.sink.kafka.KafkaSinkConstants.BOOTSTRAP_SERVERS_CONFIG;
-import static org.apache.flume.sink.kafka.KafkaSinkConstants.BROKER_LIST_FLUME_KEY;
 import static org.apache.flume.sink.kafka.KafkaSinkConstants.DEFAULT_KEY_SERIALIZER;
 import static org.apache.flume.sink.kafka.KafkaSinkConstants.DEFAULT_TOPIC;
 import static org.apache.flume.sink.kafka.KafkaSinkConstants.KAFKA_PREFIX;
 import static org.apache.flume.sink.kafka.KafkaSinkConstants.KAFKA_PRODUCER_PREFIX;
-import static org.apache.flume.sink.kafka.KafkaSinkConstants.OLD_BATCH_SIZE;
-import static org.apache.flume.sink.kafka.KafkaSinkConstants.REQUIRED_ACKS_FLUME_KEY;
 import static org.apache.flume.sink.kafka.KafkaSinkConstants.TOPIC_CONFIG;
 import static org.apache.kafka.clients.CommonClientConfigs.SECURITY_PROTOCOL_CONFIG;
 import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG;
@@ -138,26 +134,6 @@ public class TestKafkaSink {
                  "localhost:9092,localhost:9092");
   }
 
-  @Test
-  public void testOldProperties() {
-    KafkaSink kafkaSink = new KafkaSink();
-    Context context = new Context();
-    context.put("topic", "test-topic");
-    context.put(OLD_BATCH_SIZE, "300");
-    context.put(BROKER_LIST_FLUME_KEY, "localhost:9092,localhost:9092");
-    context.put(REQUIRED_ACKS_FLUME_KEY, "all");
-    Configurables.configure(kafkaSink, context);
-
-    Properties kafkaProps = kafkaSink.getKafkaProps();
-
-    assertEquals(kafkaSink.getTopic(), "test-topic");
-    assertEquals(kafkaSink.getBatchSize(), 300);
-    assertEquals(kafkaProps.getProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG),
-                 "localhost:9092,localhost:9092");
-    assertEquals(kafkaProps.getProperty(ProducerConfig.ACKS_CONFIG), "all");
-
-  }
-
   @Test
   public void testDefaultTopic() {
     Sink kafkaSink = new KafkaSink();
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/KafkaLocal.java b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/KafkaLocal.java
index 6d89bd33d..700c94393 100644
--- a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/KafkaLocal.java
+++ b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/KafkaLocal.java
@@ -19,7 +19,9 @@
 package org.apache.flume.sink.kafka.util;
 
 import kafka.server.KafkaConfig;
-import kafka.server.KafkaServerStartable;
+import kafka.server.KafkaServer;
+import kafka.utils.TestUtils;
+import org.apache.kafka.common.utils.Time;
 
 import java.io.IOException;
 import java.util.Properties;
@@ -30,14 +32,14 @@ import java.util.Properties;
  */
 public class KafkaLocal {
 
-  public KafkaServerStartable kafka;
+  public KafkaServer kafka;
   public ZooKeeperLocal zookeeper;
 
   public KafkaLocal(Properties kafkaProperties) throws IOException, InterruptedException {
     KafkaConfig kafkaConfig = KafkaConfig.fromProps(kafkaProperties);
 
     // start local kafka broker
-    kafka = new KafkaServerStartable(kafkaConfig);
+    kafka = TestUtils.createServer(kafkaConfig, Time.SYSTEM);
   }
 
   public void start() throws Exception {
diff --git a/flume-ng-sources/flume-kafka-source/pom.xml b/flume-ng-sources/flume-kafka-source/pom.xml
index edb9faeac..5e5c4cf49 100644
--- a/flume-ng-sources/flume-kafka-source/pom.xml
+++ b/flume-ng-sources/flume-kafka-source/pom.xml
@@ -78,10 +78,18 @@
     <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka-clients</artifactId>
+      <version>${kafka.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-clients</artifactId>
+      <classifier>test</classifier>
+      <version>${kafka.version}</version>
     </dependency>
     <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka_${scala.version}</artifactId>
+      <version>${kafka.version}</version>
       <classifier>test</classifier>
       <scope>test</scope>
     </dependency>
diff --git a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
index 2bd19757d..a99b6cfdf 100644
--- a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
+++ b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
@@ -18,13 +18,10 @@ package org.apache.flume.source.kafka;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
-import kafka.cluster.Broker;
-import kafka.cluster.BrokerEndPoint;
 import kafka.zk.KafkaZkClient;
 import org.apache.avro.io.BinaryDecoder;
 import org.apache.avro.io.DecoderFactory;
 import org.apache.avro.specific.SpecificDatumReader;
-import org.apache.commons.lang.StringUtils;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.EventDeliveryException;
@@ -38,7 +35,6 @@ import org.apache.flume.instrumentation.kafka.KafkaSourceCounter;
 import org.apache.flume.shared.kafka.KafkaSSLUtil;
 import org.apache.flume.source.AbstractPollableSource;
 import org.apache.flume.source.avro.AvroFlumeEvent;
-import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -50,14 +46,9 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.SslConfigs;
 import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.header.Headers;
-import org.apache.kafka.common.network.ListenerName;
-import org.apache.kafka.common.security.JaasUtils;
-import org.apache.kafka.common.security.auth.SecurityProtocol;
-import org.apache.kafka.common.utils.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.Option;
-import scala.collection.JavaConverters;
 
 import java.io.ByteArrayInputStream;
 import java.time.Duration;
@@ -72,7 +63,6 @@ import java.util.Properties;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.regex.Pattern;
-import java.util.stream.Collectors;
 
 import static org.apache.flume.shared.kafka.KafkaSSLUtil.SSL_DISABLE_FQDN_CHECK;
 import static org.apache.flume.shared.kafka.KafkaSSLUtil.isSSLEnabled;
@@ -86,24 +76,19 @@ import static org.apache.flume.source.kafka.KafkaSourceConstants.DEFAULT_BATCH_D
 import static org.apache.flume.source.kafka.KafkaSourceConstants.DEFAULT_BATCH_SIZE;
 import static org.apache.flume.source.kafka.KafkaSourceConstants.DEFAULT_GROUP_ID;
 import static org.apache.flume.source.kafka.KafkaSourceConstants.DEFAULT_KEY_DESERIALIZER;
-import static org.apache.flume.source.kafka.KafkaSourceConstants.DEFAULT_MIGRATE_ZOOKEEPER_OFFSETS;
 import static org.apache.flume.source.kafka.KafkaSourceConstants.DEFAULT_SET_TOPIC_HEADER;
 import static org.apache.flume.source.kafka.KafkaSourceConstants.DEFAULT_TOPIC_HEADER;
 import static org.apache.flume.source.kafka.KafkaSourceConstants.DEFAULT_VALUE_DESERIALIZER;
 import static org.apache.flume.source.kafka.KafkaSourceConstants.KAFKA_CONSUMER_PREFIX;
 import static org.apache.flume.source.kafka.KafkaSourceConstants.KAFKA_HEADER;
 import static org.apache.flume.source.kafka.KafkaSourceConstants.KEY_HEADER;
-import static org.apache.flume.source.kafka.KafkaSourceConstants.MIGRATE_ZOOKEEPER_OFFSETS;
 import static org.apache.flume.source.kafka.KafkaSourceConstants.OFFSET_HEADER;
-import static org.apache.flume.source.kafka.KafkaSourceConstants.OLD_GROUP_ID;
 import static org.apache.flume.source.kafka.KafkaSourceConstants.PARTITION_HEADER;
 import static org.apache.flume.source.kafka.KafkaSourceConstants.SET_TOPIC_HEADER;
 import static org.apache.flume.source.kafka.KafkaSourceConstants.TIMESTAMP_HEADER;
-import static org.apache.flume.source.kafka.KafkaSourceConstants.TOPIC;
 import static org.apache.flume.source.kafka.KafkaSourceConstants.TOPICS;
 import static org.apache.flume.source.kafka.KafkaSourceConstants.TOPICS_REGEX;
 import static org.apache.flume.source.kafka.KafkaSourceConstants.TOPIC_HEADER;
-import static org.apache.flume.source.kafka.KafkaSourceConstants.ZOOKEEPER_CONNECT_FLUME_KEY;
 
 /**
  * A Source for Kafka which reads messages from kafka topics.
@@ -160,11 +145,8 @@ public class KafkaSource extends AbstractPollableSource
 
   private Subscriber subscriber;
 
-  private String zookeeperConnect;
   private String bootstrapServers;
   private String groupId = DEFAULT_GROUP_ID;
-  @Deprecated
-  private boolean migrateZookeeperOffsets = DEFAULT_MIGRATE_ZOOKEEPER_OFFSETS;
   private String topicHeader = null;
   private boolean setTopicHeader;
   private Map<String, String> headerMap;
@@ -388,10 +370,6 @@ public class KafkaSource extends AbstractPollableSource
     rebalanceFlag = new AtomicBoolean(false);
     kafkaProps = new Properties();
 
-    // can be removed in the next release
-    // See https://issues.apache.org/jira/browse/FLUME-2896
-    translateOldProperties(context);
-
     String topicProperty = context.getString(TOPICS_REGEX);
     if (topicProperty != null && !topicProperty.isEmpty()) {
       // create subscriber that uses pattern-based subscription
@@ -413,28 +391,9 @@ public class KafkaSource extends AbstractPollableSource
       log.debug(AVRO_EVENT + " set to: {}", useAvroEventFormat);
     }
 
-    zookeeperConnect = context.getString(ZOOKEEPER_CONNECT_FLUME_KEY);
-    migrateZookeeperOffsets = context.getBoolean(MIGRATE_ZOOKEEPER_OFFSETS,
-        DEFAULT_MIGRATE_ZOOKEEPER_OFFSETS);
-
     bootstrapServers = context.getString(BOOTSTRAP_SERVERS);
     if (bootstrapServers == null || bootstrapServers.isEmpty()) {
-      if (zookeeperConnect == null || zookeeperConnect.isEmpty()) {
-        throw new ConfigurationException("Bootstrap Servers must be specified");
-      } else {
-        // For backwards compatibility look up the bootstrap from zookeeper
-        log.warn("{} is deprecated. Please use the parameter {}", ZOOKEEPER_CONNECT_FLUME_KEY, BOOTSTRAP_SERVERS);
-
-        // Lookup configured security protocol, just in case it's not default
-        String securityProtocolStr =
-            context.getSubProperties(KAFKA_CONSUMER_PREFIX)
-                .get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG);
-        if (securityProtocolStr == null || securityProtocolStr.isEmpty()) {
-          securityProtocolStr = CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL;
-        }
-        bootstrapServers =
-            lookupBootstrap(zookeeperConnect, SecurityProtocol.valueOf(securityProtocolStr));
-      }
+      throw new ConfigurationException("Bootstrap Servers must be specified");
     }
 
     String groupIdProperty =
@@ -465,23 +424,6 @@ public class KafkaSource extends AbstractPollableSource
     }
   }
 
-  // We can remove this once the properties are officially deprecated
-  private void translateOldProperties(Context ctx) {
-    // topic
-    String topic = context.getString(TOPIC);
-    if (topic != null && !topic.isEmpty()) {
-      subscriber = new TopicListSubscriber(topic);
-      log.warn("{} is deprecated. Please use the parameter {}", TOPIC, TOPICS);
-    }
-
-    // old groupId
-    groupId = ctx.getString(OLD_GROUP_ID);
-    if (groupId != null && !groupId.isEmpty()) {
-      log.warn("{} is deprecated. Please use the parameter {}", OLD_GROUP_ID,
-              KAFKA_CONSUMER_PREFIX + ConsumerConfig.GROUP_ID_CONFIG);
-    }
-  }
-
   private void setConsumerProps(Context ctx) {
     kafkaProps.clear();
     kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, DEFAULT_KEY_DESERIALIZER);
@@ -505,30 +447,6 @@ public class KafkaSource extends AbstractPollableSource
     KafkaSSLUtil.addGlobalSSLParameters(kafkaProps);
   }
 
-  /**
-   * Generates the Kafka bootstrap connection string from the metadata stored in Zookeeper.
-   * Allows for backwards compatibility of the zookeeperConnect configuration.
-   */
-  private String lookupBootstrap(String zookeeperConnect, SecurityProtocol securityProtocol) {
-    try (KafkaZkClient zkClient = KafkaZkClient.apply(zookeeperConnect,
-            JaasUtils.isZkSaslEnabled(), ZK_SESSION_TIMEOUT, ZK_CONNECTION_TIMEOUT, 10,
-            Time.SYSTEM, "kafka.server", "SessionExpireListener",
-            scala.Option.empty(), scala.Option.empty())) {
-      List<Broker> brokerList =
-              JavaConverters.seqAsJavaListConverter(zkClient.getAllBrokersInCluster()).asJava();
-      List<BrokerEndPoint> endPoints = brokerList.stream()
-              .map(broker -> broker.brokerEndPoint(
-                  ListenerName.forSecurityProtocol(securityProtocol))
-              )
-              .collect(Collectors.toList());
-      List<String> connections = new ArrayList<>();
-      for (BrokerEndPoint endPoint : endPoints) {
-        connections.add(endPoint.connectionString());
-      }
-      return StringUtils.join(connections, ',');
-    }
-  }
-
   @VisibleForTesting
   String getBootstrapServers() {
     return bootstrapServers;
@@ -557,21 +475,6 @@ public class KafkaSource extends AbstractPollableSource
   protected void doStart() throws FlumeException {
     log.info("Starting {}...", this);
 
-    // As a migration step check if there are any offsets from the group stored in kafka
-    // If not read them from Zookeeper and commit them to Kafka
-    if (migrateZookeeperOffsets && zookeeperConnect != null && !zookeeperConnect.isEmpty()) {
-      // For simplicity we only support migration of a single topic via the TopicListSubscriber.
-      // There was no way to define a list of topics or a pattern in the previous Flume version.
-      if (subscriber instanceof TopicListSubscriber &&
-          ((TopicListSubscriber) subscriber).get().size() == 1) {
-        String topicStr = ((TopicListSubscriber) subscriber).get().get(0);
-        migrateOffsets(topicStr);
-      } else {
-        log.info("Will not attempt to migrate offsets " +
-            "because multiple topics or a pattern are defined");
-      }
-    }
-
     //initialize a consumer.
     consumer = new KafkaConsumer<String, byte[]>(kafkaProps);
 
@@ -594,64 +497,6 @@ public class KafkaSource extends AbstractPollableSource
     log.info("Kafka Source {} stopped. Metrics: {}", getName(), counter);
   }
 
-  private void migrateOffsets(String topicStr) {
-    try (KafkaZkClient zkClient = KafkaZkClient.apply(zookeeperConnect,
-            JaasUtils.isZkSaslEnabled(), ZK_SESSION_TIMEOUT, ZK_CONNECTION_TIMEOUT, 10,
-            Time.SYSTEM, "kafka.server", "SessionExpireListener",
-            scala.Option.empty(), scala.Option.empty());
-         KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(kafkaProps)) {
-      Map<TopicPartition, OffsetAndMetadata> kafkaOffsets =
-          getKafkaOffsets(consumer, topicStr);
-      if (kafkaOffsets == null) {
-        log.warn("Topic " + topicStr + " not found in Kafka. Offset migration will be skipped.");
-        return;
-      }
-      if (!kafkaOffsets.isEmpty()) {
-        log.info("Found Kafka offsets for topic " + topicStr +
-            ". Will not migrate from zookeeper");
-        log.debug("Offsets found: {}", kafkaOffsets);
-        return;
-      }
-
-      log.info("No Kafka offsets found. Migrating zookeeper offsets");
-      Map<TopicPartition, OffsetAndMetadata> zookeeperOffsets =
-          getZookeeperOffsets(zkClient, consumer, topicStr);
-      if (zookeeperOffsets.isEmpty()) {
-        log.warn("No offsets to migrate found in Zookeeper");
-        return;
-      }
-
-      log.info("Committing Zookeeper offsets to Kafka");
-      log.debug("Offsets to commit: {}", zookeeperOffsets);
-      consumer.commitSync(zookeeperOffsets);
-      // Read the offsets to verify they were committed
-      Map<TopicPartition, OffsetAndMetadata> newKafkaOffsets =
-          getKafkaOffsets(consumer, topicStr);
-      log.debug("Offsets committed: {}", newKafkaOffsets);
-      if (newKafkaOffsets == null
-          || !newKafkaOffsets.keySet().containsAll(zookeeperOffsets.keySet())) {
-        throw new FlumeException("Offsets could not be committed");
-      }
-    }
-  }
-
-  private Map<TopicPartition, OffsetAndMetadata> getKafkaOffsets(
-      KafkaConsumer<String, byte[]> client, String topicStr) {
-    Map<TopicPartition, OffsetAndMetadata> offsets = null;
-    List<PartitionInfo> partitions = client.partitionsFor(topicStr);
-    if (partitions != null) {
-      offsets = new HashMap<>();
-      for (PartitionInfo partition : partitions) {
-        TopicPartition key = new TopicPartition(topicStr, partition.partition());
-        OffsetAndMetadata offsetAndMetadata = client.committed(key);
-        if (offsetAndMetadata != null) {
-          offsets.put(key, offsetAndMetadata);
-        }
-      }
-    }
-    return offsets;
-  }
-
   private Map<TopicPartition, OffsetAndMetadata> getZookeeperOffsets(
           KafkaZkClient zkClient, KafkaConsumer<String, byte[]> consumer, String topicStr) {
 
diff --git a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java
index 5c7857d1b..97216e159 100644
--- a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java
+++ b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java
@@ -38,17 +38,9 @@ public class KafkaSourceConstants {
   public static final String DEFAULT_GROUP_ID = "flume";
   public static final String KAFKA_HEADER = "header.";
 
-  public static final String MIGRATE_ZOOKEEPER_OFFSETS = "migrateZookeeperOffsets";
-  public static final boolean DEFAULT_MIGRATE_ZOOKEEPER_OFFSETS = true;
-
   public static final String AVRO_EVENT = "useFlumeEventFormat";
   public static final boolean DEFAULT_AVRO_EVENT = false;
 
-  /* Old Properties */
-  public static final String ZOOKEEPER_CONNECT_FLUME_KEY = "zookeeperConnect";
-  public static final String TOPIC = "topic";
-  public static final String OLD_GROUP_ID = "groupId";
-
   // flume event headers
   public static final String DEFAULT_TOPIC_HEADER = "topic";
   public static final String KEY_HEADER = "key";
diff --git a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java
index b2deea9c9..e2c4d8f0e 100644
--- a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java
+++ b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java
@@ -17,7 +17,8 @@
 package org.apache.flume.source.kafka;
 
 import kafka.server.KafkaConfig;
-import kafka.server.KafkaServerStartable;
+import kafka.server.KafkaServer;
+import kafka.utils.TestUtils;
 import org.apache.commons.io.FileUtils;
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.CreateTopicsResult;
@@ -28,6 +29,7 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Time;
 
 import java.io.File;
 import java.io.IOException;
@@ -40,16 +42,13 @@ import java.util.UUID;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
-import static org.apache.kafka.common.config.SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG;
-import static org.apache.kafka.common.config.SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG;
-import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG;
-import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG;
+import static org.apache.kafka.common.config.SslConfigs.*;
 
 public class KafkaSourceEmbeddedKafka {
 
   public static String HOST = InetAddress.getLoopbackAddress().getCanonicalHostName();
 
-  KafkaServerStartable kafkaServer;
+  KafkaServer kafkaServer;
   KafkaSourceEmbeddedZookeeper zookeeper;
   private AdminClient adminClient;
 
@@ -103,7 +102,7 @@ public class KafkaSourceEmbeddedKafka {
       props.putAll(properties);
     }
     KafkaConfig config = new KafkaConfig(props);
-    kafkaServer = new KafkaServerStartable(config);
+    kafkaServer = TestUtils.createServer(config, Time.SYSTEM);
     kafkaServer.startup();
     initProducer();
   }
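
(Note, not part of the commit: the hunk above swaps the removed KafkaServerStartable for a KafkaServer obtained through kafka.utils.TestUtils, the test helper available in the Kafka 3.x test artifact. A rough, hedged sketch of that pattern outside the Flume test class is shown below; the class name, ports, ZooKeeper address and property values are illustrative placeholders, and the kafka_2.13 test-classifier jar plus kafka-clients are assumed to be on the test classpath.)

```java
// Sketch only: embedded Kafka broker for tests using the 3.x test utilities.
// Ports, paths and the ZooKeeper connect string are placeholders.
import java.util.Properties;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.utils.TestUtils;
import org.apache.kafka.common.utils.Time;

public class EmbeddedKafkaSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("zookeeper.connect", "127.0.0.1:2181");            // placeholder ZK address
    props.put("broker.id", "1");
    props.put("listeners", "PLAINTEXT://127.0.0.1:9092");        // placeholder listener
    props.put("offsets.topic.replication.factor", "1");          // single-broker test setting

    KafkaConfig config = new KafkaConfig(props);
    // createServer builds the broker from the config; the explicit startup() call
    // mirrors the patched helper above and is kept for symmetry.
    KafkaServer kafkaServer = TestUtils.createServer(config, Time.SYSTEM);
    kafkaServer.startup();
    try {
      // ... run tests against 127.0.0.1:9092 ...
    } finally {
      kafkaServer.shutdown();
      kafkaServer.awaitShutdown();
    }
  }
}
```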
diff --git a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java
index d5caf7ceb..2d67f104c 100644
--- a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java
+++ b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java
@@ -19,7 +19,6 @@ package org.apache.flume.source.kafka;
 
 import com.google.common.base.Charsets;
 import com.google.common.collect.Lists;
-import kafka.zk.KafkaZkClient;
 import org.apache.avro.io.BinaryEncoder;
 import org.apache.avro.io.EncoderFactory;
 import org.apache.avro.specific.SpecificDatumWriter;
@@ -37,19 +36,12 @@ import org.apache.flume.source.avro.AvroFlumeEvent;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.TopicExistsException;
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.header.internals.RecordHeader;
 import org.apache.kafka.common.header.internals.RecordHeaders;
-import org.apache.kafka.common.security.JaasUtils;
 import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.utils.Time;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -71,7 +63,6 @@ import java.time.Duration;
 import java.time.ZoneId;
 import java.time.ZonedDateTime;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -87,13 +78,10 @@ import static org.apache.flume.source.kafka.KafkaSourceConstants.BOOTSTRAP_SERVE
 import static org.apache.flume.source.kafka.KafkaSourceConstants.DEFAULT_AUTO_COMMIT;
 import static org.apache.flume.source.kafka.KafkaSourceConstants.DEFAULT_TOPIC_HEADER;
 import static org.apache.flume.source.kafka.KafkaSourceConstants.KAFKA_CONSUMER_PREFIX;
-import static org.apache.flume.source.kafka.KafkaSourceConstants.OLD_GROUP_ID;
 import static org.apache.flume.source.kafka.KafkaSourceConstants.PARTITION_HEADER;
 import static org.apache.flume.source.kafka.KafkaSourceConstants.TIMESTAMP_HEADER;
-import static org.apache.flume.source.kafka.KafkaSourceConstants.TOPIC;
 import static org.apache.flume.source.kafka.KafkaSourceConstants.TOPICS;
 import static org.apache.flume.source.kafka.KafkaSourceConstants.TOPICS_REGEX;
-import static org.apache.flume.source.kafka.KafkaSourceConstants.ZOOKEEPER_CONNECT_FLUME_KEY;
 import static org.apache.kafka.clients.CommonClientConfigs.SECURITY_PROTOCOL_CONFIG;
 import static org.apache.kafka.common.config.SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG;
 import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG;
@@ -586,47 +574,11 @@ public class TestKafkaSource {
                  kafkaProps.getProperty("bootstrap.servers"));
   }
 
-  @Test
-  public void testOldProperties() {
-    Context context = new Context();
-
-    context.put(TOPIC, "old.topic");
-    context.put(OLD_GROUP_ID, "old.groupId");
-    context.put(BOOTSTRAP_SERVERS, "real-bootstrap-servers-list");
-    KafkaSource source = new KafkaSource();
-    source.doConfigure(context);
-    Properties kafkaProps = source.getConsumerProps();
-
-    KafkaSource.Subscriber<List<String>> subscriber = source.getSubscriber();
-    //check topic was set
-    assertEquals("old.topic", subscriber.get().get(0));
-    //check that kafka old properties override the default and get correct name
-    assertEquals("old.groupId", kafkaProps.getProperty(ConsumerConfig.GROUP_ID_CONFIG));
-
-    source = new KafkaSource();
-    context.put(KAFKA_CONSUMER_PREFIX + ConsumerConfig.GROUP_ID_CONFIG, "override.old.group.id");
-    source.doConfigure(context);
-    kafkaProps = source.getConsumerProps();
-    //check that kafka new properties override old
-    assertEquals("override.old.group.id", kafkaProps.getProperty(ConsumerConfig.GROUP_ID_CONFIG));
-
-    context.clear();
-    context.put(BOOTSTRAP_SERVERS, "real-bootstrap-servers-list");
-    context.put(TOPIC, "old.topic");
-    source = new KafkaSource();
-    source.doConfigure(context);
-    kafkaProps = source.getConsumerProps();
-    //check defaults set
-    assertEquals(KafkaSourceConstants.DEFAULT_GROUP_ID,
-                 kafkaProps.getProperty(ConsumerConfig.GROUP_ID_CONFIG));
-  }
-
   @Test
   public void testPatternBasedSubscription() {
     Context context = new Context();
 
     context.put(TOPICS_REGEX, "^topic[0-9]$");
-    context.put(OLD_GROUP_ID, "old.groupId");
     context.put(BOOTSTRAP_SERVERS, "real-bootstrap-servers-list");
     KafkaSource source = new KafkaSource();
     source.doConfigure(context);
@@ -709,49 +661,6 @@ public class TestKafkaSource {
 
   }
 
-  @Test
-  public void testBootstrapLookup() {
-    Context context = new Context();
-
-    context.put(ZOOKEEPER_CONNECT_FLUME_KEY, kafkaServer.getZkConnectString());
-    context.put(TOPIC, "old.topic");
-    context.put(OLD_GROUP_ID, "old.groupId");
-    KafkaSource source = new KafkaSource();
-    source.doConfigure(context);
-    String sourceServers = source.getBootstrapServers();
-    sourceServers = sourceServers.substring(0, sourceServers.indexOf(':'));
-
-    String kafkaServers = kafkaServer.getBootstrapServers();
-    kafkaServers = kafkaServers.substring(0, kafkaServers.indexOf(':'));
-
-    List<String> possibleValues = Arrays.asList("localhost", "127.0.0.1");
-
-    if (!(possibleValues.contains(sourceServers) && possibleValues.contains(kafkaServers))) {
-      fail("Expected either 'localhost' or '127.0.0.1'. Source: "
-              + sourceServers + ", Kafka: " + kafkaServers);
-    }
-  }
-
-  @Test
-  public void testMigrateOffsetsNone() throws Exception {
-    doTestMigrateZookeeperOffsets(false, false, "testMigrateOffsets-none");
-  }
-
-  @Test
-  public void testMigrateOffsetsZookeeper() throws Exception {
-    doTestMigrateZookeeperOffsets(true, false, "testMigrateOffsets-zookeeper");
-  }
-
-  @Test
-  public void testMigrateOffsetsKafka() throws Exception {
-    doTestMigrateZookeeperOffsets(false, true, "testMigrateOffsets-kafka");
-  }
-
-  @Test
-  public void testMigrateOffsetsBoth() throws Exception {
-    doTestMigrateZookeeperOffsets(true, true, "testMigrateOffsets-both");
-  }
-
   /**
    * Tests that sub-properties (kafka.consumer.*) apply correctly across multiple invocations
    * of configure() (fix for FLUME-2857).
@@ -766,7 +675,7 @@ public class TestKafkaSource {
     Context context = prepareDefaultContext(group);
     context.put(KafkaSourceConstants.KAFKA_CONSUMER_PREFIX + sampleConsumerProp,
         sampleConsumerVal);
-    context.put(TOPIC, "random-topic");
+    context.put(TOPICS, "random-topic");
 
     kafkaSource.configure(context);
 
@@ -774,7 +683,7 @@ public class TestKafkaSource {
         kafkaSource.getConsumerProps().getProperty(sampleConsumerProp));
 
     context = prepareDefaultContext(group);
-    context.put(TOPIC, "random-topic");
+    context.put(TOPICS, "random-topic");
 
     kafkaSource.configure(context);
     Assert.assertNull(kafkaSource.getConsumerProps().getProperty(sampleConsumerProp));
@@ -913,110 +822,6 @@ public class TestKafkaSource {
     kafkaSource.stop();
   }
 
-  private void doTestMigrateZookeeperOffsets(boolean hasZookeeperOffsets, boolean hasKafkaOffsets,
-                                            String group) throws Exception {
-    // create a topic with 1 partition for simplicity
-    String topic = findUnusedTopic();
-    kafkaServer.createTopic(topic, 1);
-
-    Context context = prepareDefaultContext(group);
-    context.put(ZOOKEEPER_CONNECT_FLUME_KEY, kafkaServer.getZkConnectString());
-    context.put(TOPIC, topic);
-    KafkaSource source = new KafkaSource();
-    source.doConfigure(context);
-
-    // Produce some data and save an offset
-    Long fifthOffset = 0L;
-    Long tenthOffset = 0L;
-    Properties props = createProducerProps(kafkaServer.getBootstrapServers());
-    KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);
-    for (int i = 1; i <= 50; i++) {
-      ProducerRecord<String, byte[]> data =
-          new ProducerRecord<>(topic, null, String.valueOf(i).getBytes());
-      RecordMetadata recordMetadata = producer.send(data).get();
-      if (i == 5) {
-        fifthOffset = recordMetadata.offset();
-      }
-      if (i == 10) {
-        tenthOffset = recordMetadata.offset();
-      }
-    }
-
-    // Commit 10th offset to zookeeper
-    if (hasZookeeperOffsets) {
-      KafkaZkClient zkClient = KafkaZkClient.apply(kafkaServer.getZkConnectString(),
-              JaasUtils.isZkSaslEnabled(), 30000, 30000, 10, Time.SYSTEM,
-              "kafka.server", "SessionExpireListener", scala.Option.empty(), scala.Option.empty());
-      zkClient.getConsumerOffset(group, new TopicPartition(topic, 0));
-      Long offset = tenthOffset + 1;
-      zkClient.setOrCreateConsumerOffset(group, new TopicPartition(topic, 0), offset);
-      zkClient.close();
-    }
-
-    // Commit 5th offset to kafka
-    if (hasKafkaOffsets) {
-      Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
-      offsets.put(new TopicPartition(topic, 0), new OffsetAndMetadata(fifthOffset + 1));
-      KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(source.getConsumerProps());
-      consumer.commitSync(offsets);
-      consumer.close();
-    }
-
-    // Start the source and read some data
-    source.setChannelProcessor(createGoodChannel());
-    source.start();
-    for (int i = 0; i < 3; i++) {
-      source.process();
-      Thread.sleep(1000);
-    }
-
-    Thread.sleep(500L);
-    source.process();
-    List<Integer> finals = new ArrayList<Integer>(40);
-    for (Event event: events) {
-      finals.add(Integer.parseInt(new String(event.getBody())));
-    }
-    source.stop();
-
-    if (!hasKafkaOffsets && !hasZookeeperOffsets) {
-      // The default behavior is to start at the latest message in the log
-      org.junit.Assert.assertTrue("Source should read no messages", finals.isEmpty());
-    } else if (hasKafkaOffsets && hasZookeeperOffsets) {
-      // Respect Kafka offsets if they exist
-      org.junit.Assert.assertFalse("Source should not read the 5th message", finals.contains(5));
-      org.junit.Assert.assertTrue("Source should read the 6th message", finals.contains(6));
-    } else if (hasKafkaOffsets) {
-      // Respect Kafka offsets if they exist (don't fail if zookeeper offsets are missing)
-      org.junit.Assert.assertFalse("Source should not read the 5th message", finals.contains(5));
-      org.junit.Assert.assertTrue("Source should read the 6th message", finals.contains(6));
-    } else {
-      // Otherwise migrate the ZooKeeper offsets if they exist
-      org.junit.Assert.assertFalse("Source should not read the 10th message", finals.contains(10));
-      org.junit.Assert.assertTrue("Source should read the 11th message", finals.contains(11));
-    }
-  }
-
-  @Test
-  public void testMigrateZookeeperOffsetsWhenTopicNotExists() throws Exception {
-    String topic = findUnusedTopic();
-
-    Context context = prepareDefaultContext("testMigrateOffsets-nonExistingTopic");
-    context.put(ZOOKEEPER_CONNECT_FLUME_KEY, kafkaServer.getZkConnectString());
-    context.put(TOPIC, topic);
-    KafkaSource source = new KafkaSource();
-    source.doConfigure(context);
-
-    source.setChannelProcessor(createGoodChannel());
-    source.start();
-
-    assertEquals(LifecycleState.START, source.getLifecycleState());
-
-    Status status = source.process();
-    assertEquals(Status.BACKOFF, status);
-
-    source.stop();
-  }
-
   ChannelProcessor createGoodChannel() {
 
     ChannelProcessor channelProcessor = mock(ChannelProcessor.class);
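
(Note, not part of the commit: with the ZooKeeper migration tests removed above, committed offsets live only in Kafka's __consumer_offsets topic. A hedged sketch of seeding a group's starting offset directly through the consumer API, roughly what the deleted hasKafkaOffsets branch did, follows; the topic name, group id, bootstrap address and offset value are illustrative placeholders.)

```java
// Sketch only: committing a starting offset for a consumer group in Kafka itself,
// the native store that replaces the removed ZooKeeper offset path.
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class SeedOffsetsSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "127.0.0.1:9092");   // placeholder broker
    props.put("group.id", "flume");                      // placeholder group id
    props.put("key.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer",
        "org.apache.kafka.common.serialization.ByteArrayDeserializer");

    // Commit offset 6 for partition 0 of a placeholder topic; the group will
    // resume consumption from that position on its next start.
    Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(
        new TopicPartition("some-topic", 0), new OffsetAndMetadata(6L));
    try (KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props)) {
      consumer.commitSync(offsets);
    }
  }
}
```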
diff --git a/flume-shared/flume-shared-kafka-test/pom.xml b/flume-shared/flume-shared-kafka-test/pom.xml
index d2e10692d..ebfb0b24d 100644
--- a/flume-shared/flume-shared-kafka-test/pom.xml
+++ b/flume-shared/flume-shared-kafka-test/pom.xml
@@ -81,7 +81,6 @@
       <artifactId>kafka-clients</artifactId>
       <scope>provided</scope>
     </dependency>
-
   </dependencies>
 
 </project>
diff --git a/pom.xml b/pom.xml
index 0bc9ff252..8be106499 100644
--- a/pom.xml
+++ b/pom.xml
@@ -85,7 +85,7 @@ limitations under the License.
     <jdom.version>1.1.3</jdom.version>
     <joda-time.version>2.9.9</joda-time.version>
     <junit.version>4.13.2</junit.version>
-    <kafka.version>2.7.2</kafka.version>
+    <kafka.version>3.3.1</kafka.version>
     <kite.version>1.1.0</kite.version>
     <kudu.version>1.10.0</kudu.version>
     <lifecycle-mapping.version>1.0.0</lifecycle-mapping.version>