Posted to commits@flink.apache.org by tz...@apache.org on 2017/11/02 10:00:01 UTC

[03/11] flink git commit: [FLINK-7732][kafka-consumer] Do not commit to kafka Flink's sentinel offsets

[FLINK-7732][kafka-consumer] Do not commit to kafka Flink's sentinel offsets

This closes #4928.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c61d1860
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c61d1860
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c61d1860

Branch: refs/heads/master
Commit: c61d18605ace57adbe84a2b05a50308043452398
Parents: b7d3589
Author: Piotr Nowojski <pi...@gmail.com>
Authored: Tue Oct 31 15:38:32 2017 +0100
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Thu Nov 2 12:33:55 2017 +0800

----------------------------------------------------------------------
 .../kafka/internals/Kafka08Fetcher.java         |  2 +-
 .../kafka/internal/Kafka09Fetcher.java          |  6 ++-
 .../kafka/internals/AbstractFetcher.java        | 18 +++++++-
 .../KafkaTopicPartitionStateSentinel.java       |  3 ++
 .../kafka/internals/AbstractFetcherTest.java    | 48 +++++++++++++++++++-
 5 files changed, 72 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
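Background for this change: before a partition's real starting offset is known, Flink's Kafka consumer keeps a negative sentinel value for it (GROUP_OFFSET, visible further down as -915623761773L, is one of them). If a checkpoint completes while those placeholders are still in place, the offset-commit path could previously try to commit the sentinel values to Kafka. The commit centralizes the fix in AbstractFetcher: the public commit method filters sentinels once and delegates the rest to a version-specific hook. The standalone sketch below mirrors that shape with simplified stand-in types (plain String keys instead of KafkaTopicPartition); it is an illustration of the pattern, not the committed code.

import java.util.HashMap;
import java.util.Map;

// Simplified illustration of the pattern introduced in AbstractFetcher below;
// these are stand-in types, not Flink's real classes.
abstract class FetcherSketch {

	// Mirrors KafkaTopicPartitionStateSentinel.GROUP_OFFSET from this commit.
	static final long GROUP_OFFSET = -915623761773L;

	// All sentinel markers are negative; valid Kafka offsets are always >= 0.
	static boolean isSentinel(long offset) {
		return offset < 0;
	}

	// Final entry point: drop placeholder offsets once, then delegate.
	public final void commitOffsets(Map<String, Long> offsets) throws Exception {
		Map<String, Long> filtered = new HashMap<>();
		for (Map.Entry<String, Long> entry : offsets.entrySet()) {
			if (!isSentinel(entry.getValue())) {
				filtered.put(entry.getKey(), entry.getValue());
			}
		}
		doCommitOffsets(filtered);
	}

	// Version-specific fetchers implement this and may assume offsets are >= 0.
	protected abstract void doCommitOffsets(Map<String, Long> offsets) throws Exception;
}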


http://git-wip-us.apache.org/repos/asf/flink/blob/c61d1860/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
index 7359e91..8bcd663 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
@@ -348,7 +348,7 @@ public class Kafka08Fetcher<T> extends AbstractFetcher<T, TopicAndPartition> {
 	// ------------------------------------------------------------------------
 
 	@Override
-	public void commitInternalOffsetsToKafka(
+	protected void doCommitInternalOffsetsToKafka(
 			Map<KafkaTopicPartition, Long> offsets,
 			@Nonnull KafkaCommitCallback commitCallback) throws Exception {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c61d1860/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
index cef70fe..51f69cd 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
@@ -44,6 +44,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
+import static org.apache.flink.util.Preconditions.checkState;
+
 /**
  * A fetcher that fetches data from Kafka brokers via the Kafka 0.9 consumer API.
  *
@@ -212,7 +214,7 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> {
 	}
 
 	@Override
-	public void commitInternalOffsetsToKafka(
+	protected void doCommitInternalOffsetsToKafka(
 			Map<KafkaTopicPartition, Long> offsets,
 			@Nonnull KafkaCommitCallback commitCallback) throws Exception {
 
@@ -224,6 +226,8 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> {
 		for (KafkaTopicPartitionState<TopicPartition> partition : partitions) {
 			Long lastProcessedOffset = offsets.get(partition.getKafkaTopicPartition());
 			if (lastProcessedOffset != null) {
+				checkState(lastProcessedOffset >= 0, "Illegal offset value to commit");
+
 				// committed offsets through the KafkaConsumer need to be 1 more than the last processed offset.
 				// This does not affect Flink's checkpoints/saved state.
 				long offsetToCommit = lastProcessedOffset + 1;

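A note on the arithmetic around the new checkState: Kafka treats a committed offset as the offset of the next record to consume, so a last processed offset of n is committed as n + 1, and after the filtering added in AbstractFetcher only non-negative offsets should ever reach this point. A minimal sketch of that step; the class and method names are hypothetical, not part of the commit.

// Hypothetical helper, not part of this commit; it only restates the offset
// arithmetic described in the comments above.
final class OffsetCommitArithmetic {

	static long toCommittableOffset(long lastProcessedOffset) {
		if (lastProcessedOffset < 0) {
			// Sentinels are filtered in AbstractFetcher before commit; reaching
			// here with a negative value would be a bug.
			throw new IllegalStateException("Illegal offset value to commit");
		}
		// Committing lastProcessedOffset + 1 makes Kafka resume right after the
		// last record Flink has already processed.
		return lastProcessedOffset + 1;
	}
}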
http://git-wip-us.apache.org/repos/asf/flink/blob/c61d1860/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
index 11f97b2..a128174 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
@@ -36,6 +36,7 @@ import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -242,10 +243,25 @@ public abstract class AbstractFetcher<T, KPH> {
 	 * @param commitCallback The callback that the user should trigger when a commit request completes or fails.
 	 * @throws Exception This method forwards exceptions.
 	 */
-	public abstract void commitInternalOffsetsToKafka(
+	public final void commitInternalOffsetsToKafka(
+			Map<KafkaTopicPartition, Long> offsets,
+			@Nonnull KafkaCommitCallback commitCallback) throws Exception {
+		// Ignore sentinels. They might appear here if snapshot has started before actual offsets values
+		// replaced sentinels
+		doCommitInternalOffsetsToKafka(filterOutSentinels(offsets), commitCallback);
+	}
+
+	protected abstract void doCommitInternalOffsetsToKafka(
 			Map<KafkaTopicPartition, Long> offsets,
 			@Nonnull KafkaCommitCallback commitCallback) throws Exception;
 
+	private Map<KafkaTopicPartition, Long> filterOutSentinels(Map<KafkaTopicPartition, Long> offsets) {
+		return offsets.entrySet()
+			.stream()
+			.filter(entry -> !KafkaTopicPartitionStateSentinel.isSentinel(entry.getValue()))
+			.collect(Collectors.toMap(entry -> entry.getKey(), entry -> entry.getValue()));
+	}
+
 	/**
 	 * Creates the Kafka version specific representation of the given
 	 * topic partition.

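With commitInternalOffsetsToKafka now final in the base class, sentinel filtering happens in exactly one place, and the doCommitInternalOffsetsToKafka implementations in Kafka08Fetcher and Kafka09Fetcher above can rely on never receiving a sentinel; the checkState added in Kafka09Fetcher asserts that assumption.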
http://git-wip-us.apache.org/repos/asf/flink/blob/c61d1860/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateSentinel.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateSentinel.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateSentinel.java
index c218618..3857991 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateSentinel.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateSentinel.java
@@ -52,4 +52,7 @@ public class KafkaTopicPartitionStateSentinel {
 	 */
 	public static final long GROUP_OFFSET = -915623761773L;
 
+	public static boolean isSentinel(long offset) {
+		return offset < 0;
+	}
 }
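
The new isSentinel helper treats any negative offset as a sentinel. That is sufficient because the sentinel constants in this class are all large negative values (GROUP_OFFSET is shown above as -915623761773L; EARLIEST_OFFSET and LATEST_OFFSET, exercised in the test below, follow the same pattern), whereas valid Kafka offsets are always zero or greater.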

http://git-wip-us.apache.org/repos/asf/flink/blob/c61d1860/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java
index 1063102..46894a1 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java
@@ -32,8 +32,10 @@ import org.junit.Test;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -46,6 +48,42 @@ import static org.mockito.Mockito.mock;
 @SuppressWarnings("serial")
 public class AbstractFetcherTest {
 
+	@Test
+	public void testIgnorePartitionStateSentinelInSnapshot() throws Exception {
+		final String testTopic = "test topic name";
+		Map<KafkaTopicPartition, Long> originalPartitions = new HashMap<>();
+		originalPartitions.put(new KafkaTopicPartition(testTopic, 1), KafkaTopicPartitionStateSentinel.LATEST_OFFSET);
+		originalPartitions.put(new KafkaTopicPartition(testTopic, 2), KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
+		originalPartitions.put(new KafkaTopicPartition(testTopic, 3), KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET);
+
+		TestSourceContext<Long> sourceContext = new TestSourceContext<>();
+
+		TestFetcher<Long> fetcher = new TestFetcher<>(
+			sourceContext,
+			originalPartitions,
+			null,
+			null,
+			mock(TestProcessingTimeService.class),
+			0);
+
+		synchronized (sourceContext.getCheckpointLock()) {
+			HashMap<KafkaTopicPartition, Long> currentState = fetcher.snapshotCurrentState();
+			fetcher.commitInternalOffsetsToKafka(currentState, new KafkaCommitCallback() {
+				@Override
+				public void onSuccess() {
+				}
+
+				@Override
+				public void onException(Throwable cause) {
+					throw new RuntimeException("Callback failed", cause);
+				}
+			});
+
+			assertTrue(fetcher.getLastCommittedOffsets().isPresent());
+			assertEquals(Collections.emptyMap(), fetcher.getLastCommittedOffsets().get());
+		}
+	}
+
 	// ------------------------------------------------------------------------
 	//   Record emitting tests
 	// ------------------------------------------------------------------------
@@ -327,6 +365,7 @@ public class AbstractFetcherTest {
 	// ------------------------------------------------------------------------
 
 	private static final class TestFetcher<T> extends AbstractFetcher<T, Object> {
+		protected Optional<Map<KafkaTopicPartition, Long>> lastCommittedOffsets = Optional.empty();
 
 		protected TestFetcher(
 				SourceContext<T> sourceContext,
@@ -362,10 +401,15 @@ public class AbstractFetcherTest {
 		}
 
 		@Override
-		public void commitInternalOffsetsToKafka(
+		protected void doCommitInternalOffsetsToKafka(
 				Map<KafkaTopicPartition, Long> offsets,
 				@Nonnull KafkaCommitCallback callback) throws Exception {
-			throw new UnsupportedOperationException();
+			lastCommittedOffsets = Optional.of(offsets);
+			callback.onSuccess();
+		}
+
+		public Optional<Map<KafkaTopicPartition, Long>> getLastCommittedOffsets() {
+			return lastCommittedOffsets;
 		}
 	}
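
The new testIgnorePartitionStateSentinelInSnapshot covers the behaviour end to end: the snapshotted state still contains the sentinel values, so checkpoint semantics are unchanged, but when that state is passed to commitInternalOffsetsToKafka, TestFetcher's doCommitInternalOffsetsToKafka records an empty map, confirming that no sentinel reaches the version-specific commit path.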