You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/11/02 10:00:01 UTC
[03/11] flink git commit: [FLINK-7732][kafka-consumer] Do not commit
to kafka Flink's sentinel offsets
[FLINK-7732][kafka-consumer] Do not commit to kafka Flink's sentinel offsets
This closes #4928.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c61d1860
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c61d1860
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c61d1860
Branch: refs/heads/master
Commit: c61d18605ace57adbe84a2b05a50308043452398
Parents: b7d3589
Author: Piotr Nowojski <pi...@gmail.com>
Authored: Tue Oct 31 15:38:32 2017 +0100
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Thu Nov 2 12:33:55 2017 +0800
----------------------------------------------------------------------
.../kafka/internals/Kafka08Fetcher.java | 2 +-
.../kafka/internal/Kafka09Fetcher.java | 6 ++-
.../kafka/internals/AbstractFetcher.java | 18 +++++++-
.../KafkaTopicPartitionStateSentinel.java | 3 ++
.../kafka/internals/AbstractFetcherTest.java | 48 +++++++++++++++++++-
5 files changed, 72 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/c61d1860/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
index 7359e91..8bcd663 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
@@ -348,7 +348,7 @@ public class Kafka08Fetcher<T> extends AbstractFetcher<T, TopicAndPartition> {
// ------------------------------------------------------------------------
@Override
- public void commitInternalOffsetsToKafka(
+ protected void doCommitInternalOffsetsToKafka(
Map<KafkaTopicPartition, Long> offsets,
@Nonnull KafkaCommitCallback commitCallback) throws Exception {
http://git-wip-us.apache.org/repos/asf/flink/blob/c61d1860/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
index cef70fe..51f69cd 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
@@ -44,6 +44,8 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
+import static org.apache.flink.util.Preconditions.checkState;
+
/**
* A fetcher that fetches data from Kafka brokers via the Kafka 0.9 consumer API.
*
@@ -212,7 +214,7 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> {
}
@Override
- public void commitInternalOffsetsToKafka(
+ protected void doCommitInternalOffsetsToKafka(
Map<KafkaTopicPartition, Long> offsets,
@Nonnull KafkaCommitCallback commitCallback) throws Exception {
@@ -224,6 +226,8 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> {
for (KafkaTopicPartitionState<TopicPartition> partition : partitions) {
Long lastProcessedOffset = offsets.get(partition.getKafkaTopicPartition());
if (lastProcessedOffset != null) {
+ checkState(lastProcessedOffset >= 0, "Illegal offset value to commit");
+
// committed offsets through the KafkaConsumer need to be 1 more than the last processed offset.
// This does not affect Flink's checkpoints/saved state.
long offsetToCommit = lastProcessedOffset + 1;
http://git-wip-us.apache.org/repos/asf/flink/blob/c61d1860/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
index 11f97b2..a128174 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
@@ -36,6 +36,7 @@ import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -242,10 +243,25 @@ public abstract class AbstractFetcher<T, KPH> {
* @param commitCallback The callback that the user should trigger when a commit request completes or fails.
* @throws Exception This method forwards exceptions.
*/
- public abstract void commitInternalOffsetsToKafka(
+ public final void commitInternalOffsetsToKafka(
+ Map<KafkaTopicPartition, Long> offsets,
+ @Nonnull KafkaCommitCallback commitCallback) throws Exception {
+ // Ignore sentinels. They might appear here if the snapshot was started before the actual
+ // offset values replaced the sentinels
+ doCommitInternalOffsetsToKafka(filterOutSentinels(offsets), commitCallback);
+ }
+
+ protected abstract void doCommitInternalOffsetsToKafka(
Map<KafkaTopicPartition, Long> offsets,
@Nonnull KafkaCommitCallback commitCallback) throws Exception;
+ private Map<KafkaTopicPartition, Long> filterOutSentinels(Map<KafkaTopicPartition, Long> offsets) {
+ return offsets.entrySet()
+ .stream()
+ .filter(entry -> !KafkaTopicPartitionStateSentinel.isSentinel(entry.getValue()))
+ .collect(Collectors.toMap(entry -> entry.getKey(), entry -> entry.getValue()));
+ }
+
/**
* Creates the Kafka version specific representation of the given
* topic partition.
http://git-wip-us.apache.org/repos/asf/flink/blob/c61d1860/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateSentinel.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateSentinel.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateSentinel.java
index c218618..3857991 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateSentinel.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateSentinel.java
@@ -52,4 +52,7 @@ public class KafkaTopicPartitionStateSentinel {
*/
public static final long GROUP_OFFSET = -915623761773L;
+ public static boolean isSentinel(long offset) {
+ return offset < 0;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c61d1860/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java
index 1063102..46894a1 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java
@@ -32,8 +32,10 @@ import org.junit.Test;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.Optional;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -46,6 +48,42 @@ import static org.mockito.Mockito.mock;
@SuppressWarnings("serial")
public class AbstractFetcherTest {
+ @Test
+ public void testIgnorePartitionStateSentinelInSnapshot() throws Exception {
+ final String testTopic = "test topic name";
+ Map<KafkaTopicPartition, Long> originalPartitions = new HashMap<>();
+ originalPartitions.put(new KafkaTopicPartition(testTopic, 1), KafkaTopicPartitionStateSentinel.LATEST_OFFSET);
+ originalPartitions.put(new KafkaTopicPartition(testTopic, 2), KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
+ originalPartitions.put(new KafkaTopicPartition(testTopic, 3), KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET);
+
+ TestSourceContext<Long> sourceContext = new TestSourceContext<>();
+
+ TestFetcher<Long> fetcher = new TestFetcher<>(
+ sourceContext,
+ originalPartitions,
+ null,
+ null,
+ mock(TestProcessingTimeService.class),
+ 0);
+
+ synchronized (sourceContext.getCheckpointLock()) {
+ HashMap<KafkaTopicPartition, Long> currentState = fetcher.snapshotCurrentState();
+ fetcher.commitInternalOffsetsToKafka(currentState, new KafkaCommitCallback() {
+ @Override
+ public void onSuccess() {
+ }
+
+ @Override
+ public void onException(Throwable cause) {
+ throw new RuntimeException("Callback failed", cause);
+ }
+ });
+
+ assertTrue(fetcher.getLastCommittedOffsets().isPresent());
+ assertEquals(Collections.emptyMap(), fetcher.getLastCommittedOffsets().get());
+ }
+ }
+
// ------------------------------------------------------------------------
// Record emitting tests
// ------------------------------------------------------------------------
@@ -327,6 +365,7 @@ public class AbstractFetcherTest {
// ------------------------------------------------------------------------
private static final class TestFetcher<T> extends AbstractFetcher<T, Object> {
+ protected Optional<Map<KafkaTopicPartition, Long>> lastCommittedOffsets = Optional.empty();
protected TestFetcher(
SourceContext<T> sourceContext,
@@ -362,10 +401,15 @@ public class AbstractFetcherTest {
}
@Override
- public void commitInternalOffsetsToKafka(
+ protected void doCommitInternalOffsetsToKafka(
Map<KafkaTopicPartition, Long> offsets,
@Nonnull KafkaCommitCallback callback) throws Exception {
- throw new UnsupportedOperationException();
+ lastCommittedOffsets = Optional.of(offsets);
+ callback.onSuccess();
+ }
+
+ public Optional<Map<KafkaTopicPartition, Long>> getLastCommittedOffsets() {
+ return lastCommittedOffsets;
}
}