Posted to commits@flink.apache.org by se...@apache.org on 2016/09/30 14:47:16 UTC
flink git commit: [FLINK-4702] [kafka connector] Commit offsets to Kafka asynchronously and don't block on polls
Repository: flink
Updated Branches:
refs/heads/release-1.1 caa0fbb21 -> 90d77594f
[FLINK-4702] [kafka connector] Commit offsets to Kafka asynchronously and don't block on polls
Letting the Kafka offset commit block on polls means that 'notifyCheckpointComplete()' may take
a very long time. This is mostly relevant for low-throughput Kafka topics.
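
The gist of the change, condensed into a standalone sketch for readers skimming the diff below
(the class name AsyncCommitSketch and the hard-coded poll timeout are illustrative, not from the
commit; the actual code is in Kafka09Fetcher):

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;

import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

class AsyncCommitSketch {

    /** Offsets handed over by the checkpoint thread, picked up by the fetch loop. */
    private final AtomicReference<Map<TopicPartition, OffsetAndMetadata>> nextOffsetsToCommit =
            new AtomicReference<>();

    private final KafkaConsumer<byte[], byte[]> consumer;

    private volatile boolean commitInProgress;
    private volatile boolean running = true;

    AsyncCommitSketch(KafkaConsumer<byte[], byte[]> consumer) {
        this.consumer = consumer;
    }

    /** Called by the checkpoint thread: never blocks, only publishes work and wakes the consumer. */
    void commitOffsets(Map<TopicPartition, OffsetAndMetadata> offsets) {
        // newer checkpoint offsets may overwrite older, not-yet-committed ones, which is safe
        nextOffsetsToCommit.set(offsets);
        consumer.wakeup();   // breaks the fetch loop out of a blocking poll()
    }

    /** The fetch loop owns the (non-threadsafe) consumer and issues the actual commit. */
    void runFetchLoop() {
        while (running) {
            Map<TopicPartition, OffsetAndMetadata> toCommit = nextOffsetsToCommit.getAndSet(null);
            if (toCommit != null && !commitInProgress) {
                commitInProgress = true;
                consumer.commitAsync(toCommit, new OffsetCommitCallback() {
                    @Override
                    public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception ex) {
                        commitInProgress = false;   // allow the next commit; log 'ex' if non-null
                    }
                });
            }
            try {
                ConsumerRecords<byte[], byte[]> records = consumer.poll(100L);
                // ... emit records downstream ...
            }
            catch (WakeupException we) {
                continue;   // woken up, loop around to pick up pending commit work
            }
        }
    }
}

The design point: only the fetch-loop thread ever touches the KafkaConsumer (which is not
threadsafe), so the checkpoint thread merely publishes the offsets through an AtomicReference
and wakes the consumer. This replaces the previous lock-guarded commitSync(), which could hold
'notifyCheckpointComplete()' hostage to a long-blocking poll.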
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/90d77594
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/90d77594
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/90d77594
Branch: refs/heads/release-1.1
Commit: 90d77594fffda1d8d15658d363c478ea6430514e
Parents: caa0fbb
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Sep 29 18:09:51 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Sep 30 12:39:53 2016 +0200
----------------------------------------------------------------------
.../kafka/internal/Kafka09Fetcher.java | 73 +++--
.../connectors/kafka/Kafka09FetcherTest.java | 304 +++++++++++++++++++
2 files changed, 355 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/90d77594/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
index 9c861c9..1da2259 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
@@ -37,6 +37,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
@@ -50,6 +51,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.concurrent.atomic.AtomicReference;
/**
* A fetcher that fetches data from Kafka brokers via the Kafka 0.9 consumer API.
@@ -74,18 +76,24 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> implem
/** The maximum number of milliseconds to wait for a fetch batch */
private final long pollTimeout;
- /** Mutex to guard against concurrent access to the non-threadsafe Kafka consumer */
- private final Object consumerLock = new Object();
+ /** The next offsets that the main thread should commit */
+ private final AtomicReference<Map<TopicPartition, OffsetAndMetadata>> nextOffsetsToCommit;
+
+ /** The callback invoked by Kafka once an offset commit is complete */
+ private final OffsetCommitCallback offsetCommitCallback;
/** Reference to the Kafka consumer, once it is created */
private volatile KafkaConsumer<byte[], byte[]> consumer;
-
+
/** Reference to the proxy, forwarding exceptions from the fetch thread to the main thread */
private volatile ExceptionProxy errorHandler;
/** Flag to mark the main work loop as alive */
private volatile boolean running = true;
+ /** Flag tracking whether the latest commit request has completed */
+ private volatile boolean commitInProgress;
+
// ------------------------------------------------------------------------
public Kafka09Fetcher(
@@ -105,6 +113,8 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> implem
this.runtimeContext = runtimeContext;
this.kafkaProperties = kafkaProperties;
this.pollTimeout = pollTimeout;
+ this.nextOffsetsToCommit = new AtomicReference<>();
+ this.offsetCommitCallback = new CommitCallback();
// if checkpointing is enabled, we are not automatically committing to Kafka.
kafkaProperties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
@@ -203,19 +213,23 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> implem
// main fetch loop
while (running) {
+
+ // check if there is something to commit
+ final Map<TopicPartition, OffsetAndMetadata> toCommit = nextOffsetsToCommit.getAndSet(null);
+ if (toCommit != null && !commitInProgress) {
+ // reset the work-to-be committed, so we don't repeatedly commit the same
+ // also record that a commit is already in progress
+ commitInProgress = true;
+ consumer.commitAsync(toCommit, offsetCommitCallback);
+ }
+
// get the next batch of records
final ConsumerRecords<byte[], byte[]> records;
- synchronized (consumerLock) {
- try {
- records = consumer.poll(pollTimeout);
- }
- catch (WakeupException we) {
- if (running) {
- throw we;
- } else {
- continue;
- }
- }
+ try {
+ records = consumer.poll(pollTimeout);
+ }
+ catch (WakeupException we) {
+ continue;
}
// get the records for each topic partition
@@ -252,10 +266,9 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> implem
}
finally {
try {
- synchronized (consumerLock) {
- consumer.close();
- }
- } catch (Throwable t) {
+ consumer.close();
+ }
+ catch (Throwable t) {
LOG.warn("Error while closing Kafka 0.9 consumer", t);
}
}
@@ -283,10 +296,14 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> implem
}
}
- if (this.consumer != null) {
- synchronized (consumerLock) {
- this.consumer.commitSync(offsetsToCommit);
- }
+ // record the work to be committed by the main consumer thread and make sure the consumer notices that
+ if (nextOffsetsToCommit.getAndSet(offsetsToCommit) != null) {
+ LOG.warn("Committing offsets to Kafka takes longer than the checkpoint interval. " +
+ "Skipping commit of previous offsets because newer complete checkpoint offsets are available. " +
+ "This does not compromise Flink's checkpoint integrity.");
+ }
+ if (consumer != null) {
+ consumer.wakeup();
}
}
@@ -301,4 +318,16 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> implem
}
return result;
}
+
+ private class CommitCallback implements OffsetCommitCallback {
+
+ @Override
+ public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception ex) {
+ commitInProgress = false;
+
+ if (ex != null) {
+ LOG.warn("Committing offsets to Kafka failed. This does not compromise Flink's checkpoints.", ex);
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/90d77594/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
new file mode 100644
index 0000000..4fd6c9f
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.core.testutils.MultiShotLatch;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetCommitCallback;
+import org.apache.kafka.common.TopicPartition;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyLong;
+import static org.powermock.api.mockito.PowerMockito.doAnswer;
+import static org.powermock.api.mockito.PowerMockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
+import static org.powermock.api.mockito.PowerMockito.whenNew;
+
+/**
+ * Unit tests for the {@link Kafka09Fetcher}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(Kafka09Fetcher.class)
+public class Kafka09FetcherTest {
+
+ @Test
+ public void testCommitDoesNotBlock() throws Exception {
+
+ // test data
+ final KafkaTopicPartition testPartition = new KafkaTopicPartition("test", 42);
+ final Map<KafkaTopicPartition, Long> testCommitData = new HashMap<>();
+ testCommitData.put(testPartition, 11L);
+
+ // to synchronize when the consumer is in its blocking method
+ final OneShotLatch sync = new OneShotLatch();
+
+ // ----- the mock consumer with blocking poll calls ----
+ final MultiShotLatch blockerLatch = new MultiShotLatch();
+
+ KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class);
+ when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<?, ?>>() {
+
+ @Override
+ public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) throws InterruptedException {
+ sync.trigger();
+ blockerLatch.await();
+ return ConsumerRecords.empty();
+ }
+ });
+
+ doAnswer(new Answer<Void>() {
+ @Override
+ public Void answer(InvocationOnMock invocation) {
+ blockerLatch.trigger();
+ return null;
+ }
+ }).when(mockConsumer).wakeup();
+
+ // make sure the fetcher creates the mock consumer
+ whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer);
+
+ // ----- create the test fetcher -----
+
+ @SuppressWarnings("unchecked")
+ SourceContext<String> sourceContext = mock(SourceContext.class);
+ List<KafkaTopicPartition> topics = Collections.singletonList(new KafkaTopicPartition("test", 42));
+ KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
+ StreamingRuntimeContext context = mock(StreamingRuntimeContext.class);
+
+ final Kafka09Fetcher<String> fetcher = new Kafka09Fetcher<>(
+ sourceContext, topics, null, null, context, schema, new Properties(), 0L, false);
+
+ // ----- run the fetcher -----
+
+ final AtomicReference<Throwable> error = new AtomicReference<>();
+ final Thread fetcherRunner = new Thread("fetcher runner") {
+
+ @Override
+ public void run() {
+ try {
+ fetcher.runFetchLoop();
+ } catch (Throwable t) {
+ error.set(t);
+ }
+ }
+ };
+ fetcherRunner.start();
+
+ // wait until the fetcher has reached the method of interest
+ sync.await();
+
+ // ----- trigger the offset commit -----
+
+ final AtomicReference<Throwable> commitError = new AtomicReference<>();
+ final Thread committer = new Thread("committer runner") {
+ @Override
+ public void run() {
+ try {
+ fetcher.commitSpecificOffsetsToKafka(testCommitData);
+ } catch (Throwable t) {
+ commitError.set(t);
+ }
+ }
+ };
+ committer.start();
+
+ // ----- ensure that the committer finishes in time -----
+ committer.join(30000);
+ assertFalse("The committer did not finish in time", committer.isAlive());
+
+ // ----- test done, wait till the fetcher is done for a clean shutdown -----
+ fetcher.cancel();
+ fetcherRunner.join();
+
+ // check that there were no errors in the fetcher
+ final Throwable fetcherError = error.get();
+ if (fetcherError != null) {
+ throw new Exception("Exception in the fetcher", fetcherError);
+ }
+ final Throwable committerError = commitError.get();
+ if (committerError != null) {
+ throw new Exception("Exception in the committer", committerError);
+ }
+ }
+
+ @Test
+ public void ensureOffsetsGetCommitted() throws Exception {
+
+ // test data
+ final KafkaTopicPartition testPartition1 = new KafkaTopicPartition("test", 42);
+ final KafkaTopicPartition testPartition2 = new KafkaTopicPartition("another", 99);
+
+ final Map<KafkaTopicPartition, Long> testCommitData1 = new HashMap<>();
+ testCommitData1.put(testPartition1, 11L);
+ testCommitData1.put(testPartition2, 18L);
+
+ final Map<KafkaTopicPartition, Long> testCommitData2 = new HashMap<>();
+ testCommitData2.put(testPartition1, 19L);
+ testCommitData2.put(testPartition2, 28L);
+
+ final BlockingQueue<Map<TopicPartition, OffsetAndMetadata>> commitStore = new LinkedBlockingQueue<>();
+
+
+ // ----- the mock consumer with poll(), wakeup(), and commit(A)sync calls ----
+
+ final MultiShotLatch blockerLatch = new MultiShotLatch();
+
+ KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class);
+
+ when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<?, ?>>() {
+ @Override
+ public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) throws InterruptedException {
+ blockerLatch.await();
+ return ConsumerRecords.empty();
+ }
+ });
+
+ doAnswer(new Answer<Void>() {
+ @Override
+ public Void answer(InvocationOnMock invocation) {
+ blockerLatch.trigger();
+ return null;
+ }
+ }).when(mockConsumer).wakeup();
+
+ doAnswer(new Answer<Void>() {
+ @Override
+ public Void answer(InvocationOnMock invocation) {
+ @SuppressWarnings("unchecked")
+ Map<TopicPartition, OffsetAndMetadata> offsets =
+ (Map<TopicPartition, OffsetAndMetadata>) invocation.getArguments()[0];
+
+ OffsetCommitCallback callback = (OffsetCommitCallback) invocation.getArguments()[1];
+
+ commitStore.add(offsets);
+ callback.onComplete(offsets, null);
+
+ return null;
+ }
+ }).when(mockConsumer).commitAsync(
+ Mockito.<Map<TopicPartition, OffsetAndMetadata>>any(), any(OffsetCommitCallback.class));
+
+ // make sure the fetcher creates the mock consumer
+ whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer);
+
+ // ----- create the test fetcher -----
+
+ @SuppressWarnings("unchecked")
+ SourceContext<String> sourceContext = mock(SourceContext.class);
+ List<KafkaTopicPartition> topics = Collections.singletonList(new KafkaTopicPartition("test", 42));
+ KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
+ StreamingRuntimeContext context = mock(StreamingRuntimeContext.class);
+
+ final Kafka09Fetcher<String> fetcher = new Kafka09Fetcher<>(
+ sourceContext, topics, null, null, context, schema, new Properties(), 0L, false);
+
+ // ----- run the fetcher -----
+
+ final AtomicReference<Throwable> error = new AtomicReference<>();
+ final Thread fetcherRunner = new Thread("fetcher runner") {
+
+ @Override
+ public void run() {
+ try {
+ fetcher.runFetchLoop();
+ } catch (Throwable t) {
+ error.set(t);
+ }
+ }
+ };
+ fetcherRunner.start();
+
+ // ----- trigger the first offset commit -----
+
+ fetcher.commitSpecificOffsetsToKafka(testCommitData1);
+ Map<TopicPartition, OffsetAndMetadata> result1 = commitStore.take();
+
+ for (Entry<TopicPartition, OffsetAndMetadata> entry : result1.entrySet()) {
+ TopicPartition partition = entry.getKey();
+ if (partition.topic().equals("test")) {
+ assertEquals(42, partition.partition());
+ assertEquals(11L, entry.getValue().offset());
+ }
+ else if (partition.topic().equals("another")) {
+ assertEquals(99, partition.partition());
+ assertEquals(18L, entry.getValue().offset());
+ }
+ }
+
+ // ----- trigger the second offset commit -----
+
+ fetcher.commitSpecificOffsetsToKafka(testCommitData2);
+ Map<TopicPartition, OffsetAndMetadata> result2 = commitStore.take();
+
+ for (Entry<TopicPartition, OffsetAndMetadata> entry : result2.entrySet()) {
+ TopicPartition partition = entry.getKey();
+ if (partition.topic().equals("test")) {
+ assertEquals(42, partition.partition());
+ assertEquals(19L, entry.getValue().offset());
+ }
+ else if (partition.topic().equals("another")) {
+ assertEquals(99, partition.partition());
+ assertEquals(28L, entry.getValue().offset());
+ }
+ }
+
+ // ----- test done, wait till the fetcher is done for a clean shutdown -----
+ fetcher.cancel();
+ fetcherRunner.join();
+
+ // check that there were no errors in the fetcher
+ final Throwable caughtError = error.get();
+ if (caughtError != null) {
+ throw new Exception("Exception in the fetcher", caughtError);
+ }
+ }
+}