You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/05/15 07:58:05 UTC
flink git commit: [FLINK-6520] [kafka] Overwrite auto commit props
for ON_CHECKPOINTS / DISABLED commit mode
Repository: flink
Updated Branches:
refs/heads/master 44fb035e0 -> 6fdec897d
[FLINK-6520] [kafka] Overwrite auto commit props for ON_CHECKPOINTS / DISABLED commit mode
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6fdec897
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6fdec897
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6fdec897
Branch: refs/heads/master
Commit: 6fdec897d4b247975587895fa2186e37fe37b970
Parents: 44fb035
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Mon May 15 14:49:50 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Mon May 15 14:49:50 2017 +0800
----------------------------------------------------------------------
.../connectors/kafka/FlinkKafkaConsumer010.java | 7 ++
.../connectors/kafka/FlinkKafkaConsumer09.java | 6 ++
.../kafka/FlinkKafkaConsumerBase.java | 5 +
.../kafka/FlinkKafkaConsumerBaseTest.java | 74 ++++++++++++++-
.../connectors/kafka/KafkaConsumerTestBase.java | 98 ++++++++++----------
5 files changed, 140 insertions(+), 50 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/6fdec897/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
index 9e06d6e..23fc84e 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
@@ -29,6 +29,7 @@ import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
import org.apache.flink.util.PropertiesUtil;
import org.apache.flink.util.SerializedValue;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
import java.util.Collections;
import java.util.Map;
@@ -138,6 +139,12 @@ public class FlinkKafkaConsumer010<T> extends FlinkKafkaConsumer09<T> {
boolean useMetrics = !PropertiesUtil.getBoolean(properties, KEY_DISABLE_METRICS, false);
+ // make sure that auto commit is disabled when our offset commit mode is ON_CHECKPOINTS;
+ // this overwrites whatever setting the user configured in the properties
+ if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS || offsetCommitMode == OffsetCommitMode.DISABLED) {
+ properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+ }
+
return new Kafka010Fetcher<>(
sourceContext,
assignedPartitionsWithInitialOffsets,
http://git-wip-us.apache.org/repos/asf/flink/blob/6fdec897/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
index d0284ce..e638348 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
@@ -181,6 +181,12 @@ public class FlinkKafkaConsumer09<T> extends FlinkKafkaConsumerBase<T> {
boolean useMetrics = !PropertiesUtil.getBoolean(properties, KEY_DISABLE_METRICS, false);
+ // make sure that auto commit is disabled when our offset commit mode is ON_CHECKPOINTS;
+ // this overwrites whatever setting the user configured in the properties
+ if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS || offsetCommitMode == OffsetCommitMode.DISABLED) {
+ properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+ }
+
return new Kafka09Fetcher<>(
sourceContext,
assignedPartitionsWithInitialOffsets,
http://git-wip-us.apache.org/repos/asf/flink/blob/6fdec897/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
index 4a05efa..87bedce 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -769,4 +769,9 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
HashMap<KafkaTopicPartition, Long> getRestoredState() {
return restoredState;
}
+
+ @VisibleForTesting
+ OffsetCommitMode getOffsetCommitMode() {
+ return offsetCommitMode;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6fdec897/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
index 4f5b283..ccf2ed2 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
@@ -215,6 +215,72 @@ public class FlinkKafkaConsumerBaseTest {
}
@Test
+ public void testConfigureOnCheckpointsCommitMode() {
+
+ DummyFlinkKafkaConsumer consumer = new DummyFlinkKafkaConsumer();
+ consumer.setIsAutoCommitEnabled(true); // this should be ignored
+
+ StreamingRuntimeContext context = mock(StreamingRuntimeContext.class);
+ when(context.getIndexOfThisSubtask()).thenReturn(0);
+ when(context.getNumberOfParallelSubtasks()).thenReturn(1);
+ when(context.isCheckpointingEnabled()).thenReturn(true); // enable checkpointing, auto commit should be ignored
+ consumer.setRuntimeContext(context);
+
+ consumer.open(new Configuration());
+ assertEquals(OffsetCommitMode.ON_CHECKPOINTS, consumer.getOffsetCommitMode());
+ }
+
+ @Test
+ public void testConfigureAutoCommitMode() {
+
+ DummyFlinkKafkaConsumer consumer = new DummyFlinkKafkaConsumer();
+ consumer.setIsAutoCommitEnabled(true);
+
+ StreamingRuntimeContext context = mock(StreamingRuntimeContext.class);
+ when(context.getIndexOfThisSubtask()).thenReturn(0);
+ when(context.getNumberOfParallelSubtasks()).thenReturn(1);
+ when(context.isCheckpointingEnabled()).thenReturn(false); // disable checkpointing, auto commit should be respected
+ consumer.setRuntimeContext(context);
+
+ consumer.open(new Configuration());
+ assertEquals(OffsetCommitMode.KAFKA_PERIODIC, consumer.getOffsetCommitMode());
+ }
+
+ @Test
+ public void testConfigureDisableOffsetCommitWithCheckpointing() {
+
+ DummyFlinkKafkaConsumer consumer = new DummyFlinkKafkaConsumer();
+ consumer.setIsAutoCommitEnabled(true); // this should be ignored
+
+ StreamingRuntimeContext context = mock(StreamingRuntimeContext.class);
+ when(context.getIndexOfThisSubtask()).thenReturn(0);
+ when(context.getNumberOfParallelSubtasks()).thenReturn(1);
+ when(context.isCheckpointingEnabled()).thenReturn(true); // enable checkpointing, auto commit should be ignored
+ consumer.setRuntimeContext(context);
+
+ consumer.setCommitOffsetsOnCheckpoints(false); // disabling offset committing should override everything
+
+ consumer.open(new Configuration());
+ assertEquals(OffsetCommitMode.DISABLED, consumer.getOffsetCommitMode());
+ }
+
+ @Test
+ public void testConfigureDisableOffsetCommitWithoutCheckpointing() {
+
+ DummyFlinkKafkaConsumer consumer = new DummyFlinkKafkaConsumer();
+ consumer.setIsAutoCommitEnabled(false);
+
+ StreamingRuntimeContext context = mock(StreamingRuntimeContext.class);
+ when(context.getIndexOfThisSubtask()).thenReturn(0);
+ when(context.getNumberOfParallelSubtasks()).thenReturn(1);
+ when(context.isCheckpointingEnabled()).thenReturn(false); // disable checkpointing, auto commit should be respected
+ consumer.setRuntimeContext(context);
+
+ consumer.open(new Configuration());
+ assertEquals(OffsetCommitMode.DISABLED, consumer.getOffsetCommitMode());
+ }
+
+ @Test
@SuppressWarnings("unchecked")
public void testSnapshotStateWithCommitOnCheckpointsEnabled() throws Exception {
@@ -496,6 +562,8 @@ public class FlinkKafkaConsumerBaseTest {
private static class DummyFlinkKafkaConsumer<T> extends FlinkKafkaConsumerBase<T> {
private static final long serialVersionUID = 1L;
+ boolean isAutoCommitEnabled = false;
+
@SuppressWarnings("unchecked")
public DummyFlinkKafkaConsumer() {
super(Arrays.asList("dummy-topic"), (KeyedDeserializationSchema < T >) mock(KeyedDeserializationSchema.class));
@@ -520,7 +588,11 @@ public class FlinkKafkaConsumerBaseTest {
@Override
protected boolean getIsAutoCommitEnabled() {
- return false;
+ return isAutoCommitEnabled;
+ }
+
+ public void setIsAutoCommitEnabled(boolean isAutoCommitEnabled) {
+ this.isAutoCommitEnabled = isAutoCommitEnabled;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6fdec897/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index ba83460..2adb5ec 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -1291,55 +1291,6 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
}
}
- private static class Tuple2WithTopicSchema implements KeyedDeserializationSchema<Tuple3<Integer, Integer, String>>,
- KeyedSerializationSchema<Tuple3<Integer, Integer, String>> {
-
- private final TypeSerializer<Tuple2<Integer, Integer>> ts;
-
- public Tuple2WithTopicSchema(ExecutionConfig ec) {
- ts = TypeInfoParser.<Tuple2<Integer, Integer>>parse("Tuple2<Integer, Integer>").createSerializer(ec);
- }
-
- @Override
- public Tuple3<Integer, Integer, String> deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException {
- DataInputView in = new DataInputViewStreamWrapper(new ByteArrayInputStream(message));
- Tuple2<Integer, Integer> t2 = ts.deserialize(in);
- return new Tuple3<>(t2.f0, t2.f1, topic);
- }
-
- @Override
- public boolean isEndOfStream(Tuple3<Integer, Integer, String> nextElement) {
- return false;
- }
-
- @Override
- public TypeInformation<Tuple3<Integer, Integer, String>> getProducedType() {
- return TypeInfoParser.parse("Tuple3<Integer, Integer, String>");
- }
-
- @Override
- public byte[] serializeKey(Tuple3<Integer, Integer, String> element) {
- return null;
- }
-
- @Override
- public byte[] serializeValue(Tuple3<Integer, Integer, String> element) {
- ByteArrayOutputStream by = new ByteArrayOutputStream();
- DataOutputView out = new DataOutputViewStreamWrapper(by);
- try {
- ts.serialize(new Tuple2<>(element.f0, element.f1), out);
- } catch (IOException e) {
- throw new RuntimeException("Error" ,e);
- }
- return by.toByteArray();
- }
-
- @Override
- public String getTargetTopic(Tuple3<Integer, Integer, String> element) {
- return element.f2;
- }
- }
-
/**
* Test Flink's Kafka integration also with very big records (30MB)
* see http://stackoverflow.com/questions/21020347/kafka-sending-a-15mb-message
@@ -2276,4 +2227,53 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
this.numElementsTotal = state.get(0);
}
}
+
+ private static class Tuple2WithTopicSchema implements KeyedDeserializationSchema<Tuple3<Integer, Integer, String>>,
+ KeyedSerializationSchema<Tuple3<Integer, Integer, String>> {
+
+ private final TypeSerializer<Tuple2<Integer, Integer>> ts;
+
+ public Tuple2WithTopicSchema(ExecutionConfig ec) {
+ ts = TypeInfoParser.<Tuple2<Integer, Integer>>parse("Tuple2<Integer, Integer>").createSerializer(ec);
+ }
+
+ @Override
+ public Tuple3<Integer, Integer, String> deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException {
+ DataInputView in = new DataInputViewStreamWrapper(new ByteArrayInputStream(message));
+ Tuple2<Integer, Integer> t2 = ts.deserialize(in);
+ return new Tuple3<>(t2.f0, t2.f1, topic);
+ }
+
+ @Override
+ public boolean isEndOfStream(Tuple3<Integer, Integer, String> nextElement) {
+ return false;
+ }
+
+ @Override
+ public TypeInformation<Tuple3<Integer, Integer, String>> getProducedType() {
+ return TypeInfoParser.parse("Tuple3<Integer, Integer, String>");
+ }
+
+ @Override
+ public byte[] serializeKey(Tuple3<Integer, Integer, String> element) {
+ return null;
+ }
+
+ @Override
+ public byte[] serializeValue(Tuple3<Integer, Integer, String> element) {
+ ByteArrayOutputStream by = new ByteArrayOutputStream();
+ DataOutputView out = new DataOutputViewStreamWrapper(by);
+ try {
+ ts.serialize(new Tuple2<>(element.f0, element.f1), out);
+ } catch (IOException e) {
+ throw new RuntimeException("Error" ,e);
+ }
+ return by.toByteArray();
+ }
+
+ @Override
+ public String getTargetTopic(Tuple3<Integer, Integer, String> element) {
+ return element.f2;
+ }
+ }
}