You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by tr...@apache.org on 2022/10/30 13:19:23 UTC
[flume] branch trunk updated: FLUME-3443 FLUME-2985 Update to Apache Kafka 3.3.1 and remove deprecated properties and Zookeeper offset migration (relating to 0.8 to 0.9 migration) (#389)
This is an automated email from the ASF dual-hosted git repository.
tristan pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/flume.git
The following commit(s) were added to refs/heads/trunk by this push:
new 3d8d59136 FLUME-3443 FLUME-2985 Update to Apache Kafka 3.3.1 and remove deprecated properties and Zookeeper offset migration (relating to 0.8 to 0.9 migration) (#389)
3d8d59136 is described below
commit 3d8d59136c8a0f2b5e1125a995836e39ecdfea08
Author: Tristan Stevens <tm...@users.noreply.github.com>
AuthorDate: Sun Oct 30 13:19:18 2022 +0000
FLUME-3443 FLUME-2985 Update to Apache Kafka 3.3.1 and remove deprecated properties and Zookeeper offset migration (relating to 0.8 to 0.9 migration) (#389)
---
dev-docs/UpdateLicenses.md | 6 +-
flume-ng-channels/flume-kafka-channel/pom.xml | 11 ++
.../apache/flume/channel/kafka/KafkaChannel.java | 154 ----------------
.../channel/kafka/KafkaChannelConfiguration.java | 18 --
.../channel/kafka/TestBasicFunctionality.java | 28 ---
.../channel/kafka/TestOffsetsAndMigration.java | 133 --------------
flume-ng-sinks/flume-ng-kafka-sink/pom.xml | 13 ++
.../org/apache/flume/sink/kafka/KafkaSink.java | 61 -------
.../flume/sink/kafka/KafkaSinkConstants.java | 9 -
.../org/apache/flume/sink/kafka/TestKafkaSink.java | 24 ---
.../apache/flume/sink/kafka/util/KafkaLocal.java | 8 +-
flume-ng-sources/flume-kafka-source/pom.xml | 8 +
.../org/apache/flume/source/kafka/KafkaSource.java | 157 +---------------
.../flume/source/kafka/KafkaSourceConstants.java | 8 -
.../source/kafka/KafkaSourceEmbeddedKafka.java | 13 +-
.../apache/flume/source/kafka/TestKafkaSource.java | 199 +--------------------
flume-shared/flume-shared-kafka-test/pom.xml | 1 -
pom.xml | 2 +-
18 files changed, 50 insertions(+), 803 deletions(-)
diff --git a/dev-docs/UpdateLicenses.md b/dev-docs/UpdateLicenses.md
index 6f6fdfb2d..4d74d7498 100644
--- a/dev-docs/UpdateLicenses.md
+++ b/dev-docs/UpdateLicenses.md
@@ -524,9 +524,9 @@ Findbugs.
same as jar-with-dependencies
```
- kafka-clients-2.7.2.jar
- kafka-raft-2.7.2.jar
- kafka_2.13-2.7.2.jar
+ kafka-clients-3.3.1.jar
+ kafka-raft-3.3.1.jar
+ kafka_2.13-3.3.1.jar
```
ALv2. Entry in NOTICE. Additional entry in NOTICE and LICENSE for
diff --git a/flume-ng-channels/flume-kafka-channel/pom.xml b/flume-ng-channels/flume-kafka-channel/pom.xml
index 61e55905d..0fd70bf3a 100644
--- a/flume-ng-channels/flume-kafka-channel/pom.xml
+++ b/flume-ng-channels/flume-kafka-channel/pom.xml
@@ -57,9 +57,20 @@ limitations under the License.
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_${scala.version}</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka_${scala.version}</artifactId>
+ <classifier>test</classifier>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ </dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
+ <version>${kafka.version}</version>
+ <classifier>test</classifier>
</dependency>
<dependency>
<groupId>org.apache.flume.flume-ng-sinks</groupId>
diff --git a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
index 65cad9937..8336e94e2 100644
--- a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
+++ b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
@@ -20,7 +20,6 @@ package org.apache.flume.channel.kafka;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
-import kafka.zk.KafkaZkClient;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
@@ -50,15 +49,11 @@ import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.errors.WakeupException;
-import org.apache.kafka.common.security.JaasUtils;
-import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import scala.Option;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
@@ -80,30 +75,24 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.BOOTSTRAP_SERVERS_CONFIG;
-import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.BROKER_LIST_FLUME_KEY;
import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.DEFAULT_ACKS;
import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.DEFAULT_AUTO_OFFSET_RESET;
import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.DEFAULT_GROUP_ID;
import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.DEFAULT_KEY_DESERIALIZER;
import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.DEFAULT_KEY_SERIALIZER;
-import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.DEFAULT_MIGRATE_ZOOKEEPER_OFFSETS;
import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.DEFAULT_PARSE_AS_FLUME_EVENT;
import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.DEFAULT_POLL_TIMEOUT;
import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.DEFAULT_TOPIC;
import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.DEFAULT_VALUE_DESERIAIZER;
import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.DEFAULT_VALUE_SERIAIZER;
-import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.GROUP_ID_FLUME;
import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.KAFKA_CONSUMER_PREFIX;
import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.KAFKA_PRODUCER_PREFIX;
import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.KEY_HEADER;
-import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.MIGRATE_ZOOKEEPER_OFFSETS;
import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.PARSE_AS_FLUME_EVENT;
import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.PARTITION_HEADER_NAME;
import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.POLL_TIMEOUT;
-import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.READ_SMALLEST_OFFSET;
import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.STATIC_PARTITION_CONF;
import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.TOPIC_CONFIG;
-import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.ZOOKEEPER_CONNECT_FLUME_KEY;
import static org.apache.flume.shared.kafka.KafkaSSLUtil.SSL_DISABLE_FQDN_CHECK;
import static org.apache.flume.shared.kafka.KafkaSSLUtil.isSSLEnabled;
@@ -112,10 +101,6 @@ public class KafkaChannel extends BasicChannelSemantics {
private static final Logger logger =
LoggerFactory.getLogger(KafkaChannel.class);
- // Constants used only for offset migration zookeeper connections
- private static final int ZK_SESSION_TIMEOUT = 30000;
- private static final int ZK_CONNECTION_TIMEOUT = 30000;
-
private final Properties consumerProps = new Properties();
private final Properties producerProps = new Properties();
@@ -124,13 +109,10 @@ public class KafkaChannel extends BasicChannelSemantics {
private AtomicReference<String> topic = new AtomicReference<String>();
private boolean parseAsFlumeEvent = DEFAULT_PARSE_AS_FLUME_EVENT;
- private String zookeeperConnect = null;
private String topicStr = DEFAULT_TOPIC;
private String groupId = DEFAULT_GROUP_ID;
private String partitionHeader = null;
private Integer staticPartitionId;
- @Deprecated
- private boolean migrateZookeeperOffsets = DEFAULT_MIGRATE_ZOOKEEPER_OFFSETS;
// used to indicate if a rebalance has occurred during the current transaction
AtomicBoolean rebalanceFlag = new AtomicBoolean();
@@ -159,11 +141,6 @@ public class KafkaChannel extends BasicChannelSemantics {
@Override
public void start() {
logger.info("Starting Kafka Channel: {}", getName());
- // As a migration step check if there are any offsets from the group stored in kafka
- // If not read them from Zookeeper and commit them to Kafka
- if (migrateZookeeperOffsets && zookeeperConnect != null && !zookeeperConnect.isEmpty()) {
- migrateOffsets();
- }
producer = new KafkaProducer<String, byte[]>(producerProps);
// We always have just one topic being read by one thread
logger.info("Topic = {}", topic.get());
@@ -193,10 +170,6 @@ public class KafkaChannel extends BasicChannelSemantics {
@Override
public void configure(Context ctx) {
-
- // Can remove in the next release
- translateOldProps(ctx);
-
topicStr = ctx.getString(TOPIC_CONFIG);
if (topicStr == null || topicStr.isEmpty()) {
topicStr = DEFAULT_TOPIC;
@@ -224,10 +197,6 @@ public class KafkaChannel extends BasicChannelSemantics {
staticPartitionId = ctx.getInteger(STATIC_PARTITION_CONF);
partitionHeader = ctx.getString(PARTITION_HEADER_NAME);
- migrateZookeeperOffsets = ctx.getBoolean(MIGRATE_ZOOKEEPER_OFFSETS,
- DEFAULT_MIGRATE_ZOOKEEPER_OFFSETS);
- zookeeperConnect = ctx.getString(ZOOKEEPER_CONNECT_FLUME_KEY);
-
if (logger.isDebugEnabled() && LogPrivacyUtil.allowLogPrintConfig()) {
logger.debug("Kafka properties: {}", ctx);
}
@@ -237,57 +206,6 @@ public class KafkaChannel extends BasicChannelSemantics {
}
}
- // We can remove this once the properties are officially deprecated
- private void translateOldProps(Context ctx) {
-
- if (!(ctx.containsKey(TOPIC_CONFIG))) {
- ctx.put(TOPIC_CONFIG, ctx.getString("topic"));
- logger.warn("{} is deprecated. Please use the parameter {}", "topic", TOPIC_CONFIG);
- }
-
- // Broker List
- // If there is no value we need to check and set the old param and log a warning message
- if (!(ctx.containsKey(BOOTSTRAP_SERVERS_CONFIG))) {
- String brokerList = ctx.getString(BROKER_LIST_FLUME_KEY);
- if (brokerList == null || brokerList.isEmpty()) {
- throw new ConfigurationException("Bootstrap Servers must be specified");
- } else {
- ctx.put(BOOTSTRAP_SERVERS_CONFIG, brokerList);
- logger.warn("{} is deprecated. Please use the parameter {}",
- BROKER_LIST_FLUME_KEY, BOOTSTRAP_SERVERS_CONFIG);
- }
- }
-
- // GroupId
- // If there is an old Group Id set, then use that if no groupId is set.
- if (!(ctx.containsKey(KAFKA_CONSUMER_PREFIX + ConsumerConfig.GROUP_ID_CONFIG))) {
- String oldGroupId = ctx.getString(GROUP_ID_FLUME);
- if (oldGroupId != null && !oldGroupId.isEmpty()) {
- ctx.put(KAFKA_CONSUMER_PREFIX + ConsumerConfig.GROUP_ID_CONFIG, oldGroupId);
- logger.warn("{} is deprecated. Please use the parameter {}",
- GROUP_ID_FLUME, KAFKA_CONSUMER_PREFIX + ConsumerConfig.GROUP_ID_CONFIG);
- }
- }
-
- if (!(ctx.containsKey((KAFKA_CONSUMER_PREFIX + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)))) {
- Boolean oldReadSmallest = ctx.getBoolean(READ_SMALLEST_OFFSET);
- String auto;
- if (oldReadSmallest != null) {
- if (oldReadSmallest) {
- auto = "earliest";
- } else {
- auto = "latest";
- }
- ctx.put(KAFKA_CONSUMER_PREFIX + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,auto);
- logger.warn("{} is deprecated. Please use the parameter {}",
- READ_SMALLEST_OFFSET,
- KAFKA_CONSUMER_PREFIX + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
- }
-
- }
- }
-
-
private void setProducerProps(Context ctx, String bootStrapServers) {
producerProps.clear();
producerProps.put(ProducerConfig.ACKS_CONFIG, DEFAULT_ACKS);
@@ -352,78 +270,6 @@ public class KafkaChannel extends BasicChannelSemantics {
}
}
- private void migrateOffsets() {
- try (KafkaZkClient zkClient = KafkaZkClient.apply(zookeeperConnect,
- JaasUtils.isZkSaslEnabled(), ZK_SESSION_TIMEOUT, ZK_CONNECTION_TIMEOUT, 10,
- Time.SYSTEM, "kafka.server", "SessionExpireListener", scala.Option.empty(),
- scala.Option.empty());
- KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(consumerProps)) {
- Map<TopicPartition, OffsetAndMetadata> kafkaOffsets = getKafkaOffsets(consumer);
- if (kafkaOffsets == null) {
- logger.warn("Topic " + topicStr + " not found in Kafka. Offset migration will be skipped.");
- return;
- }
- if (!kafkaOffsets.isEmpty()) {
- logger.info("Found Kafka offsets for topic {}. Will not migrate from zookeeper", topicStr);
- logger.debug("Offsets found: {}", kafkaOffsets);
- return;
- }
-
- logger.info("No Kafka offsets found. Migrating zookeeper offsets");
- Map<TopicPartition, OffsetAndMetadata> zookeeperOffsets =
- getZookeeperOffsets(zkClient, consumer);
- if (zookeeperOffsets.isEmpty()) {
- logger.warn("No offsets to migrate found in Zookeeper");
- return;
- }
-
- logger.info("Committing Zookeeper offsets to Kafka");
- logger.debug("Offsets to commit: {}", zookeeperOffsets);
- consumer.commitSync(zookeeperOffsets);
- // Read the offsets to verify they were committed
- Map<TopicPartition, OffsetAndMetadata> newKafkaOffsets = getKafkaOffsets(consumer);
- logger.debug("Offsets committed: {}", newKafkaOffsets);
- if (newKafkaOffsets == null
- || !newKafkaOffsets.keySet().containsAll(zookeeperOffsets.keySet())) {
- throw new FlumeException("Offsets could not be committed");
- }
- }
- }
-
-
- private Map<TopicPartition, OffsetAndMetadata> getKafkaOffsets(
- KafkaConsumer<String, byte[]> client) {
- Map<TopicPartition, OffsetAndMetadata> offsets = null;
- List<PartitionInfo> partitions = client.partitionsFor(topicStr);
- if (partitions != null) {
- offsets = new HashMap<>();
- for (PartitionInfo partition : partitions) {
- TopicPartition key = new TopicPartition(topicStr, partition.partition());
- OffsetAndMetadata offsetAndMetadata = client.committed(key);
- if (offsetAndMetadata != null) {
- offsets.put(key, offsetAndMetadata);
- }
- }
- }
- return offsets;
- }
-
- private Map<TopicPartition, OffsetAndMetadata> getZookeeperOffsets(
- KafkaZkClient zkClient, KafkaConsumer<String, byte[]> consumer) {
- Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
- List<PartitionInfo> partitions = consumer.partitionsFor(topicStr);
- for (PartitionInfo partition : partitions) {
- TopicPartition topicPartition = new TopicPartition(topicStr, partition.partition());
- Option<Object> optionOffset = zkClient.getConsumerOffset(groupId, topicPartition);
- if (optionOffset.nonEmpty()) {
- Long offset = (Long) optionOffset.get();
- OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(offset);
- offsets.put(topicPartition, offsetAndMetadata);
- }
- }
- return offsets;
- }
-
private void decommissionConsumerAndRecords(ConsumerAndRecords c) {
c.consumer.wakeup();
c.consumer.close();
diff --git a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannelConfiguration.java b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannelConfiguration.java
index ad5a15bf8..87fcbc1a3 100644
--- a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannelConfiguration.java
+++ b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannelConfiguration.java
@@ -52,22 +52,4 @@ public class KafkaChannelConfiguration {
public static final String PARTITION_HEADER_NAME = "partitionIdHeader";
public static final String STATIC_PARTITION_CONF = "defaultPartitionId";
- public static final String MIGRATE_ZOOKEEPER_OFFSETS = "migrateZookeeperOffsets";
- public static final boolean DEFAULT_MIGRATE_ZOOKEEPER_OFFSETS = true;
-
- /*** Old Configuration Parameters ****/
- public static final String BROKER_LIST_KEY = "metadata.broker.list";
- public static final String REQUIRED_ACKS_KEY = "request.required.acks";
- public static final String BROKER_LIST_FLUME_KEY = "brokerList";
- //public static final String TOPIC = "topic";
- public static final String GROUP_ID_FLUME = "groupId";
- public static final String AUTO_COMMIT_ENABLED = "auto.commit.enable";
- public static final String ZOOKEEPER_CONNECT = "zookeeper.connect";
- public static final String ZOOKEEPER_CONNECT_FLUME_KEY = "zookeeperConnect";
- public static final String TIMEOUT = "timeout";
- public static final String DEFAULT_TIMEOUT = "100";
- public static final String CONSUMER_TIMEOUT = "consumer.timeout.ms";
-
- public static final String READ_SMALLEST_OFFSET = "readSmallestOffset";
- public static final boolean DEFAULT_READ_SMALLEST_OFFSET = false;
}
diff --git a/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestBasicFunctionality.java b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestBasicFunctionality.java
index d119b429f..953133ff0 100644
--- a/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestBasicFunctionality.java
+++ b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestBasicFunctionality.java
@@ -23,8 +23,6 @@ import org.apache.flume.Event;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurables;
import org.apache.flume.instrumentation.kafka.KafkaChannelCounter;
-import org.apache.kafka.clients.CommonClientConfigs;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.Assert;
@@ -39,10 +37,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.BOOTSTRAP_SERVERS_CONFIG;
-import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.BROKER_LIST_FLUME_KEY;
-import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.GROUP_ID_FLUME;
import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.KEY_HEADER;
-import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.READ_SMALLEST_OFFSET;
import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.TOPIC_CONFIG;
public class TestBasicFunctionality extends TestKafkaChannelBase {
@@ -65,29 +60,6 @@ public class TestBasicFunctionality extends TestKafkaChannelBase {
Assert.assertEquals(consumerProps.getProperty("another-parameter"), "1");
}
- @Test
- public void testOldConfig() throws Exception {
- Context context = new Context();
- context.put(BROKER_LIST_FLUME_KEY, testUtil.getKafkaServerUrl());
- context.put(GROUP_ID_FLUME, "flume-something");
- context.put(READ_SMALLEST_OFFSET, "true");
- context.put("topic", topic);
-
- final KafkaChannel channel = new KafkaChannel();
- Configurables.configure(channel, context);
-
- Properties consumerProps = channel.getConsumerProps();
- Properties producerProps = channel.getProducerProps();
-
- Assert.assertEquals(producerProps.getProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG),
- testUtil.getKafkaServerUrl());
- Assert.assertEquals(consumerProps.getProperty(ConsumerConfig.GROUP_ID_CONFIG),
- "flume-something");
- Assert.assertEquals(consumerProps.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG),
- "earliest");
- }
-
-
@Test
public void testStopAndStart() throws Exception {
doTestStopAndStart(false, false);
diff --git a/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestOffsetsAndMigration.java b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestOffsetsAndMigration.java
index a399b543b..e58b5b158 100644
--- a/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestOffsetsAndMigration.java
+++ b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestOffsetsAndMigration.java
@@ -18,33 +18,14 @@
*/
package org.apache.flume.channel.kafka;
-import kafka.zk.KafkaZkClient;
-import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.Transaction;
-import org.apache.flume.lifecycle.LifecycleState;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.security.JaasUtils;
-import org.apache.kafka.common.utils.Time;
import org.junit.Assert;
import org.junit.Test;
-import java.util.ArrayList;
import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.Executors;
-
-import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.GROUP_ID_FLUME;
-import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.ZOOKEEPER_CONNECT_FLUME_KEY;
public class TestOffsetsAndMigration extends TestKafkaChannelBase {
@@ -77,26 +58,6 @@ public class TestOffsetsAndMigration extends TestKafkaChannelBase {
Assert.assertTrue(Arrays.equals(message.getBytes(), event.getBody()));
}
- @Test
- public void testMigrateOffsetsNone() throws Exception {
- doTestMigrateZookeeperOffsets(false, false, "testMigrateOffsets-none");
- }
-
- @Test
- public void testMigrateOffsetsZookeeper() throws Exception {
- doTestMigrateZookeeperOffsets(true, false, "testMigrateOffsets-zookeeper");
- }
-
- @Test
- public void testMigrateOffsetsKafka() throws Exception {
- doTestMigrateZookeeperOffsets(false, true, "testMigrateOffsets-kafka");
- }
-
- @Test
- public void testMigrateOffsetsBoth() throws Exception {
- doTestMigrateZookeeperOffsets(true, true, "testMigrateOffsets-both");
- }
-
private Event takeEventWithoutCommittingTxn(KafkaChannel channel) {
for (int i = 0; i < 10; i++) {
Transaction txn = channel.getTransaction();
@@ -113,98 +74,4 @@ public class TestOffsetsAndMigration extends TestKafkaChannelBase {
return null;
}
- private void doTestMigrateZookeeperOffsets(boolean hasZookeeperOffsets, boolean hasKafkaOffsets,
- String group) throws Exception {
- // create a topic with 1 partition for simplicity
- topic = findUnusedTopic();
- createTopic(topic, 1);
-
- Context context = prepareDefaultContext(false);
- context.put(ZOOKEEPER_CONNECT_FLUME_KEY, testUtil.getZkUrl());
- context.put(GROUP_ID_FLUME, group);
- final KafkaChannel channel = createChannel(context);
-
- // Produce some data and save an offset
- Long fifthOffset = 0L;
- Long tenthOffset = 0L;
- Properties props = channel.getProducerProps();
- KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);
- for (int i = 1; i <= 50; i++) {
- ProducerRecord<String, byte[]> data =
- new ProducerRecord<>(topic, null, String.valueOf(i).getBytes());
- RecordMetadata recordMetadata = producer.send(data).get();
- if (i == 5) {
- fifthOffset = recordMetadata.offset();
- }
- if (i == 10) {
- tenthOffset = recordMetadata.offset();
- }
- }
-
- // Commit 10th offset to zookeeper
- if (hasZookeeperOffsets) {
- KafkaZkClient zkClient = KafkaZkClient.apply(testUtil.getZkUrl(),
- JaasUtils.isZkSaslEnabled(), 30000, 30000, 10, Time.SYSTEM,
- "kafka.server", "SessionExpireListener", scala.Option.empty(), scala.Option.empty());
- zkClient.getConsumerOffset(group, new TopicPartition(topic, 0));
- Long offset = tenthOffset + 1;
- zkClient.setOrCreateConsumerOffset(group, new TopicPartition(topic, 0), offset);
- zkClient.close();
- }
-
- // Commit 5th offset to kafka
- if (hasKafkaOffsets) {
- Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
- offsets.put(new TopicPartition(topic, 0), new OffsetAndMetadata(fifthOffset + 1));
- KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(channel.getConsumerProps());
- consumer.commitSync(offsets);
- consumer.close();
- }
-
- // Start the channel and read some data
- channel.start();
- ExecutorCompletionService<Void> submitterSvc = new
- ExecutorCompletionService<>(Executors.newCachedThreadPool());
- List<Event> events = pullEvents(channel, submitterSvc,
- 20, false, false);
- wait(submitterSvc, 5);
- List<Integer> finals = new ArrayList<>(40);
- for (Event event : events) {
- finals.add(Integer.parseInt(new String(event.getBody())));
- }
- channel.stop();
-
- if (!hasKafkaOffsets && !hasZookeeperOffsets) {
- // The default behavior is to read the entire log
- Assert.assertTrue("Channel should read the the first message", finals.contains(1));
- } else if (hasKafkaOffsets && hasZookeeperOffsets) {
- // Respect Kafka offsets if they exist
- Assert.assertFalse("Channel should not read the 5th message", finals.contains(5));
- Assert.assertTrue("Channel should read the 6th message", finals.contains(6));
- } else if (hasKafkaOffsets) {
- // Respect Kafka offsets if they exist (don't fail if zookeeper offsets are missing)
- Assert.assertFalse("Channel should not read the 5th message", finals.contains(5));
- Assert.assertTrue("Channel should read the 6th message", finals.contains(6));
- } else {
- // Otherwise migrate the ZooKeeper offsets if they exist
- Assert.assertFalse("Channel should not read the 10th message", finals.contains(10));
- Assert.assertTrue("Channel should read the 11th message", finals.contains(11));
- }
- }
-
- @Test
- public void testMigrateZookeeperOffsetsWhenTopicNotExists() throws Exception {
- topic = findUnusedTopic();
-
- Context context = prepareDefaultContext(false);
- context.put(ZOOKEEPER_CONNECT_FLUME_KEY, testUtil.getZkUrl());
- context.put(GROUP_ID_FLUME, "testMigrateOffsets-nonExistingTopic");
- KafkaChannel channel = createChannel(context);
-
- channel.start();
-
- Assert.assertEquals(LifecycleState.START, channel.getLifecycleState());
-
- channel.stop();
- }
}
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/pom.xml b/flume-ng-sinks/flume-ng-kafka-sink/pom.xml
index d75265a9c..aa73b7559 100644
--- a/flume-ng-sinks/flume-ng-kafka-sink/pom.xml
+++ b/flume-ng-sinks/flume-ng-kafka-sink/pom.xml
@@ -93,10 +93,23 @@
<artifactId>kafka_${scala.version}</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka_${scala.version}</artifactId>
+ <classifier>test</classifier>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ <scope>test</scope>
+ <classifier>test</classifier>
+ <version>${kafka.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
index add6916df..b13d5b3b8 100644
--- a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
+++ b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
@@ -65,7 +65,6 @@ import static org.apache.flume.shared.kafka.KafkaSSLUtil.isSSLEnabled;
import static org.apache.flume.sink.kafka.KafkaSinkConstants.BOOTSTRAP_SERVERS_CONFIG;
import static org.apache.flume.sink.kafka.KafkaSinkConstants.BATCH_SIZE;
import static org.apache.flume.sink.kafka.KafkaSinkConstants.DEFAULT_BATCH_SIZE;
-import static org.apache.flume.sink.kafka.KafkaSinkConstants.BROKER_LIST_FLUME_KEY;
import static org.apache.flume.sink.kafka.KafkaSinkConstants.DEFAULT_ACKS;
import static org.apache.flume.sink.kafka.KafkaSinkConstants.DEFAULT_KEY_SERIALIZER;
import static org.apache.flume.sink.kafka.KafkaSinkConstants.DEFAULT_TOPIC;
@@ -73,12 +72,8 @@ import static org.apache.flume.sink.kafka.KafkaSinkConstants.DEFAULT_VALUE_SERIA
import static org.apache.flume.sink.kafka.KafkaSinkConstants.KAFKA_HEADER;
import static org.apache.flume.sink.kafka.KafkaSinkConstants.KAFKA_PRODUCER_PREFIX;
import static org.apache.flume.sink.kafka.KafkaSinkConstants.KEY_HEADER;
-import static org.apache.flume.sink.kafka.KafkaSinkConstants.OLD_BATCH_SIZE;
-import static org.apache.flume.sink.kafka.KafkaSinkConstants.REQUIRED_ACKS_FLUME_KEY;
import static org.apache.flume.sink.kafka.KafkaSinkConstants.TIMESTAMP_HEADER;
import static org.apache.flume.sink.kafka.KafkaSinkConstants.TOPIC_CONFIG;
-import static org.apache.flume.sink.kafka.KafkaSinkConstants.KEY_SERIALIZER_KEY;
-import static org.apache.flume.sink.kafka.KafkaSinkConstants.MESSAGE_SERIALIZER_KEY;
/**
* A Flume Sink that can publish messages to Kafka.
@@ -338,8 +333,6 @@ public class KafkaSink extends AbstractSink implements Configurable, BatchSizeSu
@Override
public void configure(Context context) {
- translateOldProps(context);
-
String topicStr = context.getString(TOPIC_CONFIG);
if (topicStr == null || topicStr.isEmpty()) {
topicStr = DEFAULT_TOPIC;
@@ -394,60 +387,6 @@ public class KafkaSink extends AbstractSink implements Configurable, BatchSizeSu
}
}
- private void translateOldProps(Context ctx) {
-
- if (!(ctx.containsKey(TOPIC_CONFIG))) {
- ctx.put(TOPIC_CONFIG, ctx.getString("topic"));
- logger.warn("{} is deprecated. Please use the parameter {}", "topic", TOPIC_CONFIG);
- }
-
- //Broker List
- // If there is no value we need to check and set the old param and log a warning message
- if (!(ctx.containsKey(BOOTSTRAP_SERVERS_CONFIG))) {
- String brokerList = ctx.getString(BROKER_LIST_FLUME_KEY);
- if (brokerList == null || brokerList.isEmpty()) {
- throw new ConfigurationException("Bootstrap Servers must be specified");
- } else {
- ctx.put(BOOTSTRAP_SERVERS_CONFIG, brokerList);
- logger.warn("{} is deprecated. Please use the parameter {}",
- BROKER_LIST_FLUME_KEY, BOOTSTRAP_SERVERS_CONFIG);
- }
- }
-
- //batch Size
- if (!(ctx.containsKey(BATCH_SIZE))) {
- String oldBatchSize = ctx.getString(OLD_BATCH_SIZE);
- if ( oldBatchSize != null && !oldBatchSize.isEmpty()) {
- ctx.put(BATCH_SIZE, oldBatchSize);
- logger.warn("{} is deprecated. Please use the parameter {}", OLD_BATCH_SIZE, BATCH_SIZE);
- }
- }
-
- // Acks
- if (!(ctx.containsKey(KAFKA_PRODUCER_PREFIX + ProducerConfig.ACKS_CONFIG))) {
- String requiredKey = ctx.getString(
- KafkaSinkConstants.REQUIRED_ACKS_FLUME_KEY);
- if (!(requiredKey == null) && !(requiredKey.isEmpty())) {
- ctx.put(KAFKA_PRODUCER_PREFIX + ProducerConfig.ACKS_CONFIG, requiredKey);
- logger.warn("{} is deprecated. Please use the parameter {}", REQUIRED_ACKS_FLUME_KEY,
- KAFKA_PRODUCER_PREFIX + ProducerConfig.ACKS_CONFIG);
- }
- }
-
- if (ctx.containsKey(KEY_SERIALIZER_KEY )) {
- logger.warn("{} is deprecated. Flume now uses the latest Kafka producer which implements " +
- "a different interface for serializers. Please use the parameter {}",
- KEY_SERIALIZER_KEY,KAFKA_PRODUCER_PREFIX + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
- }
-
- if (ctx.containsKey(MESSAGE_SERIALIZER_KEY)) {
- logger.warn("{} is deprecated. Flume now uses the latest Kafka producer which implements " +
- "a different interface for serializers. Please use the parameter {}",
- MESSAGE_SERIALIZER_KEY,
- KAFKA_PRODUCER_PREFIX + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
- }
- }
-
private void setProducerProps(Context context, String bootStrapServers) {
kafkaProps.clear();
kafkaProps.put(ProducerConfig.ACKS_CONFIG, DEFAULT_ACKS);
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java
index 3f1f279e0..df9000f18 100644
--- a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java
+++ b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java
@@ -55,14 +55,5 @@ public class KafkaSinkConstants {
public static final String DEFAULT_TOPIC = "default-flume-topic";
public static final String DEFAULT_ACKS = "1";
- /* Old Properties */
-
- /* Properties */
-
- public static final String OLD_BATCH_SIZE = "batchSize";
- public static final String MESSAGE_SERIALIZER_KEY = "serializer.class";
- public static final String KEY_SERIALIZER_KEY = "key.serializer.class";
- public static final String BROKER_LIST_FLUME_KEY = "brokerList";
- public static final String REQUIRED_ACKS_FLUME_KEY = "requiredAcks";
}
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
index fd5130ca9..3dbcd000f 100644
--- a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
+++ b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
@@ -39,7 +39,6 @@ import org.apache.flume.shared.kafka.test.PartitionOption;
import org.apache.flume.shared.kafka.test.PartitionTestScenario;
import org.apache.flume.sink.kafka.util.TestUtil;
import org.apache.flume.source.avro.AvroFlumeEvent;
-import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.ProducerConfig;
@@ -69,13 +68,10 @@ import static org.apache.flume.shared.kafka.KafkaSSLUtil.SSL_DISABLE_FQDN_CHECK;
import static org.apache.flume.sink.kafka.KafkaSinkConstants.AVRO_EVENT;
import static org.apache.flume.sink.kafka.KafkaSinkConstants.BATCH_SIZE;
import static org.apache.flume.sink.kafka.KafkaSinkConstants.BOOTSTRAP_SERVERS_CONFIG;
-import static org.apache.flume.sink.kafka.KafkaSinkConstants.BROKER_LIST_FLUME_KEY;
import static org.apache.flume.sink.kafka.KafkaSinkConstants.DEFAULT_KEY_SERIALIZER;
import static org.apache.flume.sink.kafka.KafkaSinkConstants.DEFAULT_TOPIC;
import static org.apache.flume.sink.kafka.KafkaSinkConstants.KAFKA_PREFIX;
import static org.apache.flume.sink.kafka.KafkaSinkConstants.KAFKA_PRODUCER_PREFIX;
-import static org.apache.flume.sink.kafka.KafkaSinkConstants.OLD_BATCH_SIZE;
-import static org.apache.flume.sink.kafka.KafkaSinkConstants.REQUIRED_ACKS_FLUME_KEY;
import static org.apache.flume.sink.kafka.KafkaSinkConstants.TOPIC_CONFIG;
import static org.apache.kafka.clients.CommonClientConfigs.SECURITY_PROTOCOL_CONFIG;
import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG;
@@ -138,26 +134,6 @@ public class TestKafkaSink {
"localhost:9092,localhost:9092");
}
- @Test
- public void testOldProperties() {
- KafkaSink kafkaSink = new KafkaSink();
- Context context = new Context();
- context.put("topic", "test-topic");
- context.put(OLD_BATCH_SIZE, "300");
- context.put(BROKER_LIST_FLUME_KEY, "localhost:9092,localhost:9092");
- context.put(REQUIRED_ACKS_FLUME_KEY, "all");
- Configurables.configure(kafkaSink, context);
-
- Properties kafkaProps = kafkaSink.getKafkaProps();
-
- assertEquals(kafkaSink.getTopic(), "test-topic");
- assertEquals(kafkaSink.getBatchSize(), 300);
- assertEquals(kafkaProps.getProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG),
- "localhost:9092,localhost:9092");
- assertEquals(kafkaProps.getProperty(ProducerConfig.ACKS_CONFIG), "all");
-
- }
-
@Test
public void testDefaultTopic() {
Sink kafkaSink = new KafkaSink();
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/KafkaLocal.java b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/KafkaLocal.java
index 6d89bd33d..700c94393 100644
--- a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/KafkaLocal.java
+++ b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/KafkaLocal.java
@@ -19,7 +19,9 @@
package org.apache.flume.sink.kafka.util;
import kafka.server.KafkaConfig;
-import kafka.server.KafkaServerStartable;
+import kafka.server.KafkaServer;
+import kafka.utils.TestUtils;
+import org.apache.kafka.common.utils.Time;
import java.io.IOException;
import java.util.Properties;
@@ -30,14 +32,14 @@ import java.util.Properties;
*/
public class KafkaLocal {
- public KafkaServerStartable kafka;
+ public KafkaServer kafka;
public ZooKeeperLocal zookeeper;
public KafkaLocal(Properties kafkaProperties) throws IOException, InterruptedException {
KafkaConfig kafkaConfig = KafkaConfig.fromProps(kafkaProperties);
// start local kafka broker
- kafka = new KafkaServerStartable(kafkaConfig);
+ kafka = TestUtils.createServer(kafkaConfig, Time.SYSTEM);
}
public void start() throws Exception {
diff --git a/flume-ng-sources/flume-kafka-source/pom.xml b/flume-ng-sources/flume-kafka-source/pom.xml
index edb9faeac..5e5c4cf49 100644
--- a/flume-ng-sources/flume-kafka-source/pom.xml
+++ b/flume-ng-sources/flume-kafka-source/pom.xml
@@ -78,10 +78,18 @@
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
+ <version>${kafka.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ <classifier>test</classifier>
+ <version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_${scala.version}</artifactId>
+ <version>${kafka.version}</version>
<classifier>test</classifier>
<scope>test</scope>
</dependency>
diff --git a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
index 2bd19757d..a99b6cfdf 100644
--- a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
+++ b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
@@ -18,13 +18,10 @@ package org.apache.flume.source.kafka;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
-import kafka.cluster.Broker;
-import kafka.cluster.BrokerEndPoint;
import kafka.zk.KafkaZkClient;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
-import org.apache.commons.lang.StringUtils;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
@@ -38,7 +35,6 @@ import org.apache.flume.instrumentation.kafka.KafkaSourceCounter;
import org.apache.flume.shared.kafka.KafkaSSLUtil;
import org.apache.flume.source.AbstractPollableSource;
import org.apache.flume.source.avro.AvroFlumeEvent;
-import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -50,14 +46,9 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
-import org.apache.kafka.common.network.ListenerName;
-import org.apache.kafka.common.security.JaasUtils;
-import org.apache.kafka.common.security.auth.SecurityProtocol;
-import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
-import scala.collection.JavaConverters;
import java.io.ByteArrayInputStream;
import java.time.Duration;
@@ -72,7 +63,6 @@ import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern;
-import java.util.stream.Collectors;
import static org.apache.flume.shared.kafka.KafkaSSLUtil.SSL_DISABLE_FQDN_CHECK;
import static org.apache.flume.shared.kafka.KafkaSSLUtil.isSSLEnabled;
@@ -86,24 +76,19 @@ import static org.apache.flume.source.kafka.KafkaSourceConstants.DEFAULT_BATCH_D
import static org.apache.flume.source.kafka.KafkaSourceConstants.DEFAULT_BATCH_SIZE;
import static org.apache.flume.source.kafka.KafkaSourceConstants.DEFAULT_GROUP_ID;
import static org.apache.flume.source.kafka.KafkaSourceConstants.DEFAULT_KEY_DESERIALIZER;
-import static org.apache.flume.source.kafka.KafkaSourceConstants.DEFAULT_MIGRATE_ZOOKEEPER_OFFSETS;
import static org.apache.flume.source.kafka.KafkaSourceConstants.DEFAULT_SET_TOPIC_HEADER;
import static org.apache.flume.source.kafka.KafkaSourceConstants.DEFAULT_TOPIC_HEADER;
import static org.apache.flume.source.kafka.KafkaSourceConstants.DEFAULT_VALUE_DESERIALIZER;
import static org.apache.flume.source.kafka.KafkaSourceConstants.KAFKA_CONSUMER_PREFIX;
import static org.apache.flume.source.kafka.KafkaSourceConstants.KAFKA_HEADER;
import static org.apache.flume.source.kafka.KafkaSourceConstants.KEY_HEADER;
-import static org.apache.flume.source.kafka.KafkaSourceConstants.MIGRATE_ZOOKEEPER_OFFSETS;
import static org.apache.flume.source.kafka.KafkaSourceConstants.OFFSET_HEADER;
-import static org.apache.flume.source.kafka.KafkaSourceConstants.OLD_GROUP_ID;
import static org.apache.flume.source.kafka.KafkaSourceConstants.PARTITION_HEADER;
import static org.apache.flume.source.kafka.KafkaSourceConstants.SET_TOPIC_HEADER;
import static org.apache.flume.source.kafka.KafkaSourceConstants.TIMESTAMP_HEADER;
-import static org.apache.flume.source.kafka.KafkaSourceConstants.TOPIC;
import static org.apache.flume.source.kafka.KafkaSourceConstants.TOPICS;
import static org.apache.flume.source.kafka.KafkaSourceConstants.TOPICS_REGEX;
import static org.apache.flume.source.kafka.KafkaSourceConstants.TOPIC_HEADER;
-import static org.apache.flume.source.kafka.KafkaSourceConstants.ZOOKEEPER_CONNECT_FLUME_KEY;
/**
* A Source for Kafka which reads messages from kafka topics.
@@ -160,11 +145,8 @@ public class KafkaSource extends AbstractPollableSource
private Subscriber subscriber;
- private String zookeeperConnect;
private String bootstrapServers;
private String groupId = DEFAULT_GROUP_ID;
- @Deprecated
- private boolean migrateZookeeperOffsets = DEFAULT_MIGRATE_ZOOKEEPER_OFFSETS;
private String topicHeader = null;
private boolean setTopicHeader;
private Map<String, String> headerMap;
@@ -388,10 +370,6 @@ public class KafkaSource extends AbstractPollableSource
rebalanceFlag = new AtomicBoolean(false);
kafkaProps = new Properties();
- // can be removed in the next release
- // See https://issues.apache.org/jira/browse/FLUME-2896
- translateOldProperties(context);
-
String topicProperty = context.getString(TOPICS_REGEX);
if (topicProperty != null && !topicProperty.isEmpty()) {
// create subscriber that uses pattern-based subscription
@@ -413,28 +391,9 @@ public class KafkaSource extends AbstractPollableSource
log.debug(AVRO_EVENT + " set to: {}", useAvroEventFormat);
}
- zookeeperConnect = context.getString(ZOOKEEPER_CONNECT_FLUME_KEY);
- migrateZookeeperOffsets = context.getBoolean(MIGRATE_ZOOKEEPER_OFFSETS,
- DEFAULT_MIGRATE_ZOOKEEPER_OFFSETS);
-
bootstrapServers = context.getString(BOOTSTRAP_SERVERS);
if (bootstrapServers == null || bootstrapServers.isEmpty()) {
- if (zookeeperConnect == null || zookeeperConnect.isEmpty()) {
- throw new ConfigurationException("Bootstrap Servers must be specified");
- } else {
- // For backwards compatibility look up the bootstrap from zookeeper
- log.warn("{} is deprecated. Please use the parameter {}", ZOOKEEPER_CONNECT_FLUME_KEY, BOOTSTRAP_SERVERS);
-
- // Lookup configured security protocol, just in case it's not default
- String securityProtocolStr =
- context.getSubProperties(KAFKA_CONSUMER_PREFIX)
- .get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG);
- if (securityProtocolStr == null || securityProtocolStr.isEmpty()) {
- securityProtocolStr = CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL;
- }
- bootstrapServers =
- lookupBootstrap(zookeeperConnect, SecurityProtocol.valueOf(securityProtocolStr));
- }
+ throw new ConfigurationException("Bootstrap Servers must be specified");
}
String groupIdProperty =
@@ -465,23 +424,6 @@ public class KafkaSource extends AbstractPollableSource
}
}
- // We can remove this once the properties are officially deprecated
- private void translateOldProperties(Context ctx) {
- // topic
- String topic = context.getString(TOPIC);
- if (topic != null && !topic.isEmpty()) {
- subscriber = new TopicListSubscriber(topic);
- log.warn("{} is deprecated. Please use the parameter {}", TOPIC, TOPICS);
- }
-
- // old groupId
- groupId = ctx.getString(OLD_GROUP_ID);
- if (groupId != null && !groupId.isEmpty()) {
- log.warn("{} is deprecated. Please use the parameter {}", OLD_GROUP_ID,
- KAFKA_CONSUMER_PREFIX + ConsumerConfig.GROUP_ID_CONFIG);
- }
- }
-
private void setConsumerProps(Context ctx) {
kafkaProps.clear();
kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, DEFAULT_KEY_DESERIALIZER);
@@ -505,30 +447,6 @@ public class KafkaSource extends AbstractPollableSource
KafkaSSLUtil.addGlobalSSLParameters(kafkaProps);
}
- /**
- * Generates the Kafka bootstrap connection string from the metadata stored in Zookeeper.
- * Allows for backwards compatibility of the zookeeperConnect configuration.
- */
- private String lookupBootstrap(String zookeeperConnect, SecurityProtocol securityProtocol) {
- try (KafkaZkClient zkClient = KafkaZkClient.apply(zookeeperConnect,
- JaasUtils.isZkSaslEnabled(), ZK_SESSION_TIMEOUT, ZK_CONNECTION_TIMEOUT, 10,
- Time.SYSTEM, "kafka.server", "SessionExpireListener",
- scala.Option.empty(), scala.Option.empty())) {
- List<Broker> brokerList =
- JavaConverters.seqAsJavaListConverter(zkClient.getAllBrokersInCluster()).asJava();
- List<BrokerEndPoint> endPoints = brokerList.stream()
- .map(broker -> broker.brokerEndPoint(
- ListenerName.forSecurityProtocol(securityProtocol))
- )
- .collect(Collectors.toList());
- List<String> connections = new ArrayList<>();
- for (BrokerEndPoint endPoint : endPoints) {
- connections.add(endPoint.connectionString());
- }
- return StringUtils.join(connections, ',');
- }
- }
-
@VisibleForTesting
String getBootstrapServers() {
return bootstrapServers;
@@ -557,21 +475,6 @@ public class KafkaSource extends AbstractPollableSource
protected void doStart() throws FlumeException {
log.info("Starting {}...", this);
- // As a migration step check if there are any offsets from the group stored in kafka
- // If not read them from Zookeeper and commit them to Kafka
- if (migrateZookeeperOffsets && zookeeperConnect != null && !zookeeperConnect.isEmpty()) {
- // For simplicity we only support migration of a single topic via the TopicListSubscriber.
- // There was no way to define a list of topics or a pattern in the previous Flume version.
- if (subscriber instanceof TopicListSubscriber &&
- ((TopicListSubscriber) subscriber).get().size() == 1) {
- String topicStr = ((TopicListSubscriber) subscriber).get().get(0);
- migrateOffsets(topicStr);
- } else {
- log.info("Will not attempt to migrate offsets " +
- "because multiple topics or a pattern are defined");
- }
- }
-
//initialize a consumer.
consumer = new KafkaConsumer<String, byte[]>(kafkaProps);
@@ -594,64 +497,6 @@ public class KafkaSource extends AbstractPollableSource
log.info("Kafka Source {} stopped. Metrics: {}", getName(), counter);
}
- private void migrateOffsets(String topicStr) {
- try (KafkaZkClient zkClient = KafkaZkClient.apply(zookeeperConnect,
- JaasUtils.isZkSaslEnabled(), ZK_SESSION_TIMEOUT, ZK_CONNECTION_TIMEOUT, 10,
- Time.SYSTEM, "kafka.server", "SessionExpireListener",
- scala.Option.empty(), scala.Option.empty());
- KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(kafkaProps)) {
- Map<TopicPartition, OffsetAndMetadata> kafkaOffsets =
- getKafkaOffsets(consumer, topicStr);
- if (kafkaOffsets == null) {
- log.warn("Topic " + topicStr + " not found in Kafka. Offset migration will be skipped.");
- return;
- }
- if (!kafkaOffsets.isEmpty()) {
- log.info("Found Kafka offsets for topic " + topicStr +
- ". Will not migrate from zookeeper");
- log.debug("Offsets found: {}", kafkaOffsets);
- return;
- }
-
- log.info("No Kafka offsets found. Migrating zookeeper offsets");
- Map<TopicPartition, OffsetAndMetadata> zookeeperOffsets =
- getZookeeperOffsets(zkClient, consumer, topicStr);
- if (zookeeperOffsets.isEmpty()) {
- log.warn("No offsets to migrate found in Zookeeper");
- return;
- }
-
- log.info("Committing Zookeeper offsets to Kafka");
- log.debug("Offsets to commit: {}", zookeeperOffsets);
- consumer.commitSync(zookeeperOffsets);
- // Read the offsets to verify they were committed
- Map<TopicPartition, OffsetAndMetadata> newKafkaOffsets =
- getKafkaOffsets(consumer, topicStr);
- log.debug("Offsets committed: {}", newKafkaOffsets);
- if (newKafkaOffsets == null
- || !newKafkaOffsets.keySet().containsAll(zookeeperOffsets.keySet())) {
- throw new FlumeException("Offsets could not be committed");
- }
- }
- }
-
- private Map<TopicPartition, OffsetAndMetadata> getKafkaOffsets(
- KafkaConsumer<String, byte[]> client, String topicStr) {
- Map<TopicPartition, OffsetAndMetadata> offsets = null;
- List<PartitionInfo> partitions = client.partitionsFor(topicStr);
- if (partitions != null) {
- offsets = new HashMap<>();
- for (PartitionInfo partition : partitions) {
- TopicPartition key = new TopicPartition(topicStr, partition.partition());
- OffsetAndMetadata offsetAndMetadata = client.committed(key);
- if (offsetAndMetadata != null) {
- offsets.put(key, offsetAndMetadata);
- }
- }
- }
- return offsets;
- }
-
private Map<TopicPartition, OffsetAndMetadata> getZookeeperOffsets(
KafkaZkClient zkClient, KafkaConsumer<String, byte[]> consumer, String topicStr) {
diff --git a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java
index 5c7857d1b..97216e159 100644
--- a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java
+++ b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java
@@ -38,17 +38,9 @@ public class KafkaSourceConstants {
public static final String DEFAULT_GROUP_ID = "flume";
public static final String KAFKA_HEADER = "header.";
- public static final String MIGRATE_ZOOKEEPER_OFFSETS = "migrateZookeeperOffsets";
- public static final boolean DEFAULT_MIGRATE_ZOOKEEPER_OFFSETS = true;
-
public static final String AVRO_EVENT = "useFlumeEventFormat";
public static final boolean DEFAULT_AVRO_EVENT = false;
- /* Old Properties */
- public static final String ZOOKEEPER_CONNECT_FLUME_KEY = "zookeeperConnect";
- public static final String TOPIC = "topic";
- public static final String OLD_GROUP_ID = "groupId";
-
// flume event headers
public static final String DEFAULT_TOPIC_HEADER = "topic";
public static final String KEY_HEADER = "key";
diff --git a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java
index b2deea9c9..e2c4d8f0e 100644
--- a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java
+++ b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java
@@ -17,7 +17,8 @@
package org.apache.flume.source.kafka;
import kafka.server.KafkaConfig;
-import kafka.server.KafkaServerStartable;
+import kafka.server.KafkaServer;
+import kafka.utils.TestUtils;
import org.apache.commons.io.FileUtils;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
@@ -28,6 +29,7 @@ import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Time;
import java.io.File;
import java.io.IOException;
@@ -40,16 +42,13 @@ import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
-import static org.apache.kafka.common.config.SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG;
-import static org.apache.kafka.common.config.SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG;
-import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG;
-import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG;
+import static org.apache.kafka.common.config.SslConfigs.*;
public class KafkaSourceEmbeddedKafka {
public static String HOST = InetAddress.getLoopbackAddress().getCanonicalHostName();
- KafkaServerStartable kafkaServer;
+ KafkaServer kafkaServer;
KafkaSourceEmbeddedZookeeper zookeeper;
private AdminClient adminClient;
@@ -103,7 +102,7 @@ public class KafkaSourceEmbeddedKafka {
props.putAll(properties);
}
KafkaConfig config = new KafkaConfig(props);
- kafkaServer = new KafkaServerStartable(config);
+ kafkaServer = TestUtils.createServer(config, Time.SYSTEM);
kafkaServer.startup();
initProducer();
}
diff --git a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java
index d5caf7ceb..2d67f104c 100644
--- a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java
+++ b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java
@@ -19,7 +19,6 @@ package org.apache.flume.source.kafka;
import com.google.common.base.Charsets;
import com.google.common.collect.Lists;
-import kafka.zk.KafkaZkClient;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;
@@ -37,19 +36,12 @@ import org.apache.flume.source.avro.AvroFlumeEvent;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
-import org.apache.kafka.common.security.JaasUtils;
import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.utils.Time;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
@@ -71,7 +63,6 @@ import java.time.Duration;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -87,13 +78,10 @@ import static org.apache.flume.source.kafka.KafkaSourceConstants.BOOTSTRAP_SERVE
import static org.apache.flume.source.kafka.KafkaSourceConstants.DEFAULT_AUTO_COMMIT;
import static org.apache.flume.source.kafka.KafkaSourceConstants.DEFAULT_TOPIC_HEADER;
import static org.apache.flume.source.kafka.KafkaSourceConstants.KAFKA_CONSUMER_PREFIX;
-import static org.apache.flume.source.kafka.KafkaSourceConstants.OLD_GROUP_ID;
import static org.apache.flume.source.kafka.KafkaSourceConstants.PARTITION_HEADER;
import static org.apache.flume.source.kafka.KafkaSourceConstants.TIMESTAMP_HEADER;
-import static org.apache.flume.source.kafka.KafkaSourceConstants.TOPIC;
import static org.apache.flume.source.kafka.KafkaSourceConstants.TOPICS;
import static org.apache.flume.source.kafka.KafkaSourceConstants.TOPICS_REGEX;
-import static org.apache.flume.source.kafka.KafkaSourceConstants.ZOOKEEPER_CONNECT_FLUME_KEY;
import static org.apache.kafka.clients.CommonClientConfigs.SECURITY_PROTOCOL_CONFIG;
import static org.apache.kafka.common.config.SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG;
import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG;
@@ -586,47 +574,11 @@ public class TestKafkaSource {
kafkaProps.getProperty("bootstrap.servers"));
}
- @Test
- public void testOldProperties() {
- Context context = new Context();
-
- context.put(TOPIC, "old.topic");
- context.put(OLD_GROUP_ID, "old.groupId");
- context.put(BOOTSTRAP_SERVERS, "real-bootstrap-servers-list");
- KafkaSource source = new KafkaSource();
- source.doConfigure(context);
- Properties kafkaProps = source.getConsumerProps();
-
- KafkaSource.Subscriber<List<String>> subscriber = source.getSubscriber();
- //check topic was set
- assertEquals("old.topic", subscriber.get().get(0));
- //check that kafka old properties override the default and get correct name
- assertEquals("old.groupId", kafkaProps.getProperty(ConsumerConfig.GROUP_ID_CONFIG));
-
- source = new KafkaSource();
- context.put(KAFKA_CONSUMER_PREFIX + ConsumerConfig.GROUP_ID_CONFIG, "override.old.group.id");
- source.doConfigure(context);
- kafkaProps = source.getConsumerProps();
- //check that kafka new properties override old
- assertEquals("override.old.group.id", kafkaProps.getProperty(ConsumerConfig.GROUP_ID_CONFIG));
-
- context.clear();
- context.put(BOOTSTRAP_SERVERS, "real-bootstrap-servers-list");
- context.put(TOPIC, "old.topic");
- source = new KafkaSource();
- source.doConfigure(context);
- kafkaProps = source.getConsumerProps();
- //check defaults set
- assertEquals(KafkaSourceConstants.DEFAULT_GROUP_ID,
- kafkaProps.getProperty(ConsumerConfig.GROUP_ID_CONFIG));
- }
-
@Test
public void testPatternBasedSubscription() {
Context context = new Context();
context.put(TOPICS_REGEX, "^topic[0-9]$");
- context.put(OLD_GROUP_ID, "old.groupId");
context.put(BOOTSTRAP_SERVERS, "real-bootstrap-servers-list");
KafkaSource source = new KafkaSource();
source.doConfigure(context);
@@ -709,49 +661,6 @@ public class TestKafkaSource {
}
- @Test
- public void testBootstrapLookup() {
- Context context = new Context();
-
- context.put(ZOOKEEPER_CONNECT_FLUME_KEY, kafkaServer.getZkConnectString());
- context.put(TOPIC, "old.topic");
- context.put(OLD_GROUP_ID, "old.groupId");
- KafkaSource source = new KafkaSource();
- source.doConfigure(context);
- String sourceServers = source.getBootstrapServers();
- sourceServers = sourceServers.substring(0, sourceServers.indexOf(':'));
-
- String kafkaServers = kafkaServer.getBootstrapServers();
- kafkaServers = kafkaServers.substring(0, kafkaServers.indexOf(':'));
-
- List<String> possibleValues = Arrays.asList("localhost", "127.0.0.1");
-
- if (!(possibleValues.contains(sourceServers) && possibleValues.contains(kafkaServers))) {
- fail("Expected either 'localhost' or '127.0.0.1'. Source: "
- + sourceServers + ", Kafka: " + kafkaServers);
- }
- }
-
- @Test
- public void testMigrateOffsetsNone() throws Exception {
- doTestMigrateZookeeperOffsets(false, false, "testMigrateOffsets-none");
- }
-
- @Test
- public void testMigrateOffsetsZookeeper() throws Exception {
- doTestMigrateZookeeperOffsets(true, false, "testMigrateOffsets-zookeeper");
- }
-
- @Test
- public void testMigrateOffsetsKafka() throws Exception {
- doTestMigrateZookeeperOffsets(false, true, "testMigrateOffsets-kafka");
- }
-
- @Test
- public void testMigrateOffsetsBoth() throws Exception {
- doTestMigrateZookeeperOffsets(true, true, "testMigrateOffsets-both");
- }
-
/**
* Tests that sub-properties (kafka.consumer.*) apply correctly across multiple invocations
* of configure() (fix for FLUME-2857).
@@ -766,7 +675,7 @@ public class TestKafkaSource {
Context context = prepareDefaultContext(group);
context.put(KafkaSourceConstants.KAFKA_CONSUMER_PREFIX + sampleConsumerProp,
sampleConsumerVal);
- context.put(TOPIC, "random-topic");
+ context.put(TOPICS, "random-topic");
kafkaSource.configure(context);
@@ -774,7 +683,7 @@ public class TestKafkaSource {
kafkaSource.getConsumerProps().getProperty(sampleConsumerProp));
context = prepareDefaultContext(group);
- context.put(TOPIC, "random-topic");
+ context.put(TOPICS, "random-topic");
kafkaSource.configure(context);
Assert.assertNull(kafkaSource.getConsumerProps().getProperty(sampleConsumerProp));
@@ -913,110 +822,6 @@ public class TestKafkaSource {
kafkaSource.stop();
}
- private void doTestMigrateZookeeperOffsets(boolean hasZookeeperOffsets, boolean hasKafkaOffsets,
- String group) throws Exception {
- // create a topic with 1 partition for simplicity
- String topic = findUnusedTopic();
- kafkaServer.createTopic(topic, 1);
-
- Context context = prepareDefaultContext(group);
- context.put(ZOOKEEPER_CONNECT_FLUME_KEY, kafkaServer.getZkConnectString());
- context.put(TOPIC, topic);
- KafkaSource source = new KafkaSource();
- source.doConfigure(context);
-
- // Produce some data and save an offset
- Long fifthOffset = 0L;
- Long tenthOffset = 0L;
- Properties props = createProducerProps(kafkaServer.getBootstrapServers());
- KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);
- for (int i = 1; i <= 50; i++) {
- ProducerRecord<String, byte[]> data =
- new ProducerRecord<>(topic, null, String.valueOf(i).getBytes());
- RecordMetadata recordMetadata = producer.send(data).get();
- if (i == 5) {
- fifthOffset = recordMetadata.offset();
- }
- if (i == 10) {
- tenthOffset = recordMetadata.offset();
- }
- }
-
- // Commit 10th offset to zookeeper
- if (hasZookeeperOffsets) {
- KafkaZkClient zkClient = KafkaZkClient.apply(kafkaServer.getZkConnectString(),
- JaasUtils.isZkSaslEnabled(), 30000, 30000, 10, Time.SYSTEM,
- "kafka.server", "SessionExpireListener", scala.Option.empty(), scala.Option.empty());
- zkClient.getConsumerOffset(group, new TopicPartition(topic, 0));
- Long offset = tenthOffset + 1;
- zkClient.setOrCreateConsumerOffset(group, new TopicPartition(topic, 0), offset);
- zkClient.close();
- }
-
- // Commit 5th offset to kafka
- if (hasKafkaOffsets) {
- Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
- offsets.put(new TopicPartition(topic, 0), new OffsetAndMetadata(fifthOffset + 1));
- KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(source.getConsumerProps());
- consumer.commitSync(offsets);
- consumer.close();
- }
-
- // Start the source and read some data
- source.setChannelProcessor(createGoodChannel());
- source.start();
- for (int i = 0; i < 3; i++) {
- source.process();
- Thread.sleep(1000);
- }
-
- Thread.sleep(500L);
- source.process();
- List<Integer> finals = new ArrayList<Integer>(40);
- for (Event event: events) {
- finals.add(Integer.parseInt(new String(event.getBody())));
- }
- source.stop();
-
- if (!hasKafkaOffsets && !hasZookeeperOffsets) {
- // The default behavior is to start at the latest message in the log
- org.junit.Assert.assertTrue("Source should read no messages", finals.isEmpty());
- } else if (hasKafkaOffsets && hasZookeeperOffsets) {
- // Respect Kafka offsets if they exist
- org.junit.Assert.assertFalse("Source should not read the 5th message", finals.contains(5));
- org.junit.Assert.assertTrue("Source should read the 6th message", finals.contains(6));
- } else if (hasKafkaOffsets) {
- // Respect Kafka offsets if they exist (don't fail if zookeeper offsets are missing)
- org.junit.Assert.assertFalse("Source should not read the 5th message", finals.contains(5));
- org.junit.Assert.assertTrue("Source should read the 6th message", finals.contains(6));
- } else {
- // Otherwise migrate the ZooKeeper offsets if they exist
- org.junit.Assert.assertFalse("Source should not read the 10th message", finals.contains(10));
- org.junit.Assert.assertTrue("Source should read the 11th message", finals.contains(11));
- }
- }
-
- @Test
- public void testMigrateZookeeperOffsetsWhenTopicNotExists() throws Exception {
- String topic = findUnusedTopic();
-
- Context context = prepareDefaultContext("testMigrateOffsets-nonExistingTopic");
- context.put(ZOOKEEPER_CONNECT_FLUME_KEY, kafkaServer.getZkConnectString());
- context.put(TOPIC, topic);
- KafkaSource source = new KafkaSource();
- source.doConfigure(context);
-
- source.setChannelProcessor(createGoodChannel());
- source.start();
-
- assertEquals(LifecycleState.START, source.getLifecycleState());
-
- Status status = source.process();
- assertEquals(Status.BACKOFF, status);
-
- source.stop();
- }
-
ChannelProcessor createGoodChannel() {
ChannelProcessor channelProcessor = mock(ChannelProcessor.class);
diff --git a/flume-shared/flume-shared-kafka-test/pom.xml b/flume-shared/flume-shared-kafka-test/pom.xml
index d2e10692d..ebfb0b24d 100644
--- a/flume-shared/flume-shared-kafka-test/pom.xml
+++ b/flume-shared/flume-shared-kafka-test/pom.xml
@@ -81,7 +81,6 @@
<artifactId>kafka-clients</artifactId>
<scope>provided</scope>
</dependency>
-
</dependencies>
</project>
diff --git a/pom.xml b/pom.xml
index 0bc9ff252..8be106499 100644
--- a/pom.xml
+++ b/pom.xml
@@ -85,7 +85,7 @@ limitations under the License.
<jdom.version>1.1.3</jdom.version>
<joda-time.version>2.9.9</joda-time.version>
<junit.version>4.13.2</junit.version>
- <kafka.version>2.7.2</kafka.version>
+ <kafka.version>3.3.1</kafka.version>
<kite.version>1.1.0</kite.version>
<kudu.version>1.10.0</kudu.version>
<lifecycle-mapping.version>1.0.0</lifecycle-mapping.version>