Posted to commits@flink.apache.org by tz...@apache.org on 2017/07/01 07:35:59 UTC
[08/14] flink git commit: [FLINK-4022] [kafka] Partition / topic discovery for FlinkKafkaConsumer
[FLINK-4022] [kafka] Partition / topic discovery for FlinkKafkaConsumer
This closes #3746.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/085d4db8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/085d4db8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/085d4db8
Branch: refs/heads/master
Commit: 085d4db849b9c659ba6bb2c7d66a04debeb048c4
Parents: b8c8f20
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Thu Apr 20 17:17:43 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Sat Jul 1 15:33:42 2017 +0800
----------------------------------------------------------------------
docs/dev/connectors/kafka.md | 15 +
.../connectors/kafka/FlinkKafkaConsumer010.java | 12 +
.../internal/Kafka010PartitionDiscoverer.java | 38 +
.../connectors/kafka/Kafka010FetcherTest.java | 483 -----------
.../kafka/internal/Kafka010FetcherTest.java | 480 +++++++++++
.../connectors/kafka/FlinkKafkaConsumer08.java | 173 +---
.../kafka/internals/ClosableBlockingQueue.java | 506 -----------
.../kafka/internals/Kafka08Fetcher.java | 25 +-
.../internals/Kafka08PartitionDiscoverer.java | 293 +++++++
.../kafka/internals/PartitionInfoFetcher.java | 18 +-
.../internals/PeriodicOffsetCommitter.java | 9 +-
.../connectors/kafka/KafkaConsumer08Test.java | 51 +-
.../kafka/KafkaTestEnvironmentImpl.java | 16 +-
.../internals/ClosableBlockingQueueTest.java | 611 --------------
.../connectors/kafka/FlinkKafkaConsumer09.java | 59 +-
.../kafka/internal/Kafka09Fetcher.java | 10 +-
.../internal/Kafka09PartitionDiscoverer.java | 103 +++
.../kafka/internal/KafkaConsumerThread.java | 229 ++++-
.../connectors/kafka/Kafka09FetcherTest.java | 482 -----------
.../kafka/internal/Kafka09FetcherTest.java | 479 +++++++++++
.../kafka/internal/KafkaConsumerThreadTest.java | 843 +++++++++++++++++++
.../kafka/FlinkKafkaConsumerBase.java | 497 ++++++-----
.../kafka/internals/AbstractFetcher.java | 235 ++++--
.../internals/AbstractPartitionDiscoverer.java | 256 ++++++
.../kafka/internals/ClosableBlockingQueue.java | 506 +++++++++++
.../kafka/internals/KafkaTopicPartition.java | 19 +
.../kafka/internals/KafkaTopicsDescriptor.java | 74 ++
.../FlinkKafkaConsumerBaseMigrationTest.java | 121 ++-
.../kafka/FlinkKafkaConsumerBaseTest.java | 74 +-
.../KafkaConsumerPartitionAssignmentTest.java | 356 --------
.../connectors/kafka/KafkaConsumerTestBase.java | 5 +-
.../kafka/internals/AbstractFetcherTest.java | 18 +-
.../AbstractPartitionDiscovererTest.java | 428 ++++++++++
.../internals/ClosableBlockingQueueTest.java | 611 ++++++++++++++
34 files changed, 5055 insertions(+), 3080 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/085d4db8/docs/dev/connectors/kafka.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/kafka.md b/docs/dev/connectors/kafka.md
index bc7e7de..23ad3b8 100644
--- a/docs/dev/connectors/kafka.md
+++ b/docs/dev/connectors/kafka.md
@@ -292,6 +292,21 @@ Flink on YARN supports automatic restart of lost YARN containers.
If checkpointing is not enabled, the Kafka consumer will periodically commit the offsets to Zookeeper.
+### Kafka Consumers Partition Discovery
+
+The Flink Kafka Consumer supports discovering dynamically created Kafka partitions, and consumes them with
+exactly-once guarantees. All partitions discovered after the initial retrieval of partition metadata (i.e., when the
+job starts running) will be consumed from the earliest possible offset.
+
+By default, partition discovery is disabled. To enable it, set a non-negative value
+for `flink.partition-discovery.interval-millis` in the provided properties config,
+representing the discovery interval in milliseconds.
+
+<span class="label label-danger">Limitation</span> When the consumer is restored from a savepoint taken with a Flink
+version prior to 1.3.x, partition discovery cannot be enabled on the restore run; if enabled, the restore will fail
+with an exception. In this case, to use partition discovery, please first take a savepoint with Flink 1.3.x and
+then restore from that savepoint.
+
### Kafka Consumers Offset Committing Behaviour Configuration
The Flink Kafka Consumer allows configuring the behaviour of how offsets
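For reference, a minimal sketch of enabling partition discovery on the 0.10 consumer, per the doc change above; the broker address, group id, and topic name are placeholders:

    import java.util.Properties;

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

    public class PartitionDiscoveryExample {
        public static void main(String[] args) throws Exception {
            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
            properties.setProperty("group.id", "test-group");              // placeholder

            // a non-negative value enables discovery; here: check for new partitions every 10s
            properties.setProperty("flink.partition-discovery.interval-millis", "10000");

            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            DataStream<String> stream = env.addSource(
                new FlinkKafkaConsumer010<>("my-topic", new SimpleStringSchema(), properties));

            stream.print();
            env.execute("partition discovery example");
        }
    }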
http://git-wip-us.apache.org/repos/asf/flink/blob/085d4db8/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
index 1bbd1dc..b851e3e 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
@@ -22,8 +22,11 @@ import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitMode;
import org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher;
+import org.apache.flink.streaming.connectors.kafka.internal.Kafka010PartitionDiscoverer;
import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
+import org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicsDescriptor;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
@@ -159,4 +162,13 @@ public class FlinkKafkaConsumer010<T> extends FlinkKafkaConsumer09<T> {
pollTimeout,
useMetrics);
}
+
+ @Override
+ protected AbstractPartitionDiscoverer createPartitionDiscoverer(
+ KafkaTopicsDescriptor topicsDescriptor,
+ int indexOfThisSubtask,
+ int numParallelSubtasks) {
+
+ return new Kafka010PartitionDiscoverer(topicsDescriptor, indexOfThisSubtask, numParallelSubtasks, properties);
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/085d4db8/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010PartitionDiscoverer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010PartitionDiscoverer.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010PartitionDiscoverer.java
new file mode 100644
index 0000000..0c10f40
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010PartitionDiscoverer.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicsDescriptor;
+
+import java.util.Properties;
+
+/**
+ * A partition discoverer that can be used to discover topic and partition metadata
+ * from Kafka brokers via the Kafka 0.10 high-level consumer API.
+ */
+public class Kafka010PartitionDiscoverer extends Kafka09PartitionDiscoverer {
+
+ public Kafka010PartitionDiscoverer(
+ KafkaTopicsDescriptor topicsDescriptor,
+ int indexOfThisSubtask,
+ int numParallelSubtasks,
+ Properties kafkaProperties) {
+
+ super(topicsDescriptor, indexOfThisSubtask, numParallelSubtasks, kafkaProperties);
+ }
+}
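A hedged sketch of driving the discoverer by hand; in a running job this is done by FlinkKafkaConsumerBase. The KafkaTopicsDescriptor constructor and the open()/discoverPartitions()/close() lifecycle are assumed from AbstractPartitionDiscoverer and are not shown in this hunk:

    import java.util.Collections;
    import java.util.List;
    import java.util.Properties;

    import org.apache.flink.streaming.connectors.kafka.internal.Kafka010PartitionDiscoverer;
    import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
    import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicsDescriptor;

    public class DiscovererSketch {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder

            // assumed constructor: a fixed topic list (or a Pattern for topic discovery)
            KafkaTopicsDescriptor topicsDescriptor =
                new KafkaTopicsDescriptor(Collections.singletonList("my-topic"), null);

            // this subtask is index 0 of a parallelism of 1
            Kafka010PartitionDiscoverer discoverer =
                new Kafka010PartitionDiscoverer(topicsDescriptor, 0, 1, props);

            // assumed lifecycle inherited from AbstractPartitionDiscoverer
            discoverer.open();
            List<KafkaTopicPartition> partitions = discoverer.discoverPartitions();
            System.out.println("discovered: " + partitions);
            discoverer.close();
        }
    }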
http://git-wip-us.apache.org/repos/asf/flink/blob/085d4db8/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
deleted file mode 100644
index aedd4ba..0000000
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
+++ /dev/null
@@ -1,483 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.core.testutils.MultiShotLatch;
-import org.apache.flink.core.testutils.OneShotLatch;
-import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
-import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.connectors.kafka.internal.Handover;
-import org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher;
-import org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread;
-import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
-import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;
-import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
-import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
-import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
-import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
-
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.clients.consumer.OffsetCommitCallback;
-import org.apache.kafka.common.TopicPartition;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Properties;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.ReentrantLock;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.anyLong;
-import static org.powermock.api.mockito.PowerMockito.doAnswer;
-import static org.powermock.api.mockito.PowerMockito.mock;
-import static org.powermock.api.mockito.PowerMockito.when;
-import static org.powermock.api.mockito.PowerMockito.whenNew;
-
-/**
- * Unit tests for the {@link Kafka010Fetcher}.
- */
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(KafkaConsumerThread.class)
-public class Kafka010FetcherTest {
-
- @Test
- public void testCommitDoesNotBlock() throws Exception {
-
- // test data
- final KafkaTopicPartition testPartition = new KafkaTopicPartition("test", 42);
- final Map<KafkaTopicPartition, Long> testCommitData = new HashMap<>();
- testCommitData.put(testPartition, 11L);
-
- // to synchronize when the consumer is in its blocking method
- final OneShotLatch sync = new OneShotLatch();
-
- // ----- the mock consumer with blocking poll calls ----
- final MultiShotLatch blockerLatch = new MultiShotLatch();
-
- KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class);
- when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<?, ?>>() {
-
- @Override
- public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) throws InterruptedException {
- sync.trigger();
- blockerLatch.await();
- return ConsumerRecords.empty();
- }
- });
-
- doAnswer(new Answer<Void>() {
- @Override
- public Void answer(InvocationOnMock invocation) {
- blockerLatch.trigger();
- return null;
- }
- }).when(mockConsumer).wakeup();
-
- // make sure the fetcher creates the mock consumer
- whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer);
-
- // ----- create the test fetcher -----
-
- @SuppressWarnings("unchecked")
- SourceContext<String> sourceContext = mock(SourceContext.class);
- Map<KafkaTopicPartition, Long> partitionsWithInitialOffsets =
- Collections.singletonMap(new KafkaTopicPartition("test", 42), KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
- KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
-
- final Kafka010Fetcher<String> fetcher = new Kafka010Fetcher<>(
- sourceContext,
- partitionsWithInitialOffsets,
- null, /* periodic assigner */
- null, /* punctuated assigner */
- new TestProcessingTimeService(),
- 10,
- getClass().getClassLoader(),
- "taskname-with-subtask",
- new UnregisteredMetricsGroup(),
- schema,
- new Properties(),
- 0L,
- false);
-
- // ----- run the fetcher -----
-
- final AtomicReference<Throwable> error = new AtomicReference<>();
- final Thread fetcherRunner = new Thread("fetcher runner") {
-
- @Override
- public void run() {
- try {
- fetcher.runFetchLoop();
- } catch (Throwable t) {
- error.set(t);
- }
- }
- };
- fetcherRunner.start();
-
- // wait until the fetcher has reached the method of interest
- sync.await();
-
- // ----- trigger the offset commit -----
-
- final AtomicReference<Throwable> commitError = new AtomicReference<>();
- final Thread committer = new Thread("committer runner") {
- @Override
- public void run() {
- try {
- fetcher.commitInternalOffsetsToKafka(testCommitData);
- } catch (Throwable t) {
- commitError.set(t);
- }
- }
- };
- committer.start();
-
- // ----- ensure that the committer finishes in time -----
- committer.join(30000);
- assertFalse("The committer did not finish in time", committer.isAlive());
-
- // ----- test done, wait till the fetcher is done for a clean shutdown -----
- fetcher.cancel();
- fetcherRunner.join();
-
- // check that there were no errors in the fetcher
- final Throwable fetcherError = error.get();
- if (fetcherError != null && !(fetcherError instanceof Handover.ClosedException)) {
- throw new Exception("Exception in the fetcher", fetcherError);
- }
-
- final Throwable committerError = commitError.get();
- if (committerError != null) {
- throw new Exception("Exception in the committer", committerError);
- }
- }
-
- @Test
- public void ensureOffsetsGetCommitted() throws Exception {
-
- // test data
- final KafkaTopicPartition testPartition1 = new KafkaTopicPartition("test", 42);
- final KafkaTopicPartition testPartition2 = new KafkaTopicPartition("another", 99);
-
- final Map<KafkaTopicPartition, Long> testCommitData1 = new HashMap<>();
- testCommitData1.put(testPartition1, 11L);
- testCommitData1.put(testPartition2, 18L);
-
- final Map<KafkaTopicPartition, Long> testCommitData2 = new HashMap<>();
- testCommitData2.put(testPartition1, 19L);
- testCommitData2.put(testPartition2, 28L);
-
- final BlockingQueue<Map<TopicPartition, OffsetAndMetadata>> commitStore = new LinkedBlockingQueue<>();
-
- // ----- the mock consumer with poll(), wakeup(), and commit(A)sync calls ----
-
- final MultiShotLatch blockerLatch = new MultiShotLatch();
-
- KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class);
-
- when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<?, ?>>() {
- @Override
- public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) throws InterruptedException {
- blockerLatch.await();
- return ConsumerRecords.empty();
- }
- });
-
- doAnswer(new Answer<Void>() {
- @Override
- public Void answer(InvocationOnMock invocation) {
- blockerLatch.trigger();
- return null;
- }
- }).when(mockConsumer).wakeup();
-
- doAnswer(new Answer<Void>() {
- @Override
- public Void answer(InvocationOnMock invocation) {
- @SuppressWarnings("unchecked")
- Map<TopicPartition, OffsetAndMetadata> offsets =
- (Map<TopicPartition, OffsetAndMetadata>) invocation.getArguments()[0];
-
- OffsetCommitCallback callback = (OffsetCommitCallback) invocation.getArguments()[1];
-
- commitStore.add(offsets);
- callback.onComplete(offsets, null);
-
- return null;
- }
- }).when(mockConsumer).commitAsync(
- Mockito.<Map<TopicPartition, OffsetAndMetadata>>any(), any(OffsetCommitCallback.class));
-
- // make sure the fetcher creates the mock consumer
- whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer);
-
- // ----- create the test fetcher -----
-
- @SuppressWarnings("unchecked")
- SourceContext<String> sourceContext = mock(SourceContext.class);
- Map<KafkaTopicPartition, Long> partitionsWithInitialOffsets =
- Collections.singletonMap(new KafkaTopicPartition("test", 42), KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
- KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
-
- final Kafka010Fetcher<String> fetcher = new Kafka010Fetcher<>(
- sourceContext,
- partitionsWithInitialOffsets,
- null, /* periodic assigner */
- null, /* punctuated assigner */
- new TestProcessingTimeService(),
- 10,
- getClass().getClassLoader(),
- "taskname-with-subtask",
- new UnregisteredMetricsGroup(),
- schema,
- new Properties(),
- 0L,
- false);
-
- // ----- run the fetcher -----
-
- final AtomicReference<Throwable> error = new AtomicReference<>();
- final Thread fetcherRunner = new Thread("fetcher runner") {
-
- @Override
- public void run() {
- try {
- fetcher.runFetchLoop();
- } catch (Throwable t) {
- error.set(t);
- }
- }
- };
- fetcherRunner.start();
-
- // ----- trigger the first offset commit -----
-
- fetcher.commitInternalOffsetsToKafka(testCommitData1);
- Map<TopicPartition, OffsetAndMetadata> result1 = commitStore.take();
-
- for (Entry<TopicPartition, OffsetAndMetadata> entry : result1.entrySet()) {
- TopicPartition partition = entry.getKey();
- if (partition.topic().equals("test")) {
- assertEquals(42, partition.partition());
- assertEquals(12L, entry.getValue().offset());
- }
- else if (partition.topic().equals("another")) {
- assertEquals(99, partition.partition());
- assertEquals(18L, entry.getValue().offset());
- }
- }
-
- // ----- trigger the second offset commit -----
-
- fetcher.commitInternalOffsetsToKafka(testCommitData2);
- Map<TopicPartition, OffsetAndMetadata> result2 = commitStore.take();
-
- for (Entry<TopicPartition, OffsetAndMetadata> entry : result2.entrySet()) {
- TopicPartition partition = entry.getKey();
- if (partition.topic().equals("test")) {
- assertEquals(42, partition.partition());
- assertEquals(20L, entry.getValue().offset());
- }
- else if (partition.topic().equals("another")) {
- assertEquals(99, partition.partition());
- assertEquals(28L, entry.getValue().offset());
- }
- }
-
- // ----- test done, wait till the fetcher is done for a clean shutdown -----
- fetcher.cancel();
- fetcherRunner.join();
-
- // check that there were no errors in the fetcher
- final Throwable caughtError = error.get();
- if (caughtError != null && !(caughtError instanceof Handover.ClosedException)) {
- throw new Exception("Exception in the fetcher", caughtError);
- }
- }
-
- @Test
- public void testCancellationWhenEmitBlocks() throws Exception {
-
- // ----- some test data -----
-
- final String topic = "test-topic";
- final int partition = 3;
- final byte[] payload = new byte[] {1, 2, 3, 4};
-
- final List<ConsumerRecord<byte[], byte[]>> records = Arrays.asList(
- new ConsumerRecord<byte[], byte[]>(topic, partition, 15, payload, payload),
- new ConsumerRecord<byte[], byte[]>(topic, partition, 16, payload, payload),
- new ConsumerRecord<byte[], byte[]>(topic, partition, 17, payload, payload));
-
- final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> data = new HashMap<>();
- data.put(new TopicPartition(topic, partition), records);
-
- final ConsumerRecords<byte[], byte[]> consumerRecords = new ConsumerRecords<>(data);
-
- // ----- the test consumer -----
-
- final KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class);
- when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<?, ?>>() {
- @Override
- public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) {
- return consumerRecords;
- }
- });
-
- whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer);
-
- // ----- build a fetcher -----
-
- BlockingSourceContext<String> sourceContext = new BlockingSourceContext<>();
- Map<KafkaTopicPartition, Long> partitionsWithInitialOffsets =
- Collections.singletonMap(new KafkaTopicPartition(topic, partition), KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
- KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
-
- final Kafka010Fetcher<String> fetcher = new Kafka010Fetcher<>(
- sourceContext,
- partitionsWithInitialOffsets,
- null, /* periodic watermark extractor */
- null, /* punctuated watermark extractor */
- new TestProcessingTimeService(),
- 10, /* watermark interval */
- this.getClass().getClassLoader(),
- "task_name",
- new UnregisteredMetricsGroup(),
- schema,
- new Properties(),
- 0L,
- false);
-
- // ----- run the fetcher -----
-
- final AtomicReference<Throwable> error = new AtomicReference<>();
- final Thread fetcherRunner = new Thread("fetcher runner") {
-
- @Override
- public void run() {
- try {
- fetcher.runFetchLoop();
- } catch (Throwable t) {
- error.set(t);
- }
- }
- };
- fetcherRunner.start();
-
- // wait until the thread started to emit records to the source context
- sourceContext.waitTillHasBlocker();
-
- // now we try to cancel the fetcher, including the interruption usually done on the task thread
- // once it has finished, there must be no more thread blocked on the source context
- fetcher.cancel();
- fetcherRunner.interrupt();
- fetcherRunner.join();
-
- assertFalse("fetcher threads did not properly finish", sourceContext.isStillBlocking());
- }
-
- // ------------------------------------------------------------------------
- // test utilities
- // ------------------------------------------------------------------------
-
- private static final class BlockingSourceContext<T> implements SourceContext<T> {
-
- private final ReentrantLock lock = new ReentrantLock();
- private final OneShotLatch inBlocking = new OneShotLatch();
-
- @Override
- public void collect(T element) {
- block();
- }
-
- @Override
- public void collectWithTimestamp(T element, long timestamp) {
- block();
- }
-
- @Override
- public void emitWatermark(Watermark mark) {
- block();
- }
-
- @Override
- public void markAsTemporarilyIdle() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public Object getCheckpointLock() {
- return new Object();
- }
-
- @Override
- public void close() {}
-
- public void waitTillHasBlocker() throws InterruptedException {
- inBlocking.await();
- }
-
- public boolean isStillBlocking() {
- return lock.isLocked();
- }
-
- @SuppressWarnings({"InfiniteLoopStatement", "SynchronizationOnLocalVariableOrMethodParameter"})
- private void block() {
- lock.lock();
- try {
- inBlocking.trigger();
-
- // put this thread to sleep indefinitely
- final Object o = new Object();
- while (true) {
- synchronized (o) {
- o.wait();
- }
- }
- }
- catch (InterruptedException e) {
- // exit cleanly, simply reset the interruption flag
- Thread.currentThread().interrupt();
- }
- finally {
- lock.unlock();
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/085d4db8/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010FetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010FetcherTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010FetcherTest.java
new file mode 100644
index 0000000..2ea1622
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010FetcherTest.java
@@ -0,0 +1,480 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import org.apache.flink.core.testutils.MultiShotLatch;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetCommitCallback;
+import org.apache.kafka.common.TopicPartition;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyLong;
+import static org.powermock.api.mockito.PowerMockito.doAnswer;
+import static org.powermock.api.mockito.PowerMockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
+import static org.powermock.api.mockito.PowerMockito.whenNew;
+
+/**
+ * Unit tests for the {@link Kafka010Fetcher}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(KafkaConsumerThread.class)
+public class Kafka010FetcherTest {
+
+ @Test
+ public void testCommitDoesNotBlock() throws Exception {
+
+ // test data
+ final KafkaTopicPartition testPartition = new KafkaTopicPartition("test", 42);
+ final Map<KafkaTopicPartition, Long> testCommitData = new HashMap<>();
+ testCommitData.put(testPartition, 11L);
+
+ // to synchronize when the consumer is in its blocking method
+ final OneShotLatch sync = new OneShotLatch();
+
+ // ----- the mock consumer with blocking poll calls ----
+ final MultiShotLatch blockerLatch = new MultiShotLatch();
+
+ KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class);
+ when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<?, ?>>() {
+
+ @Override
+ public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) throws InterruptedException {
+ sync.trigger();
+ blockerLatch.await();
+ return ConsumerRecords.empty();
+ }
+ });
+
+ doAnswer(new Answer<Void>() {
+ @Override
+ public Void answer(InvocationOnMock invocation) {
+ blockerLatch.trigger();
+ return null;
+ }
+ }).when(mockConsumer).wakeup();
+
+ // make sure the fetcher creates the mock consumer
+ whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer);
+
+ // ----- create the test fetcher -----
+
+ @SuppressWarnings("unchecked")
+ SourceContext<String> sourceContext = mock(SourceContext.class);
+ Map<KafkaTopicPartition, Long> partitionsWithInitialOffsets =
+ Collections.singletonMap(new KafkaTopicPartition("test", 42), KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
+ KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
+
+ final Kafka010Fetcher<String> fetcher = new Kafka010Fetcher<>(
+ sourceContext,
+ partitionsWithInitialOffsets,
+ null, /* periodic assigner */
+ null, /* punctuated assigner */
+ new TestProcessingTimeService(),
+ 10,
+ getClass().getClassLoader(),
+ "taskname-with-subtask",
+ new UnregisteredMetricsGroup(),
+ schema,
+ new Properties(),
+ 0L,
+ false);
+
+ // ----- run the fetcher -----
+
+ final AtomicReference<Throwable> error = new AtomicReference<>();
+ final Thread fetcherRunner = new Thread("fetcher runner") {
+
+ @Override
+ public void run() {
+ try {
+ fetcher.runFetchLoop();
+ } catch (Throwable t) {
+ error.set(t);
+ }
+ }
+ };
+ fetcherRunner.start();
+
+ // wait until the fetcher has reached the method of interest
+ sync.await();
+
+ // ----- trigger the offset commit -----
+
+ final AtomicReference<Throwable> commitError = new AtomicReference<>();
+ final Thread committer = new Thread("committer runner") {
+ @Override
+ public void run() {
+ try {
+ fetcher.commitInternalOffsetsToKafka(testCommitData);
+ } catch (Throwable t) {
+ commitError.set(t);
+ }
+ }
+ };
+ committer.start();
+
+ // ----- ensure that the committer finishes in time -----
+ committer.join(30000);
+ assertFalse("The committer did not finish in time", committer.isAlive());
+
+ // ----- test done, wait till the fetcher is done for a clean shutdown -----
+ fetcher.cancel();
+ fetcherRunner.join();
+
+ // check that there were no errors in the fetcher
+ final Throwable fetcherError = error.get();
+ if (fetcherError != null && !(fetcherError instanceof Handover.ClosedException)) {
+ throw new Exception("Exception in the fetcher", fetcherError);
+ }
+
+ final Throwable committerError = commitError.get();
+ if (committerError != null) {
+ throw new Exception("Exception in the committer", committerError);
+ }
+ }
+
+ @Test
+ public void ensureOffsetsGetCommitted() throws Exception {
+
+ // test data
+ final KafkaTopicPartition testPartition1 = new KafkaTopicPartition("test", 42);
+ final KafkaTopicPartition testPartition2 = new KafkaTopicPartition("another", 99);
+
+ final Map<KafkaTopicPartition, Long> testCommitData1 = new HashMap<>();
+ testCommitData1.put(testPartition1, 11L);
+ testCommitData1.put(testPartition2, 18L);
+
+ final Map<KafkaTopicPartition, Long> testCommitData2 = new HashMap<>();
+ testCommitData2.put(testPartition1, 19L);
+ testCommitData2.put(testPartition2, 28L);
+
+ final BlockingQueue<Map<TopicPartition, OffsetAndMetadata>> commitStore = new LinkedBlockingQueue<>();
+
+ // ----- the mock consumer with poll(), wakeup(), and commit(A)sync calls ----
+
+ final MultiShotLatch blockerLatch = new MultiShotLatch();
+
+ KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class);
+
+ when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<?, ?>>() {
+ @Override
+ public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) throws InterruptedException {
+ blockerLatch.await();
+ return ConsumerRecords.empty();
+ }
+ });
+
+ doAnswer(new Answer<Void>() {
+ @Override
+ public Void answer(InvocationOnMock invocation) {
+ blockerLatch.trigger();
+ return null;
+ }
+ }).when(mockConsumer).wakeup();
+
+ doAnswer(new Answer<Void>() {
+ @Override
+ public Void answer(InvocationOnMock invocation) {
+ @SuppressWarnings("unchecked")
+ Map<TopicPartition, OffsetAndMetadata> offsets =
+ (Map<TopicPartition, OffsetAndMetadata>) invocation.getArguments()[0];
+
+ OffsetCommitCallback callback = (OffsetCommitCallback) invocation.getArguments()[1];
+
+ commitStore.add(offsets);
+ callback.onComplete(offsets, null);
+
+ return null;
+ }
+ }).when(mockConsumer).commitAsync(
+ Mockito.<Map<TopicPartition, OffsetAndMetadata>>any(), any(OffsetCommitCallback.class));
+
+ // make sure the fetcher creates the mock consumer
+ whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer);
+
+ // ----- create the test fetcher -----
+
+ @SuppressWarnings("unchecked")
+ SourceContext<String> sourceContext = mock(SourceContext.class);
+ Map<KafkaTopicPartition, Long> partitionsWithInitialOffsets =
+ Collections.singletonMap(new KafkaTopicPartition("test", 42), KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
+ KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
+
+ final Kafka010Fetcher<String> fetcher = new Kafka010Fetcher<>(
+ sourceContext,
+ partitionsWithInitialOffsets,
+ null, /* periodic assigner */
+ null, /* punctuated assigner */
+ new TestProcessingTimeService(),
+ 10,
+ getClass().getClassLoader(),
+ "taskname-with-subtask",
+ new UnregisteredMetricsGroup(),
+ schema,
+ new Properties(),
+ 0L,
+ false);
+
+ // ----- run the fetcher -----
+
+ final AtomicReference<Throwable> error = new AtomicReference<>();
+ final Thread fetcherRunner = new Thread("fetcher runner") {
+
+ @Override
+ public void run() {
+ try {
+ fetcher.runFetchLoop();
+ } catch (Throwable t) {
+ error.set(t);
+ }
+ }
+ };
+ fetcherRunner.start();
+
+ // ----- trigger the first offset commit -----
+
+ fetcher.commitInternalOffsetsToKafka(testCommitData1);
+ Map<TopicPartition, OffsetAndMetadata> result1 = commitStore.take();
+
+ for (Entry<TopicPartition, OffsetAndMetadata> entry : result1.entrySet()) {
+ TopicPartition partition = entry.getKey();
+ if (partition.topic().equals("test")) {
+ assertEquals(42, partition.partition());
+ assertEquals(12L, entry.getValue().offset());
+ }
+ else if (partition.topic().equals("another")) {
+ assertEquals(99, partition.partition());
+ assertEquals(18L, entry.getValue().offset());
+ }
+ }
+
+ // ----- trigger the second offset commit -----
+
+ fetcher.commitInternalOffsetsToKafka(testCommitData2);
+ Map<TopicPartition, OffsetAndMetadata> result2 = commitStore.take();
+
+ for (Entry<TopicPartition, OffsetAndMetadata> entry : result2.entrySet()) {
+ TopicPartition partition = entry.getKey();
+ if (partition.topic().equals("test")) {
+ assertEquals(42, partition.partition());
+ assertEquals(20L, entry.getValue().offset());
+ }
+ else if (partition.topic().equals("another")) {
+ assertEquals(99, partition.partition());
+ assertEquals(28L, entry.getValue().offset());
+ }
+ }
+
+ // ----- test done, wait till the fetcher is done for a clean shutdown -----
+ fetcher.cancel();
+ fetcherRunner.join();
+
+ // check that there were no errors in the fetcher
+ final Throwable caughtError = error.get();
+ if (caughtError != null && !(caughtError instanceof Handover.ClosedException)) {
+ throw new Exception("Exception in the fetcher", caughtError);
+ }
+ }
+
+ @Test
+ public void testCancellationWhenEmitBlocks() throws Exception {
+
+ // ----- some test data -----
+
+ final String topic = "test-topic";
+ final int partition = 3;
+ final byte[] payload = new byte[] {1, 2, 3, 4};
+
+ final List<ConsumerRecord<byte[], byte[]>> records = Arrays.asList(
+ new ConsumerRecord<byte[], byte[]>(topic, partition, 15, payload, payload),
+ new ConsumerRecord<byte[], byte[]>(topic, partition, 16, payload, payload),
+ new ConsumerRecord<byte[], byte[]>(topic, partition, 17, payload, payload));
+
+ final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> data = new HashMap<>();
+ data.put(new TopicPartition(topic, partition), records);
+
+ final ConsumerRecords<byte[], byte[]> consumerRecords = new ConsumerRecords<>(data);
+
+ // ----- the test consumer -----
+
+ final KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class);
+ when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<?, ?>>() {
+ @Override
+ public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) {
+ return consumerRecords;
+ }
+ });
+
+ whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer);
+
+ // ----- build a fetcher -----
+
+ BlockingSourceContext<String> sourceContext = new BlockingSourceContext<>();
+ Map<KafkaTopicPartition, Long> partitionsWithInitialOffsets =
+ Collections.singletonMap(new KafkaTopicPartition(topic, partition), KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
+ KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
+
+ final Kafka010Fetcher<String> fetcher = new Kafka010Fetcher<>(
+ sourceContext,
+ partitionsWithInitialOffsets,
+ null, /* periodic watermark extractor */
+ null, /* punctuated watermark extractor */
+ new TestProcessingTimeService(),
+ 10, /* watermark interval */
+ this.getClass().getClassLoader(),
+ "task_name",
+ new UnregisteredMetricsGroup(),
+ schema,
+ new Properties(),
+ 0L,
+ false);
+
+ // ----- run the fetcher -----
+
+ final AtomicReference<Throwable> error = new AtomicReference<>();
+ final Thread fetcherRunner = new Thread("fetcher runner") {
+
+ @Override
+ public void run() {
+ try {
+ fetcher.runFetchLoop();
+ } catch (Throwable t) {
+ error.set(t);
+ }
+ }
+ };
+ fetcherRunner.start();
+
+ // wait until the thread started to emit records to the source context
+ sourceContext.waitTillHasBlocker();
+
+ // now we try to cancel the fetcher, including the interruption usually done on the task thread
+ // once it has finished, there must be no more thread blocked on the source context
+ fetcher.cancel();
+ fetcherRunner.interrupt();
+ fetcherRunner.join();
+
+ assertFalse("fetcher threads did not properly finish", sourceContext.isStillBlocking());
+ }
+
+ // ------------------------------------------------------------------------
+ // test utilities
+ // ------------------------------------------------------------------------
+
+ private static final class BlockingSourceContext<T> implements SourceContext<T> {
+
+ private final ReentrantLock lock = new ReentrantLock();
+ private final OneShotLatch inBlocking = new OneShotLatch();
+
+ @Override
+ public void collect(T element) {
+ block();
+ }
+
+ @Override
+ public void collectWithTimestamp(T element, long timestamp) {
+ block();
+ }
+
+ @Override
+ public void emitWatermark(Watermark mark) {
+ block();
+ }
+
+ @Override
+ public void markAsTemporarilyIdle() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Object getCheckpointLock() {
+ return new Object();
+ }
+
+ @Override
+ public void close() {}
+
+ public void waitTillHasBlocker() throws InterruptedException {
+ inBlocking.await();
+ }
+
+ public boolean isStillBlocking() {
+ return lock.isLocked();
+ }
+
+ @SuppressWarnings({"InfiniteLoopStatement", "SynchronizationOnLocalVariableOrMethodParameter"})
+ private void block() {
+ lock.lock();
+ try {
+ inBlocking.trigger();
+
+ // put this thread to sleep indefinitely
+ final Object o = new Object();
+ while (true) {
+ synchronized (o) {
+ o.wait();
+ }
+ }
+ }
+ catch (InterruptedException e) {
+ // exit cleanly, simply reset the interruption flag
+ Thread.currentThread().interrupt();
+ }
+ finally {
+ lock.unlock();
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/085d4db8/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
index 6c7b94d..c4cd3e7 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
@@ -22,38 +22,26 @@ import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitMode;
import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
+import org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer;
import org.apache.flink.streaming.connectors.kafka.internals.Kafka08Fetcher;
+import org.apache.flink.streaming.connectors.kafka.internals.Kafka08PartitionDiscoverer;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
-import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionLeader;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicsDescriptor;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
-import org.apache.flink.util.NetUtils;
import org.apache.flink.util.PropertiesUtil;
import org.apache.flink.util.SerializedValue;
-import kafka.cluster.Broker;
-import kafka.common.ErrorMapping;
-import kafka.javaapi.PartitionMetadata;
-import kafka.javaapi.TopicMetadata;
-import kafka.javaapi.TopicMetadataRequest;
-import kafka.javaapi.consumer.SimpleConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.common.Node;
-import java.net.InetAddress;
-import java.net.URL;
-import java.net.UnknownHostException;
-import java.nio.channels.ClosedChannelException;
-import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
-import java.util.Random;
import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.PropertiesUtil.getInt;
+import static org.apache.flink.util.PropertiesUtil.getLong;
/**
* The Flink Kafka Consumer is a streaming data source that pulls a parallel data stream from
@@ -172,9 +160,12 @@ public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> {
* The properties that are used to configure both the fetcher and the offset handler.
*/
public FlinkKafkaConsumer08(List<String> topics, KeyedDeserializationSchema<T> deserializer, Properties props) {
- super(topics, deserializer);
+ super(
+ topics,
+ null,
+ deserializer,
+ getLong(props, KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, PARTITION_DISCOVERY_DISABLED));
- checkNotNull(topics, "topics");
this.kafkaProperties = checkNotNull(props, "props");
// validate the zookeeper properties
@@ -212,22 +203,12 @@ public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> {
}
@Override
- protected List<KafkaTopicPartition> getKafkaPartitions(List<String> topics) {
- // Connect to a broker to get the partitions for all topics
- List<KafkaTopicPartition> partitionInfos =
- KafkaTopicPartition.dropLeaderData(getPartitionsForTopic(topics, kafkaProperties));
-
- if (partitionInfos.size() == 0) {
- throw new RuntimeException(
- "Unable to retrieve any partitions for the requested topics " + topics +
- ". Please check previous log entries");
- }
-
- if (LOG.isInfoEnabled()) {
- logPartitionInfo(LOG, partitionInfos);
- }
+ protected AbstractPartitionDiscoverer createPartitionDiscoverer(
+ KafkaTopicsDescriptor topicsDescriptor,
+ int indexOfThisSubtask,
+ int numParallelSubtasks) {
- return partitionInfos;
+ return new Kafka08PartitionDiscoverer(topicsDescriptor, indexOfThisSubtask, numParallelSubtasks, kafkaProperties);
}
@Override
@@ -237,104 +218,12 @@ public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> {
}
// ------------------------------------------------------------------------
- // Kafka / ZooKeeper communication utilities
+ // Kafka / ZooKeeper configuration utilities
// ------------------------------------------------------------------------
/**
- * Send request to Kafka to get partitions for topic.
- *
- * @param topics The names of the topics.
- * @param properties The properties for the Kafka Consumer that is used to query the partitions for the topic.
- */
- public static List<KafkaTopicPartitionLeader> getPartitionsForTopic(List<String> topics, Properties properties) {
- String seedBrokersConfString = properties.getProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG);
- final int numRetries = getInt(properties, GET_PARTITIONS_RETRIES_KEY, DEFAULT_GET_PARTITIONS_RETRIES);
-
- checkNotNull(seedBrokersConfString, "Configuration property %s not set", ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG);
- String[] seedBrokers = seedBrokersConfString.split(",");
- List<KafkaTopicPartitionLeader> partitions = new ArrayList<>();
-
- final String clientId = "flink-kafka-consumer-partition-lookup";
- final int soTimeout = getInt(properties, "socket.timeout.ms", 30000);
- final int bufferSize = getInt(properties, "socket.receive.buffer.bytes", 65536);
-
- Random rnd = new Random();
- retryLoop: for (int retry = 0; retry < numRetries; retry++) {
- // we pick a seed broker randomly to avoid overloading the first broker with all the requests when the
- // parallel source instances start. Still, we try all available brokers.
- int index = rnd.nextInt(seedBrokers.length);
- brokersLoop: for (int arrIdx = 0; arrIdx < seedBrokers.length; arrIdx++) {
- String seedBroker = seedBrokers[index];
- LOG.info("Trying to get topic metadata from broker {} in try {}/{}", seedBroker, retry, numRetries);
- if (++index == seedBrokers.length) {
- index = 0;
- }
-
- URL brokerUrl = NetUtils.getCorrectHostnamePort(seedBroker);
- SimpleConsumer consumer = null;
- try {
- consumer = new SimpleConsumer(brokerUrl.getHost(), brokerUrl.getPort(), soTimeout, bufferSize, clientId);
-
- TopicMetadataRequest req = new TopicMetadataRequest(topics);
- kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
-
- List<TopicMetadata> metaData = resp.topicsMetadata();
-
- // clear in case we have an incomplete list from previous tries
- partitions.clear();
- for (TopicMetadata item : metaData) {
- if (item.errorCode() != ErrorMapping.NoError()) {
- // warn and try more brokers
- LOG.warn("Error while getting metadata from broker " + seedBroker + " to find partitions " +
- "for " + topics.toString() + ". Error: " + ErrorMapping.exceptionFor(item.errorCode()).getMessage());
- continue brokersLoop;
- }
- if (!topics.contains(item.topic())) {
- LOG.warn("Received metadata from topic " + item.topic() + " even though it was not requested. Skipping ...");
- continue brokersLoop;
- }
- for (PartitionMetadata part : item.partitionsMetadata()) {
- Node leader = brokerToNode(part.leader());
- KafkaTopicPartition ktp = new KafkaTopicPartition(item.topic(), part.partitionId());
- KafkaTopicPartitionLeader pInfo = new KafkaTopicPartitionLeader(ktp, leader);
- partitions.add(pInfo);
- }
- }
- break retryLoop; // leave the loop through the brokers
- }
- catch (Exception e) {
- //validates seed brokers in case of a ClosedChannelException
- validateSeedBrokers(seedBrokers, e);
- LOG.warn("Error communicating with broker {} to find partitions for {}. {} Message: {}",
- seedBroker, topics, e.getClass().getName(), e.getMessage());
- LOG.debug("Detailed trace", e);
- // we sleep a bit. Retrying immediately doesn't make sense in cases where Kafka is reorganizing the leader metadata
- try {
- Thread.sleep(500);
- } catch (InterruptedException e1) {
- // sleep shorter.
- }
- } finally {
- if (consumer != null) {
- consumer.close();
- }
- }
- } // brokers loop
- } // retries loop
- return partitions;
- }
-
- /**
- * Turn a broker instance into a node instance.
- * @param broker broker instance
- * @return Node representing the given broker
- */
- private static Node brokerToNode(Broker broker) {
- return new Node(broker.id(), broker.host(), broker.port());
- }
-
- /**
* Validate the ZK configuration, checking for required parameters.
+ *
* @param props Properties to check
*/
protected static void validateZooKeeperConfig(Properties props) {
@@ -364,36 +253,6 @@ public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> {
}
/**
- * Validate that at least one seed broker is valid in case of a
- * ClosedChannelException.
- *
- * @param seedBrokers
- * array containing the seed brokers e.g. ["host1:port1",
- * "host2:port2"]
- * @param exception
- * instance
- */
- private static void validateSeedBrokers(String[] seedBrokers, Exception exception) {
- if (!(exception instanceof ClosedChannelException)) {
- return;
- }
- int unknownHosts = 0;
- for (String broker : seedBrokers) {
- URL brokerUrl = NetUtils.getCorrectHostnamePort(broker.trim());
- try {
- InetAddress.getByName(brokerUrl.getHost());
- } catch (UnknownHostException e) {
- unknownHosts++;
- }
- }
- // throw meaningful exception if all the provided hosts are invalid
- if (unknownHosts == seedBrokers.length) {
- throw new IllegalArgumentException("All the servers provided in: '"
- + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG + "' config are invalid. (unknown hosts)");
- }
- }
-
- /**
* Check for invalid "auto.offset.reset" values. Should be called in constructor for eager checking before submitting
* the job. Note that 'none' is also considered invalid, as we don't want to deliberately throw an exception
* right after a task is started.
http://git-wip-us.apache.org/repos/asf/flink/blob/085d4db8/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue.java
deleted file mode 100644
index da61dd0..0000000
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue.java
+++ /dev/null
@@ -1,506 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.internals;
-
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
-
-import static java.util.Objects.requireNonNull;
-
-/**
- * A special form of blocking queue with two additions:
- * <ol>
- * <li>The queue can be closed atomically when empty. Adding elements after the queue
- * is closed fails. This allows queue consumers to atomically discover that no elements
- * are available and mark themselves as shut down.</li>
- * <li>The queue allows polling batches of elements in one call.</li>
- * </ol>
- *
- * <p>The queue has no capacity restriction and is safe for multiple producers and consumers.
- *
- * <p>Note: Null elements are prohibited.
- *
- * @param <E> The type of elements in the queue.
- */
-public class ClosableBlockingQueue<E> {
-
- /** The lock used to make queue accesses and open checks atomic. */
- private final ReentrantLock lock;
-
- /** The condition on which blocking get-calls wait if the queue is empty. */
- private final Condition nonEmpty;
-
- /** The deque of elements. */
- private final ArrayDeque<E> elements;
-
- /** Flag marking the status of the queue. */
- private volatile boolean open;
-
- // ------------------------------------------------------------------------
-
- /**
- * Creates a new empty queue.
- */
- public ClosableBlockingQueue() {
- this(10);
- }
-
- /**
- * Creates a new empty queue, reserving space for at least the specified number
- * of elements. The queue can still grow if more elements are added than the
- * reserved space.
- *
- * @param initialSize The number of elements to reserve space for.
- */
- public ClosableBlockingQueue(int initialSize) {
- this.lock = new ReentrantLock(true);
- this.nonEmpty = this.lock.newCondition();
-
- this.elements = new ArrayDeque<>(initialSize);
- this.open = true;
-
- }
-
- /**
- * Creates a new queue that contains the given elements.
- *
- * @param initialElements The elements to initially add to the queue.
- */
- public ClosableBlockingQueue(Collection<? extends E> initialElements) {
- this(initialElements.size());
- this.elements.addAll(initialElements);
- }
-
- // ------------------------------------------------------------------------
- // Size and status
- // ------------------------------------------------------------------------
-
- /**
- * Gets the number of elements currently in the queue.
- * @return The number of elements currently in the queue.
- */
- public int size() {
- lock.lock();
- try {
- return elements.size();
- } finally {
- lock.unlock();
- }
- }
-
- /**
- * Checks whether the queue is empty (has no elements).
- * @return True, if the queue is empty; false, if it is non-empty.
- */
- public boolean isEmpty() {
- return size() == 0;
- }
-
- /**
- * Checks whether the queue is currently open, meaning elements can be added and polled.
- * @return True, if the queue is open; false, if it is closed.
- */
- public boolean isOpen() {
- return open;
- }
-
- /**
- * Tries to close the queue. Closing the queue only succeeds when no elements are
- * in the queue when this method is called. Checking whether the queue is empty, and
- * marking the queue as closed is one atomic operation.
- *
- * @return True, if the queue is closed, false if the queue remains open.
- */
- public boolean close() {
- lock.lock();
- try {
- if (open) {
- if (elements.isEmpty()) {
- open = false;
- nonEmpty.signalAll();
- return true;
- } else {
- return false;
- }
- }
- else {
- // already closed
- return true;
- }
- } finally {
- lock.unlock();
- }
- }
-
- // ------------------------------------------------------------------------
- // Adding / Removing elements
- // ------------------------------------------------------------------------
-
- /**
- * Tries to add an element to the queue, if the queue is still open. Checking whether the queue
- * is open and adding the element is one atomic operation.
- *
- * <p>Unlike the {@link #add(Object)} method, this method never throws an exception,
- * but only indicates via the return code if the element was added or the
- * queue was closed.
- *
- * @param element The element to add.
- * @return True, if the element was added, false if the queue was closed.
- */
- public boolean addIfOpen(E element) {
- requireNonNull(element);
-
- lock.lock();
- try {
- if (open) {
- elements.addLast(element);
- if (elements.size() == 1) {
- nonEmpty.signalAll();
- }
- }
- return open;
- } finally {
- lock.unlock();
- }
- }
-
- /**
- * Adds the element to the queue, or fails with an exception, if the queue is closed.
- * Checking whether the queue is open and adding the element is one atomic operation.
- *
- * @param element The element to add.
- * @throws IllegalStateException Thrown, if the queue is closed.
- */
- public void add(E element) throws IllegalStateException {
- requireNonNull(element);
-
- lock.lock();
- try {
- if (open) {
- elements.addLast(element);
- if (elements.size() == 1) {
- nonEmpty.signalAll();
- }
- } else {
- throw new IllegalStateException("queue is closed");
- }
- } finally {
- lock.unlock();
- }
- }
-
- /**
- * Returns the queue's next element without removing it, if the queue is non-empty.
- * Otherwise, returns null.
- *
- * <p>The method throws an {@code IllegalStateException} if the queue is closed.
- * Checking whether the queue is open and getting the next element is one atomic operation.
- *
- * <p>This method never blocks.
- *
- * @return The queue's next element, or null, if the queue is empty.
- * @throws IllegalStateException Thrown, if the queue is closed.
- */
- public E peek() {
- lock.lock();
- try {
- if (open) {
- if (elements.size() > 0) {
- return elements.getFirst();
- } else {
- return null;
- }
- } else {
- throw new IllegalStateException("queue is closed");
- }
- } finally {
- lock.unlock();
- }
- }
-
- /**
- * Returns the queue's next element and removes it, if the queue is non-empty.
- * Otherwise, this method returns null.
- *
- * <p>The method throws an {@code IllegalStateException} if the queue is closed.
- * Checking whether the queue is open and removing the next element is one atomic operation.
- *
- * <p>This method never blocks.
- *
- * @return The queue's next element, or null, if the queue is empty.
- * @throws IllegalStateException Thrown, if the queue is closed.
- */
- public E poll() {
- lock.lock();
- try {
- if (open) {
- if (elements.size() > 0) {
- return elements.removeFirst();
- } else {
- return null;
- }
- } else {
- throw new IllegalStateException("queue is closed");
- }
- } finally {
- lock.unlock();
- }
- }
-
- /**
- * Returns all of the queue's current elements in a list, if the queue is non-empty.
- * Otherwise, this method returns null.
- *
- * <p>The method throws an {@code IllegalStateException} if the queue is closed.
- * Checking whether the queue is open and removing the elements is one atomic operation.
- *
- * <p>This method never blocks.
- *
- * @return All of the queue's elements, or null, if the queue is empty.
- * @throws IllegalStateException Thrown, if the queue is closed.
- */
- public List<E> pollBatch() {
- lock.lock();
- try {
- if (open) {
- if (elements.size() > 0) {
- ArrayList<E> result = new ArrayList<>(elements);
- elements.clear();
- return result;
- } else {
- return null;
- }
- } else {
- throw new IllegalStateException("queue is closed");
- }
- } finally {
- lock.unlock();
- }
- }
-
- /**
- * Returns the next element in the queue. If the queue is empty, this method
- * waits until at least one element is added.
- *
- * <p>The method throws an {@code IllegalStateException} if the queue is closed.
- * Checking whether the queue is open and removing the next element is one atomic operation.
- *
- * @return The next element in the queue, never null.
- *
- * @throws IllegalStateException Thrown, if the queue is closed.
- * @throws InterruptedException Thrown, if the thread is interrupted while waiting for an
- * element to be added.
- */
- public E getElementBlocking() throws InterruptedException {
- lock.lock();
- try {
- while (open && elements.isEmpty()) {
- nonEmpty.await();
- }
-
- if (open) {
- return elements.removeFirst();
- } else {
- throw new IllegalStateException("queue is closed");
- }
- } finally {
- lock.unlock();
- }
- }
-
- /**
- * Returns the next element in the queue. If the queue is empty, this method
- * waits at most a certain time until an element becomes available. If no element
- * is available after that time, the method returns null.
- *
- * <p>The method throws an {@code IllegalStateException} if the queue is closed.
- * Checking whether the queue is open and removing the next element is one atomic operation.
- *
- * @param timeoutMillis The number of milliseconds to block, at most.
- * @return The next element in the queue, or null, if the timeout expires before an element is available.
- *
- * @throws IllegalStateException Thrown, if the queue is closed.
- * @throws InterruptedException Thrown, if the thread is interrupted while waiting for an
- * element to be added.
- */
- public E getElementBlocking(long timeoutMillis) throws InterruptedException {
- if (timeoutMillis == 0L) {
- // wait forever case
- return getElementBlocking();
- } else if (timeoutMillis < 0L) {
- throw new IllegalArgumentException("invalid timeout");
- }
-
- final long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
-
- lock.lock();
- try {
- while (open && elements.isEmpty() && timeoutMillis > 0) {
- nonEmpty.await(timeoutMillis, TimeUnit.MILLISECONDS);
- timeoutMillis = (deadline - System.nanoTime()) / 1_000_000L;
- }
-
- if (!open) {
- throw new IllegalStateException("queue is closed");
- }
- else if (elements.isEmpty()) {
- return null;
- } else {
- return elements.removeFirst();
- }
- } finally {
- lock.unlock();
- }
- }
-
- /**
- * Gets all the elements found in the list, or blocks until at least one element
- * was added. If the queue is empty when this method is called, it blocks until
- * at least one element is added.
- *
- * <p>This method always returns a list with at least one element.
- *
- * <p>The method throws an {@code IllegalStateException} if the queue is closed.
- * Checking whether the queue is open and removing the next element is one atomic operation.
- *
- * @return A list with all elements in the queue, always at least one element.
- *
- * @throws IllegalStateException Thrown, if the queue is closed.
- * @throws InterruptedException Thrown, if the thread is interrupted while waiting for an
- * element to be added.
- */
- public List<E> getBatchBlocking() throws InterruptedException {
- lock.lock();
- try {
- while (open && elements.isEmpty()) {
- nonEmpty.await();
- }
- if (open) {
- ArrayList<E> result = new ArrayList<>(elements);
- elements.clear();
- return result;
- } else {
- throw new IllegalStateException("queue is closed");
- }
- } finally {
- lock.unlock();
- }
- }
-
- /**
- * Gets all the elements found in the list, or blocks until at least one element
- * was added. This method is similar as {@link #getBatchBlocking()}, but takes
- * a number of milliseconds that the method will maximally wait before returning.
- *
- * <p>This method never returns null; it returns an empty list if the queue is
- * empty when the method is called and the request times out before an element
- * is added.
- *
- * <p>The method throws an {@code IllegalStateException} if the queue is closed.
- * Checking whether the queue is open and removing the next element is one atomic operation.
- *
- * @param timeoutMillis The number of milliseconds to wait, at most.
- * @return A list with all elements in the queue, possibly an empty list.
- *
- * @throws IllegalStateException Thrown, if the queue is closed.
- * @throws InterruptedException Thrown, if the thread is interrupted while waiting for an
- * element to be added.
- */
- public List<E> getBatchBlocking(long timeoutMillis) throws InterruptedException {
- if (timeoutMillis == 0L) {
- // wait forever case
- return getBatchBlocking();
- } else if (timeoutMillis < 0L) {
- throw new IllegalArgumentException("invalid timeout");
- }
-
- final long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
-
- lock.lock();
- try {
- while (open && elements.isEmpty() && timeoutMillis > 0) {
- nonEmpty.await(timeoutMillis, TimeUnit.MILLISECONDS);
- timeoutMillis = (deadline - System.nanoTime()) / 1_000_000L;
- }
-
- if (!open) {
- throw new IllegalStateException("queue is closed");
- }
- else if (elements.isEmpty()) {
- return Collections.emptyList();
- }
- else {
- ArrayList<E> result = new ArrayList<>(elements);
- elements.clear();
- return result;
- }
- } finally {
- lock.unlock();
- }
- }
-
- // ------------------------------------------------------------------------
- // Standard Utilities
- // ------------------------------------------------------------------------
-
- @Override
- public int hashCode() {
- int hashCode = 17;
- for (E element : elements) {
- hashCode = 31 * hashCode + element.hashCode();
- }
- return hashCode;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj == this) {
- return true;
- } else if (obj != null && obj.getClass() == ClosableBlockingQueue.class) {
- @SuppressWarnings("unchecked")
- ClosableBlockingQueue<E> that = (ClosableBlockingQueue<E>) obj;
-
- if (this.elements.size() == that.elements.size()) {
- Iterator<E> thisElements = this.elements.iterator();
- for (E thatNext : that.elements) {
- E thisNext = thisElements.next();
- if (!(thisNext == null ? thatNext == null : thisNext.equals(thatNext))) {
- return false;
- }
- }
- return true;
- } else {
- return false;
- }
- } else {
- return false;
- }
- }
-
- @Override
- public String toString() {
- return elements.toString();
- }
-}
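
For reference, a minimal usage sketch of the close-when-empty handshake described in the
class Javadoc above. Class and variable names are illustrative only and not part of this
commit; the sketch only assumes ClosableBlockingQueue remains on the classpath.

import org.apache.flink.streaming.connectors.kafka.internals.ClosableBlockingQueue;

import java.util.List;

// Illustrative sketch (not part of this commit): a consumer thread drains the
// queue in batches and relies on the atomic close-when-empty check to shut down.
public class ClosableQueueSketch {

	public static void main(String[] args) throws Exception {
		final ClosableBlockingQueue<String> queue = new ClosableBlockingQueue<>();

		Thread consumer = new Thread(new Runnable() {
			@Override
			public void run() {
				try {
					while (true) {
						// blocks until at least one element is available ...
						List<String> batch = queue.getBatchBlocking();
						System.out.println("got batch: " + batch);
					}
				} catch (IllegalStateException e) {
					// ... or throws once the queue has been closed: clean shutdown
				} catch (InterruptedException e) {
					Thread.currentThread().interrupt();
				}
			}
		});
		consumer.start();

		queue.add("element-1");
		queue.add("element-2");

		// close() only succeeds once the queue is empty; checking emptiness and
		// marking the queue closed is a single atomic operation
		while (!queue.close()) {
			Thread.sleep(10);
		}

		// after closing, addIfOpen() returns false instead of throwing
		boolean added = queue.addIfOpen("element-3");
		assert !added;

		consumer.join();
	}
}
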
http://git-wip-us.apache.org/repos/asf/flink/blob/085d4db8/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
index 659bbd7..aa7649c 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
@@ -72,9 +72,6 @@ public class Kafka08Fetcher<T> extends AbstractFetcher<T, TopicAndPartition> {
/** The subtask's runtime context. */
private final RuntimeContext runtimeContext;
- /** The queue of partitions that are currently not assigned to a broker connection. */
- private final ClosableBlockingQueue<KafkaTopicPartitionState<TopicAndPartition>> unassignedPartitionsQueue;
-
/** The behavior to use in case an offset is no longer valid for a partition. */
private final long invalidOffsetBehavior;
@@ -89,7 +86,7 @@ public class Kafka08Fetcher<T> extends AbstractFetcher<T, TopicAndPartition> {
public Kafka08Fetcher(
SourceContext<T> sourceContext,
- Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
+ Map<KafkaTopicPartition, Long> seedPartitionsWithInitialOffsets,
SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
StreamingRuntimeContext runtimeContext,
@@ -99,7 +96,7 @@ public class Kafka08Fetcher<T> extends AbstractFetcher<T, TopicAndPartition> {
boolean useMetrics) throws Exception {
super(
sourceContext,
- assignedPartitionsWithInitialOffsets,
+ seedPartitionsWithInitialOffsets,
watermarksPeriodic,
watermarksPunctuated,
runtimeContext.getProcessingTimeService(),
@@ -112,12 +109,6 @@ public class Kafka08Fetcher<T> extends AbstractFetcher<T, TopicAndPartition> {
this.runtimeContext = runtimeContext;
this.invalidOffsetBehavior = getInvalidOffsetBehavior(kafkaProperties);
this.autoCommitInterval = autoCommitInterval;
- this.unassignedPartitionsQueue = new ClosableBlockingQueue<>();
-
- // initially, all these partitions are not assigned to a specific broker connection
- for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitionStates()) {
- unassignedPartitionsQueue.add(partition);
- }
}
// ------------------------------------------------------------------------
@@ -172,8 +163,11 @@ public class Kafka08Fetcher<T> extends AbstractFetcher<T, TopicAndPartition> {
if (autoCommitInterval > 0) {
LOG.info("Starting periodic offset committer, with commit interval of {}ms", autoCommitInterval);
- periodicCommitter = new PeriodicOffsetCommitter(zookeeperOffsetHandler,
- subscribedPartitionStates(), errorHandler, autoCommitInterval);
+ periodicCommitter = new PeriodicOffsetCommitter(
+ zookeeperOffsetHandler,
+ subscribedPartitionStates(),
+ errorHandler,
+ autoCommitInterval);
periodicCommitter.setName("Periodic Kafka partition offset committer");
periodicCommitter.setDaemon(true);
periodicCommitter.start();
@@ -343,7 +337,7 @@ public class Kafka08Fetcher<T> extends AbstractFetcher<T, TopicAndPartition> {
// ------------------------------------------------------------------------
@Override
- public TopicAndPartition createKafkaPartitionHandle(KafkaTopicPartition partition) {
+ protected TopicAndPartition createKafkaPartitionHandle(KafkaTopicPartition partition) {
return new TopicAndPartition(partition.getTopic(), partition.getPartition());
}
@@ -369,8 +363,7 @@ public class Kafka08Fetcher<T> extends AbstractFetcher<T, TopicAndPartition> {
}
// Set committed offsets in topic partition state
- KafkaTopicPartitionState<TopicAndPartition>[] partitions = subscribedPartitionStates();
- for (KafkaTopicPartitionState<TopicAndPartition> partition : partitions) {
+ for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitionStates()) {
Long offset = offsets.get(partition.getKafkaTopicPartition());
if (offset != null) {
partition.setCommittedOffset(offset);
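
The rename of assignedPartitionsWithInitialOffsets to seedPartitionsWithInitialOffsets
reflects the discovery model this commit introduces: the fetcher starts from a seed set of
partitions and offsets rather than a fixed assignment. As a rough illustration of what such
a seed map looks like (topic name and offset values are made up; in the consumer this map
is derived from restored state or the configured startup position):

import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;

import java.util.HashMap;
import java.util.Map;

public class SeedPartitionsSketch {

	public static void main(String[] args) {
		// hypothetical seed map: partitions 0 and 1 of "my-topic", each with an
		// offset to resume from (values are illustrative)
		Map<KafkaTopicPartition, Long> seedPartitionsWithInitialOffsets = new HashMap<>();
		seedPartitionsWithInitialOffsets.put(new KafkaTopicPartition("my-topic", 0), 42L);
		seedPartitionsWithInitialOffsets.put(new KafkaTopicPartition("my-topic", 1), 108L);

		System.out.println(seedPartitionsWithInitialOffsets);
	}
}
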
http://git-wip-us.apache.org/repos/asf/flink/blob/085d4db8/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08PartitionDiscoverer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08PartitionDiscoverer.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08PartitionDiscoverer.java
new file mode 100644
index 0000000..9730114
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08PartitionDiscoverer.java
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import org.apache.flink.util.NetUtils;
+
+import kafka.cluster.Broker;
+import kafka.common.ErrorMapping;
+import kafka.javaapi.PartitionMetadata;
+import kafka.javaapi.TopicMetadata;
+import kafka.javaapi.TopicMetadataRequest;
+import kafka.javaapi.consumer.SimpleConsumer;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Node;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.net.URL;
+import java.net.UnknownHostException;
+import java.nio.channels.ClosedChannelException;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.DEFAULT_GET_PARTITIONS_RETRIES;
+import static org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.GET_PARTITIONS_RETRIES_KEY;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.PropertiesUtil.getInt;
+
+/**
+ * A partition discoverer that can be used to discover topic and partition metadata
+ * from Kafka brokers via the Kafka 0.8 low-level consumer API.
+ */
+public class Kafka08PartitionDiscoverer extends AbstractPartitionDiscoverer {
+
+ private static final Logger LOG = LoggerFactory.getLogger(Kafka08PartitionDiscoverer.class);
+
+ private static final String dummyClientId = "flink-kafka-consumer-partition-lookup";
+
+ /** All seed broker addresses. */
+ private final String[] seedBrokerAddresses;
+
+ /** Configuration for the Kafka client. */
+ private final int numRetries;
+ private final int soTimeout;
+ private final int bufferSize;
+
+ /**
+ * The current seed broker address to use.
+ * Each subtask starts with a seed broker assigned in round-robin fashion.
+ * If a fetch attempt against the current broker fails, the next address in the
+ * seed broker list will be used.
+ */
+ private int currentContactSeedBrokerIndex;
+
+ /** Low-level consumer used to fetch topics and partitions metadata. */
+ private SimpleConsumer consumer;
+
+ public Kafka08PartitionDiscoverer(
+ KafkaTopicsDescriptor topicsDescriptor,
+ int indexOfThisSubtask,
+ int numParallelSubtasks,
+ Properties kafkaProperties) {
+
+ super(topicsDescriptor, indexOfThisSubtask, numParallelSubtasks);
+
+ checkNotNull(kafkaProperties);
+
+ String seedBrokersConfString = kafkaProperties.getProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG);
+ checkArgument(seedBrokersConfString != null && !seedBrokersConfString.isEmpty(),
+ "Configuration property %s not set", ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG);
+
+ this.seedBrokerAddresses = seedBrokersConfString.split(",");
+
+ // evenly distribute seed brokers across subtasks, to
+ // avoid too much pressure on a single broker on startup
+ this.currentContactSeedBrokerIndex = indexOfThisSubtask % seedBrokerAddresses.length;
+
+ this.numRetries = getInt(kafkaProperties, GET_PARTITIONS_RETRIES_KEY, DEFAULT_GET_PARTITIONS_RETRIES);
+ this.soTimeout = getInt(kafkaProperties, "socket.timeout.ms", 30000);
+ this.bufferSize = getInt(kafkaProperties, "socket.receive.buffer.bytes", 65536);
+ }
+
+ @Override
+ protected void initializeConnections() {
+ URL contactUrl = NetUtils.getCorrectHostnamePort(seedBrokerAddresses[currentContactSeedBrokerIndex]);
+ this.consumer = new SimpleConsumer(contactUrl.getHost(), contactUrl.getPort(), soTimeout, bufferSize, dummyClientId);
+ }
+
+ @Override
+ protected List<String> getAllTopics() {
+ List<String> topics = new LinkedList<>();
+
+ retryLoop: for (int retry = 0; retry < numRetries; retry++) {
+ brokersLoop: for (int arrIdx = 0; arrIdx < seedBrokerAddresses.length; arrIdx++) {
+ LOG.info("Trying to get topic metadata from broker {} in try {}/{}", seedBrokerAddresses[currentContactSeedBrokerIndex], retry, numRetries);
+
+ try {
+ // clear in case we have an incomplete list from previous tries
+ topics.clear();
+
+ for (TopicMetadata item : consumer.send(new TopicMetadataRequest(Collections.<String>emptyList())).topicsMetadata()) {
+ if (item.errorCode() != ErrorMapping.NoError()) {
+ // warn and try more brokers
+ LOG.warn("Error while getting metadata from broker {} to find partitions for {}. Error: {}.",
+ seedBrokerAddresses[currentContactSeedBrokerIndex], topics.toString(), ErrorMapping.exceptionFor(item.errorCode()).getMessage());
+
+ useNextAddressAsNewContactSeedBroker();
+ continue brokersLoop;
+ }
+
+ topics.add(item.topic());
+ }
+ break retryLoop; // leave the loop through the brokers
+ }
+ catch (Exception e) {
+ // validate seed brokers in case of a ClosedChannelException
+ validateSeedBrokers(seedBrokerAddresses, e);
+ LOG.warn("Error communicating with broker {} while listing topics. {} Message: {}",
+ seedBrokerAddresses[currentContactSeedBrokerIndex], e.getClass().getName(), e.getMessage());
+ LOG.debug("Detailed trace", e);
+
+ // we sleep a bit. Retrying immediately doesn't make sense in cases where Kafka is reorganizing the leader metadata
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e1) {
+ // sleep shorter.
+ }
+
+ useNextAddressAsNewContactSeedBroker();
+ }
+ } // brokers loop
+ } // retries loop
+
+ return topics;
+ }
+
+ @Override
+ public List<KafkaTopicPartition> getAllPartitionsForTopics(List<String> topics) {
+ return KafkaTopicPartition.dropLeaderData(getPartitionLeadersForTopics(topics));
+ }
+
+ @Override
+ protected void wakeupConnections() {
+ // nothing to do, as Kafka 0.8's SimpleConsumer does not support wakeup
+ }
+
+ @Override
+ protected void closeConnections() throws Exception {
+ if (consumer != null) {
+ SimpleConsumer consumer = this.consumer;
+ consumer.close();
+
+ // de-reference the consumer to avoid closing multiple times
+ this.consumer = null;
+ }
+ }
+
+ /**
+ * Sends a request to Kafka to get the partitions for the given topics.
+ *
+ * @param topics The names of the topics.
+ * @return The partition leaders for the given topics.
+ */
+ public List<KafkaTopicPartitionLeader> getPartitionLeadersForTopics(List<String> topics) {
+ List<KafkaTopicPartitionLeader> partitions = new LinkedList<>();
+
+ retryLoop: for (int retry = 0; retry < numRetries; retry++) {
+ brokersLoop: for (int arrIdx = 0; arrIdx < seedBrokerAddresses.length; arrIdx++) {
+ LOG.info("Trying to get topic metadata from broker {} in try {}/{}", seedBrokerAddresses[currentContactSeedBrokerIndex], retry, numRetries);
+
+ try {
+ // clear in case we have an incomplete list from previous tries
+ partitions.clear();
+
+ for (TopicMetadata item : consumer.send(new TopicMetadataRequest(topics)).topicsMetadata()) {
+ if (item.errorCode() != ErrorMapping.NoError()) {
+ // warn and try more brokers
+ LOG.warn("Error while getting metadata from broker {} to find partitions for {}. Error: {}.",
+ seedBrokerAddresses[currentContactSeedBrokerIndex], topics.toString(), ErrorMapping.exceptionFor(item.errorCode()).getMessage());
+
+ useNextAddressAsNewContactSeedBroker();
+ continue brokersLoop;
+ }
+
+ if (!topics.contains(item.topic())) {
+ LOG.warn("Received metadata from topic " + item.topic() + " even though it was not requested. Skipping ...");
+
+ useNextAddressAsNewContactSeedBroker();
+ continue brokersLoop;
+ }
+
+ for (PartitionMetadata part : item.partitionsMetadata()) {
+ Node leader = brokerToNode(part.leader());
+ KafkaTopicPartition ktp = new KafkaTopicPartition(item.topic(), part.partitionId());
+ KafkaTopicPartitionLeader pInfo = new KafkaTopicPartitionLeader(ktp, leader);
+ partitions.add(pInfo);
+ }
+ }
+ break retryLoop; // leave the loop through the brokers
+ }
+ catch (Exception e) {
+ // validate seed brokers in case of a ClosedChannelException
+ validateSeedBrokers(seedBrokerAddresses, e);
+ LOG.warn("Error communicating with broker {} to find partitions for {}. {} Message: {}",
+ seedBrokerAddresses[currentContactSeedBrokerIndex], topics, e.getClass().getName(), e.getMessage());
+ LOG.debug("Detailed trace", e);
+
+ // we sleep a bit. Retrying immediately doesn't make sense in cases where Kafka is reorganizing the leader metadata
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e1) {
+ // sleep shorter.
+ }
+
+ useNextAddressAsNewContactSeedBroker();
+ }
+ } // brokers loop
+ } // retries loop
+
+ return partitions;
+ }
+
+ /**
+ * Re-establish broker connection using the next available seed broker address.
+ */
+ private void useNextAddressAsNewContactSeedBroker() {
+ if (++currentContactSeedBrokerIndex == seedBrokerAddresses.length) {
+ currentContactSeedBrokerIndex = 0;
+ }
+
+ URL newContactUrl = NetUtils.getCorrectHostnamePort(seedBrokerAddresses[currentContactSeedBrokerIndex]);
+ this.consumer = new SimpleConsumer(newContactUrl.getHost(), newContactUrl.getPort(), soTimeout, bufferSize, dummyClientId);
+ }
+
+ /**
+ * Turn a broker instance into a node instance.
+ *
+ * @param broker broker instance
+ * @return Node representing the given broker
+ */
+ private static Node brokerToNode(Broker broker) {
+ return new Node(broker.id(), broker.host(), broker.port());
+ }
+
+ /**
+ * Validate that at least one seed broker is valid in case of a
+ * ClosedChannelException.
+ *
+ * @param seedBrokers
+ * array containing the seed brokers e.g. ["host1:port1",
+ * "host2:port2"]
+ * @param exception
+ * the exception to check
+ */
+ private static void validateSeedBrokers(String[] seedBrokers, Exception exception) {
+ if (!(exception instanceof ClosedChannelException)) {
+ return;
+ }
+ int unknownHosts = 0;
+ for (String broker : seedBrokers) {
+ URL brokerUrl = NetUtils.getCorrectHostnamePort(broker.trim());
+ try {
+ InetAddress.getByName(brokerUrl.getHost());
+ } catch (UnknownHostException e) {
+ unknownHosts++;
+ }
+ }
+ // throw meaningful exception if all the provided hosts are invalid
+ if (unknownHosts == seedBrokers.length) {
+ throw new IllegalArgumentException("All the servers provided in: '"
+ + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG + "' config are invalid. (unknown hosts)");
+ }
+ }
+}
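
A minimal sketch of driving the discoverer in standalone mode, mirroring how
PartitionInfoFetcher below uses it (broker addresses and topic are placeholders; it assumes
open() and close() are exposed by AbstractPartitionDiscoverer, as PartitionInfoFetcher's
usage suggests):

import org.apache.flink.streaming.connectors.kafka.internals.Kafka08PartitionDiscoverer;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionLeader;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicsDescriptor;

import java.util.Collections;
import java.util.List;
import java.util.Properties;

public class DiscovererSketch {

	public static void main(String[] args) throws Exception {
		Properties props = new Properties();
		// placeholder seed brokers; required, since the discoverer reads
		// bootstrap.servers to build its seed broker list
		props.setProperty("bootstrap.servers", "host1:9092,host2:9092");

		// subtask 0 of 1: with two seed brokers, 0 % 2 = 0 selects host1
		// as the initial contact broker
		Kafka08PartitionDiscoverer discoverer = new Kafka08PartitionDiscoverer(
				new KafkaTopicsDescriptor(Collections.singletonList("my-topic"), null),
				0, 1, props);

		discoverer.open();
		try {
			List<KafkaTopicPartitionLeader> leaders =
					discoverer.getPartitionLeadersForTopics(Collections.singletonList("my-topic"));
			System.out.println("found " + leaders.size() + " partitions");
		} finally {
			discoverer.close();
		}
	}
}
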
http://git-wip-us.apache.org/repos/asf/flink/blob/085d4db8/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PartitionInfoFetcher.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PartitionInfoFetcher.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PartitionInfoFetcher.java
index ecf1378..836ed6b 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PartitionInfoFetcher.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PartitionInfoFetcher.java
@@ -18,31 +18,37 @@
package org.apache.flink.streaming.connectors.kafka.internals;
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
-
import java.util.List;
import java.util.Properties;
class PartitionInfoFetcher extends Thread {
private final List<String> topics;
- private final Properties properties;
+ private final Kafka08PartitionDiscoverer partitionDiscoverer;
private volatile List<KafkaTopicPartitionLeader> result;
private volatile Throwable error;
PartitionInfoFetcher(List<String> topics, Properties properties) {
+ // we're only using partial functionality of the partition discoverer; the subtask id arguments don't matter
+ this.partitionDiscoverer = new Kafka08PartitionDiscoverer(new KafkaTopicsDescriptor(topics, null), 0, 1, properties);
this.topics = topics;
- this.properties = properties;
}
@Override
public void run() {
try {
- result = FlinkKafkaConsumer08.getPartitionsForTopic(topics, properties);
+ partitionDiscoverer.open();
+ result = partitionDiscoverer.getPartitionLeadersForTopics(topics);
}
catch (Throwable t) {
this.error = t;
+ } finally {
+ try {
+ partitionDiscoverer.close();
+ } catch (Exception e) {
+ throw new RuntimeException("Error while closing partition discoverer.", e);
+ }
}
}
@@ -57,9 +63,11 @@ class PartitionInfoFetcher extends Thread {
if (error != null) {
throw new Exception("Failed to fetch partitions for topics " + topics.toString(), error);
}
+
if (result != null) {
return result;
}
+
throw new Exception("Partition fetching failed");
}
}
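
PartitionInfoFetcher above is a small instance of a general pattern: run a blocking lookup
on a helper thread, record either the result or the Throwable in volatile fields, and
rethrow on the caller side once the thread finishes. A generic sketch of that pattern, with
illustrative names that are not part of this commit:

import java.util.concurrent.Callable;

// Generic sketch of the thread-wrapper pattern used by PartitionInfoFetcher:
// run() captures the result or the error; getResult() joins and rethrows.
class ResultThread<T> extends Thread {

	private final Callable<T> work;

	private volatile T result;
	private volatile Throwable error;

	ResultThread(Callable<T> work) {
		this.work = work;
	}

	@Override
	public void run() {
		try {
			result = work.call();
		} catch (Throwable t) {
			this.error = t;
		}
	}

	/** Waits for the thread to finish, then returns the result or rethrows the error. */
	T getResult() throws Exception {
		join();

		if (error != null) {
			throw new Exception("Worker thread failed", error);
		}
		if (result != null) {
			return result;
		}
		throw new Exception("Worker thread produced no result");
	}
}

Usage mirrors the fetcher: construct with the blocking call, start() the thread, and call
getResult() when the value is needed.
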