Posted to commits@flink.apache.org by se...@apache.org on 2016/09/30 14:47:16 UTC

flink git commit: [FLINK-4702] [kafka connector] Commit offsets to Kafka asynchronously and don't block on polls

Repository: flink
Updated Branches:
  refs/heads/release-1.1 caa0fbb21 -> 90d77594f


[FLINK-4702] [kafka connector] Commit offsets to Kafka asynchronously and don't block on polls

Letting the Kafka offset commit block behind in-flight poll() calls means that 'notifyCheckpointComplete()'
may take a very long time to return: the synchronous commit has to wait for the current poll to release the
consumer. This is mostly relevant for low-throughput Kafka topics, where poll() regularly blocks for the
full poll timeout.
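
For readers of the diff below: the fix replaces the synchronous commitSync() call, which had to wait
for the consumer lock held by poll(), with a hand-off between threads. The checkpoint thread only
publishes the offsets and wakes the consumer; the fetch loop then commits them with commitAsync().
The following is a minimal, self-contained sketch of that pattern. The class and method names
(AsyncCommitSketch, commitOffsets, maybeCommit) are illustrative only; the actual change is in
Kafka09Fetcher further down.

// NOTE: illustrative sketch of the hand-off pattern only; not the actual Flink classes.
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;

class AsyncCommitSketch {

	/** Offsets published by the checkpoint thread, picked up by the consumer thread. */
	private final AtomicReference<Map<TopicPartition, OffsetAndMetadata>> nextOffsetsToCommit =
			new AtomicReference<>();

	/** True while an asynchronous commit request is still outstanding. */
	private volatile boolean commitInProgress;

	/** The Kafka consumer, owned and driven by the fetch loop's thread. */
	private volatile KafkaConsumer<byte[], byte[]> consumer;

	/** Checkpoint thread: publish the offsets and return immediately, without touching the consumer. */
	void commitOffsets(Map<TopicPartition, OffsetAndMetadata> offsets) {
		// a newer set of checkpointed offsets simply replaces any set that was not committed yet
		nextOffsetsToCommit.set(offsets);
		if (consumer != null) {
			// interrupt a blocking poll() so the fetch loop sees the new work promptly
			consumer.wakeup();
		}
	}

	/** Fetch loop thread: call this at the top of every loop iteration, before poll(). */
	void maybeCommit() {
		final Map<TopicPartition, OffsetAndMetadata> toCommit = nextOffsetsToCommit.getAndSet(null);
		if (toCommit != null && !commitInProgress) {
			commitInProgress = true;
			consumer.commitAsync(toCommit, new OffsetCommitCallback() {
				@Override
				public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
					// a failed commit only delays offset reporting; checkpoints are unaffected
					commitInProgress = false;
				}
			});
		}
	}
}

The wakeup() call matters because, on a quiet topic, poll() blocks for the full poll timeout; waking
the consumer lets the pending commit be issued right away, which is exactly the delay the old
synchronous version suffered from.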


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/90d77594
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/90d77594
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/90d77594

Branch: refs/heads/release-1.1
Commit: 90d77594fffda1d8d15658d363c478ea6430514e
Parents: caa0fbb
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Sep 29 18:09:51 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Sep 30 12:39:53 2016 +0200

----------------------------------------------------------------------
 .../kafka/internal/Kafka09Fetcher.java          |  73 +++--
 .../connectors/kafka/Kafka09FetcherTest.java    | 304 +++++++++++++++++++
 2 files changed, 355 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/90d77594/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
index 9c861c9..1da2259 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
@@ -37,6 +37,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetCommitCallback;
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.TopicPartition;
@@ -50,6 +51,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * A fetcher that fetches data from Kafka brokers via the Kafka 0.9 consumer API.
@@ -74,18 +76,24 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> implem
 	/** The maximum number of milliseconds to wait for a fetch batch */
 	private final long pollTimeout;
 
-	/** Mutex to guard against concurrent access to the non-threadsafe Kafka consumer */
-	private final Object consumerLock = new Object();
+	/** The next offsets that the main thread should commit */
+	private final AtomicReference<Map<TopicPartition, OffsetAndMetadata>> nextOffsetsToCommit;
+	
+	/** The callback invoked by Kafka once an offset commit is complete */
+	private final OffsetCommitCallback offsetCommitCallback;
 
 	/** Reference to the Kafka consumer, once it is created */
 	private volatile KafkaConsumer<byte[], byte[]> consumer;
-
+	
 	/** Reference to the proxy, forwarding exceptions from the fetch thread to the main thread */
 	private volatile ExceptionProxy errorHandler;
 
 	/** Flag to mark the main work loop as alive */
 	private volatile boolean running = true;
 
+	/** Flag indicating whether an asynchronous offset commit is currently in progress */
+	private volatile boolean commitInProgress;
+
 	// ------------------------------------------------------------------------
 
 	public Kafka09Fetcher(
@@ -105,6 +113,8 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> implem
 		this.runtimeContext = runtimeContext;
 		this.kafkaProperties = kafkaProperties;
 		this.pollTimeout = pollTimeout;
+		this.nextOffsetsToCommit = new AtomicReference<>();
+		this.offsetCommitCallback = new CommitCallback();
 
 		// if checkpointing is enabled, we are not automatically committing to Kafka.
 		kafkaProperties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
@@ -203,19 +213,23 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> implem
 
 			// main fetch loop
 			while (running) {
+
+				// check if there is something to commit
+				final Map<TopicPartition, OffsetAndMetadata> toCommit = nextOffsetsToCommit.getAndSet(null);
+				if (toCommit != null && !commitInProgress) {
+					// the getAndSet(null) above cleared the pending offsets, so the same offsets
+					// are not committed twice; also record that a commit is now in progress
+					commitInProgress = true;
+					consumer.commitAsync(toCommit, offsetCommitCallback);
+				}
+
 				// get the next batch of records
 				final ConsumerRecords<byte[], byte[]> records;
-				synchronized (consumerLock) {
-					try {
-						records = consumer.poll(pollTimeout);
-					}
-					catch (WakeupException we) {
-						if (running) {
-							throw we;
-						} else {
-							continue;
-						}
-					}
+				try {
+					records = consumer.poll(pollTimeout);
+				}
+				catch (WakeupException we) {
+					continue;
 				}
 
 				// get the records for each topic partition
@@ -252,10 +266,9 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> implem
 		}
 		finally {
 			try {
-				synchronized (consumerLock) {
-					consumer.close();
-				}
-			} catch (Throwable t) {
+				consumer.close();
+			}
+			catch (Throwable t) {
 				LOG.warn("Error while closing Kafka 0.9 consumer", t);
 			}
 		}
@@ -283,10 +296,14 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> implem
 			}
 		}
 
-		if (this.consumer != null) {
-			synchronized (consumerLock) {
-				this.consumer.commitSync(offsetsToCommit);
-			}
+		// record the offsets to be committed by the consumer thread, and wake the consumer up so it notices them promptly
+		if (nextOffsetsToCommit.getAndSet(offsetsToCommit) != null) {
+			LOG.warn("Committing offsets to Kafka takes longer than the checkpoint interval. " +
+					"Skipping commit of previous offsets because newer complete checkpoint offsets are available. " +
+					"This does not compromise Flink's checkpoint integrity.");
+		}
+		if (consumer != null) {
+			consumer.wakeup();
 		}
 	}
 
@@ -301,4 +318,16 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> implem
 		}
 		return result;
 	}
+
+	private class CommitCallback implements OffsetCommitCallback {
+
+		@Override
+		public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception ex) {
+			commitInProgress = false;
+
+			if (ex != null) {
+				LOG.warn("Committing offsets to Kafka failed. This does not compromise Flink's checkpoints.", ex);
+			}
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/90d77594/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
new file mode 100644
index 0000000..4fd6c9f
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.core.testutils.MultiShotLatch;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetCommitCallback;
+import org.apache.kafka.common.TopicPartition;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyLong;
+import static org.powermock.api.mockito.PowerMockito.doAnswer;
+import static org.powermock.api.mockito.PowerMockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
+import static org.powermock.api.mockito.PowerMockito.whenNew;
+
+/**
+ * Unit tests for the {@link Kafka09Fetcher}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(Kafka09Fetcher.class)
+public class Kafka09FetcherTest {
+
+	@Test
+	public void testCommitDoesNotBlock() throws Exception {
+
+		// test data
+		final KafkaTopicPartition testPartition = new KafkaTopicPartition("test", 42);
+		final Map<KafkaTopicPartition, Long> testCommitData = new HashMap<>();
+		testCommitData.put(testPartition, 11L);
+
+		// to synchronize when the consumer is in its blocking method
+		final OneShotLatch sync = new OneShotLatch();
+
+		// ----- the mock consumer with blocking poll calls ----
+		final MultiShotLatch blockerLatch = new MultiShotLatch();
+		
+		KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class);
+		when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<?, ?>>() {
+			
+			@Override
+			public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) throws InterruptedException {
+				sync.trigger();
+				blockerLatch.await();
+				return ConsumerRecords.empty();
+			}
+		});
+
+		doAnswer(new Answer<Void>() {
+			@Override
+			public Void answer(InvocationOnMock invocation) {
+				blockerLatch.trigger();
+				return null;
+			}
+		}).when(mockConsumer).wakeup();
+
+		// make sure the fetcher creates the mock consumer
+		whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer);
+
+		// ----- create the test fetcher -----
+
+		@SuppressWarnings("unchecked")
+		SourceContext<String> sourceContext = mock(SourceContext.class);
+		List<KafkaTopicPartition> topics = Collections.singletonList(new KafkaTopicPartition("test", 42));
+		KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
+		StreamingRuntimeContext context = mock(StreamingRuntimeContext.class);
+		
+		final Kafka09Fetcher<String> fetcher = new Kafka09Fetcher<>(
+				sourceContext, topics, null, null, context, schema, new Properties(), 0L, false);
+
+		// ----- run the fetcher -----
+
+		final AtomicReference<Throwable> error = new AtomicReference<>();
+		final Thread fetcherRunner = new Thread("fetcher runner") {
+
+			@Override
+			public void run() {
+				try {
+					fetcher.runFetchLoop();
+				} catch (Throwable t) {
+					error.set(t);
+				}
+			}
+		};
+		fetcherRunner.start();
+
+		// wait until the fetcher has reached the method of interest
+		sync.await();
+
+		// ----- trigger the offset commit -----
+		
+		final AtomicReference<Throwable> commitError = new AtomicReference<>();
+		final Thread committer = new Thread("committer runner") {
+			@Override
+			public void run() {
+				try {
+					fetcher.commitSpecificOffsetsToKafka(testCommitData);
+				} catch (Throwable t) {
+					commitError.set(t);
+				}
+			}
+		};
+		committer.start();
+
+		// ----- ensure that the committer finishes in time  -----
+		committer.join(30000);
+		assertFalse("The committer did not finish in time", committer.isAlive());
+
+		// ----- test done, wait till the fetcher is done for a clean shutdown -----
+		fetcher.cancel();
+		fetcherRunner.join();
+
+		// check that there were no errors in the fetcher
+		final Throwable fetcherError = error.get();
+		if (fetcherError != null) {
+			throw new Exception("Exception in the fetcher", fetcherError);
+		}
+		final Throwable committerError = commitError.get();
+		if (committerError != null) {
+			throw new Exception("Exception in the committer", committerError);
+		}
+	}
+
+	@Test
+	public void ensureOffsetsGetCommitted() throws Exception {
+		
+		// test data
+		final KafkaTopicPartition testPartition1 = new KafkaTopicPartition("test", 42);
+		final KafkaTopicPartition testPartition2 = new KafkaTopicPartition("another", 99);
+		
+		final Map<KafkaTopicPartition, Long> testCommitData1 = new HashMap<>();
+		testCommitData1.put(testPartition1, 11L);
+		testCommitData1.put(testPartition2, 18L);
+
+		final Map<KafkaTopicPartition, Long> testCommitData2 = new HashMap<>();
+		testCommitData2.put(testPartition1, 19L);
+		testCommitData2.put(testPartition2, 28L);
+
+		final BlockingQueue<Map<TopicPartition, OffsetAndMetadata>> commitStore = new LinkedBlockingQueue<>();
+
+
+		// ----- the mock consumer with poll(), wakeup(), and commit(A)sync calls ----
+
+		final MultiShotLatch blockerLatch = new MultiShotLatch();
+
+		KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class);
+
+		when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<?, ?>>() {
+			@Override
+			public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) throws InterruptedException {
+				blockerLatch.await();
+				return ConsumerRecords.empty();
+			}
+		});
+
+		doAnswer(new Answer<Void>() {
+			@Override
+			public Void answer(InvocationOnMock invocation) {
+				blockerLatch.trigger();
+				return null;
+			}
+		}).when(mockConsumer).wakeup();
+
+		doAnswer(new Answer<Void>() {
+			@Override
+			public Void answer(InvocationOnMock invocation) {
+				@SuppressWarnings("unchecked")
+				Map<TopicPartition, OffsetAndMetadata> offsets = 
+						(Map<TopicPartition, OffsetAndMetadata>) invocation.getArguments()[0];
+
+				OffsetCommitCallback callback = (OffsetCommitCallback) invocation.getArguments()[1];
+
+				commitStore.add(offsets);
+				callback.onComplete(offsets, null);
+
+				return null; 
+			}
+		}).when(mockConsumer).commitAsync(
+				Mockito.<Map<TopicPartition, OffsetAndMetadata>>any(), any(OffsetCommitCallback.class));
+
+		// make sure the fetcher creates the mock consumer
+		whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer);
+
+		// ----- create the test fetcher -----
+
+		@SuppressWarnings("unchecked")
+		SourceContext<String> sourceContext = mock(SourceContext.class);
+		List<KafkaTopicPartition> topics = Collections.singletonList(new KafkaTopicPartition("test", 42));
+		KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
+		StreamingRuntimeContext context = mock(StreamingRuntimeContext.class);
+
+		final Kafka09Fetcher<String> fetcher = new Kafka09Fetcher<>(
+				sourceContext, topics, null, null, context, schema, new Properties(), 0L, false);
+
+		// ----- run the fetcher -----
+
+		final AtomicReference<Throwable> error = new AtomicReference<>();
+		final Thread fetcherRunner = new Thread("fetcher runner") {
+
+			@Override
+			public void run() {
+				try {
+					fetcher.runFetchLoop();
+				} catch (Throwable t) {
+					error.set(t);
+				}
+			}
+		};
+		fetcherRunner.start();
+
+		// ----- trigger the first offset commit -----
+
+		fetcher.commitSpecificOffsetsToKafka(testCommitData1);
+		Map<TopicPartition, OffsetAndMetadata> result1 = commitStore.take();
+
+		for (Entry<TopicPartition, OffsetAndMetadata> entry : result1.entrySet()) {
+			TopicPartition partition = entry.getKey();
+			if (partition.topic().equals("test")) {
+				assertEquals(42, partition.partition());
+				assertEquals(11L, entry.getValue().offset());
+			}
+			else if (partition.topic().equals("another")) {
+				assertEquals(99, partition.partition());
+				assertEquals(18L, entry.getValue().offset());
+			}
+		}
+
+		// ----- trigger the second offset commit -----
+
+		fetcher.commitSpecificOffsetsToKafka(testCommitData2);
+		Map<TopicPartition, OffsetAndMetadata> result2 = commitStore.take();
+
+		for (Entry<TopicPartition, OffsetAndMetadata> entry : result2.entrySet()) {
+			TopicPartition partition = entry.getKey();
+			if (partition.topic().equals("test")) {
+				assertEquals(42, partition.partition());
+				assertEquals(19L, entry.getValue().offset());
+			}
+			else if (partition.topic().equals("another")) {
+				assertEquals(99, partition.partition());
+				assertEquals(28L, entry.getValue().offset());
+			}
+		}
+		
+		// ----- test done, wait till the fetcher is done for a clean shutdown -----
+		fetcher.cancel();
+		fetcherRunner.join();
+
+		// check that there were no errors in the fetcher
+		final Throwable caughtError = error.get();
+		if (caughtError != null) {
+			throw new Exception("Exception in the fetcher", caughtError);
+		}
+	}
+}