Posted to commits@flink.apache.org by tz...@apache.org on 2017/05/15 07:58:05 UTC

flink git commit: [FLINK-6520] [kafka] Overwrite auto commit props for ON_CHECKPOINTS / DISABLED commit mode

Repository: flink
Updated Branches:
  refs/heads/master 44fb035e0 -> 6fdec897d


[FLINK-6520] [kafka] Overwrite auto commit props for ON_CHECKPOINTS / DISABLED commit mode


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6fdec897
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6fdec897
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6fdec897

Branch: refs/heads/master
Commit: 6fdec897d4b247975587895fa2186e37fe37b970
Parents: 44fb035
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Mon May 15 14:49:50 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Mon May 15 14:49:50 2017 +0800

----------------------------------------------------------------------
 .../connectors/kafka/FlinkKafkaConsumer010.java |  7 ++
 .../connectors/kafka/FlinkKafkaConsumer09.java  |  6 ++
 .../kafka/FlinkKafkaConsumerBase.java           |  5 +
 .../kafka/FlinkKafkaConsumerBaseTest.java       | 74 ++++++++++++++-
 .../connectors/kafka/KafkaConsumerTestBase.java | 98 ++++++++++----------
 5 files changed, 140 insertions(+), 50 deletions(-)
----------------------------------------------------------------------
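
In short: FlinkKafkaConsumerBase resolves an OffsetCommitMode in open(), and with this commit the 0.9/0.10 consumers force the Kafka client property enable.auto.commit to "false" whenever that mode is ON_CHECKPOINTS or DISABLED, even if the user set it to "true". A minimal sketch of the user-facing effect follows; the broker address, group id, topic name, and class name are placeholders, not taken from this commit:

    import java.util.Properties;

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

    public class AutoCommitOverwriteExample {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
            props.setProperty("group.id", "example-group");           // placeholder
            props.setProperty("enable.auto.commit", "true");          // user-configured; overwritten below

            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.enableCheckpointing(5000); // checkpointing on => commit mode is ON_CHECKPOINTS

            // With this commit, the consumer rewrites enable.auto.commit to "false"
            // before creating its fetcher, so offsets are committed only on completed
            // checkpoints and the Kafka client's periodic committer stays off.
            env.addSource(new FlinkKafkaConsumer010<>("example-topic", new SimpleStringSchema(), props))
                .print();

            env.execute("auto-commit overwrite example");
        }
    }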


http://git-wip-us.apache.org/repos/asf/flink/blob/6fdec897/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
index 9e06d6e..23fc84e 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
@@ -29,6 +29,7 @@ import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
 import org.apache.flink.util.PropertiesUtil;
 import org.apache.flink.util.SerializedValue;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 
 import java.util.Collections;
 import java.util.Map;
@@ -138,6 +139,12 @@ public class FlinkKafkaConsumer010<T> extends FlinkKafkaConsumer09<T> {
 
 		boolean useMetrics = !PropertiesUtil.getBoolean(properties, KEY_DISABLE_METRICS, false);
 
+		// make sure that auto commit is disabled when our offset commit mode is ON_CHECKPOINTS or DISABLED;
+		// this overwrites whatever setting the user configured in the properties
+		if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS || offsetCommitMode == OffsetCommitMode.DISABLED) {
+			properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+		}
+
 		return new Kafka010Fetcher<>(
 				sourceContext,
 				assignedPartitionsWithInitialOffsets,

http://git-wip-us.apache.org/repos/asf/flink/blob/6fdec897/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
index d0284ce..e638348 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
@@ -181,6 +181,12 @@ public class FlinkKafkaConsumer09<T> extends FlinkKafkaConsumerBase<T> {
 
 		boolean useMetrics = !PropertiesUtil.getBoolean(properties, KEY_DISABLE_METRICS, false);
 
+		// make sure that auto commit is disabled when our offset commit mode is ON_CHECKPOINTS or DISABLED;
+		// this overwrites whatever setting the user configured in the properties
+		if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS || offsetCommitMode == OffsetCommitMode.DISABLED) {
+			properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+		}
+
 		return new Kafka09Fetcher<>(
 				sourceContext,
 				assignedPartitionsWithInitialOffsets,

http://git-wip-us.apache.org/repos/asf/flink/blob/6fdec897/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
index 4a05efa..87bedce 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -769,4 +769,9 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 	HashMap<KafkaTopicPartition, Long> getRestoredState() {
 		return restoredState;
 	}
+
+	@VisibleForTesting
+	OffsetCommitMode getOffsetCommitMode() {
+		return offsetCommitMode;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6fdec897/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
index 4f5b283..ccf2ed2 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
@@ -215,6 +215,72 @@ public class FlinkKafkaConsumerBaseTest {
 	}
 
 	@Test
+	public void testConfigureOnCheckpointsCommitMode() {
+
+		DummyFlinkKafkaConsumer consumer = new DummyFlinkKafkaConsumer();
+		consumer.setIsAutoCommitEnabled(true); // this should be ignored
+
+		StreamingRuntimeContext context = mock(StreamingRuntimeContext.class);
+		when(context.getIndexOfThisSubtask()).thenReturn(0);
+		when(context.getNumberOfParallelSubtasks()).thenReturn(1);
+		when(context.isCheckpointingEnabled()).thenReturn(true); // enable checkpointing, auto commit should be ignored
+		consumer.setRuntimeContext(context);
+
+		consumer.open(new Configuration());
+		assertEquals(OffsetCommitMode.ON_CHECKPOINTS, consumer.getOffsetCommitMode());
+	}
+
+	@Test
+	public void testConfigureAutoCommitMode() {
+
+		DummyFlinkKafkaConsumer consumer = new DummyFlinkKafkaConsumer();
+		consumer.setIsAutoCommitEnabled(true);
+
+		StreamingRuntimeContext context = mock(StreamingRuntimeContext.class);
+		when(context.getIndexOfThisSubtask()).thenReturn(0);
+		when(context.getNumberOfParallelSubtasks()).thenReturn(1);
+		when(context.isCheckpointingEnabled()).thenReturn(false); // disable checkpointing, auto commit should be respected
+		consumer.setRuntimeContext(context);
+
+		consumer.open(new Configuration());
+		assertEquals(OffsetCommitMode.KAFKA_PERIODIC, consumer.getOffsetCommitMode());
+	}
+
+	@Test
+	public void testConfigureDisableOffsetCommitWithCheckpointing() {
+
+		DummyFlinkKafkaConsumer consumer = new DummyFlinkKafkaConsumer();
+		consumer.setIsAutoCommitEnabled(true); // this should be ignored
+
+		StreamingRuntimeContext context = mock(StreamingRuntimeContext.class);
+		when(context.getIndexOfThisSubtask()).thenReturn(0);
+		when(context.getNumberOfParallelSubtasks()).thenReturn(1);
+		when(context.isCheckpointingEnabled()).thenReturn(true); // enable checkpointing, auto commit should be ignored
+		consumer.setRuntimeContext(context);
+
+		consumer.setCommitOffsetsOnCheckpoints(false); // disabling offset committing should override everything
+
+		consumer.open(new Configuration());
+		assertEquals(OffsetCommitMode.DISABLED, consumer.getOffsetCommitMode());
+	}
+
+	@Test
+	public void testConfigureDisableOffsetCommitWithoutCheckpointing() {
+
+		DummyFlinkKafkaConsumer consumer = new DummyFlinkKafkaConsumer();
+		consumer.setIsAutoCommitEnabled(false);
+
+		StreamingRuntimeContext context = mock(StreamingRuntimeContext.class);
+		when(context.getIndexOfThisSubtask()).thenReturn(0);
+		when(context.getNumberOfParallelSubtasks()).thenReturn(1);
+		when(context.isCheckpointingEnabled()).thenReturn(false); // disable checkpointing, auto commit should be respected
+		consumer.setRuntimeContext(context);
+
+		consumer.open(new Configuration());
+		assertEquals(OffsetCommitMode.DISABLED, consumer.getOffsetCommitMode());
+	}
+
+	@Test
 	@SuppressWarnings("unchecked")
 	public void testSnapshotStateWithCommitOnCheckpointsEnabled() throws Exception {
 
@@ -496,6 +562,8 @@ public class FlinkKafkaConsumerBaseTest {
 	private static class DummyFlinkKafkaConsumer<T> extends FlinkKafkaConsumerBase<T> {
 		private static final long serialVersionUID = 1L;
 
+		boolean isAutoCommitEnabled = false;
+
 		@SuppressWarnings("unchecked")
 		public DummyFlinkKafkaConsumer() {
 			super(Arrays.asList("dummy-topic"), (KeyedDeserializationSchema < T >) mock(KeyedDeserializationSchema.class));
@@ -520,7 +588,11 @@ public class FlinkKafkaConsumerBaseTest {
 
 		@Override
 		protected boolean getIsAutoCommitEnabled() {
-			return false;
+			return isAutoCommitEnabled;
+		}
+
+		public void setIsAutoCommitEnabled(boolean isAutoCommitEnabled) {
+			this.isAutoCommitEnabled = isAutoCommitEnabled;
 		}
 	}
 

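The four new tests pin down how the commit mode is resolved in open(). A minimal sketch of those rules, written against the three flags the tests exercise (illustrative only; this is not the actual resolution utility in Flink):

    // Reproduces the mode resolution asserted by the four tests above.
    static OffsetCommitMode resolveOffsetCommitMode(
            boolean checkpointingEnabled,
            boolean commitOnCheckpoints,
            boolean autoCommitEnabled) {
        if (checkpointingEnabled) {
            // with checkpointing, offsets are committed on checkpoints or not at all;
            // the Kafka enable.auto.commit setting is ignored (and overwritten to false)
            return commitOnCheckpoints ? OffsetCommitMode.ON_CHECKPOINTS : OffsetCommitMode.DISABLED;
        }
        // without checkpointing, only the Kafka client's periodic committer can run
        return autoCommitEnabled ? OffsetCommitMode.KAFKA_PERIODIC : OffsetCommitMode.DISABLED;
    }
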
http://git-wip-us.apache.org/repos/asf/flink/blob/6fdec897/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index ba83460..2adb5ec 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -1291,55 +1291,6 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		}
 	}
 
-	private static class Tuple2WithTopicSchema implements KeyedDeserializationSchema<Tuple3<Integer, Integer, String>>,
-			KeyedSerializationSchema<Tuple3<Integer, Integer, String>> {
-
-		private final TypeSerializer<Tuple2<Integer, Integer>> ts;
-		
-		public Tuple2WithTopicSchema(ExecutionConfig ec) {
-			ts = TypeInfoParser.<Tuple2<Integer, Integer>>parse("Tuple2<Integer, Integer>").createSerializer(ec);
-		}
-
-		@Override
-		public Tuple3<Integer, Integer, String> deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException {
-			DataInputView in = new DataInputViewStreamWrapper(new ByteArrayInputStream(message));
-			Tuple2<Integer, Integer> t2 = ts.deserialize(in);
-			return new Tuple3<>(t2.f0, t2.f1, topic);
-		}
-
-		@Override
-		public boolean isEndOfStream(Tuple3<Integer, Integer, String> nextElement) {
-			return false;
-		}
-
-		@Override
-		public TypeInformation<Tuple3<Integer, Integer, String>> getProducedType() {
-			return TypeInfoParser.parse("Tuple3<Integer, Integer, String>");
-		}
-
-		@Override
-		public byte[] serializeKey(Tuple3<Integer, Integer, String> element) {
-			return null;
-		}
-
-		@Override
-		public byte[] serializeValue(Tuple3<Integer, Integer, String> element) {
-			ByteArrayOutputStream by = new ByteArrayOutputStream();
-			DataOutputView out = new DataOutputViewStreamWrapper(by);
-			try {
-				ts.serialize(new Tuple2<>(element.f0, element.f1), out);
-			} catch (IOException e) {
-				throw new RuntimeException("Error" ,e);
-			}
-			return by.toByteArray();
-		}
-
-		@Override
-		public String getTargetTopic(Tuple3<Integer, Integer, String> element) {
-			return element.f2;
-		}
-	}
-
 	/**
 	 * Test Flink's Kafka integration also with very big records (30MB)
 	 * see http://stackoverflow.com/questions/21020347/kafka-sending-a-15mb-message
@@ -2276,4 +2227,53 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 			this.numElementsTotal = state.get(0);
 		}
 	}
+
+	private static class Tuple2WithTopicSchema implements KeyedDeserializationSchema<Tuple3<Integer, Integer, String>>,
+		KeyedSerializationSchema<Tuple3<Integer, Integer, String>> {
+
+		private final TypeSerializer<Tuple2<Integer, Integer>> ts;
+
+		public Tuple2WithTopicSchema(ExecutionConfig ec) {
+			ts = TypeInfoParser.<Tuple2<Integer, Integer>>parse("Tuple2<Integer, Integer>").createSerializer(ec);
+		}
+
+		@Override
+		public Tuple3<Integer, Integer, String> deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException {
+			DataInputView in = new DataInputViewStreamWrapper(new ByteArrayInputStream(message));
+			Tuple2<Integer, Integer> t2 = ts.deserialize(in);
+			return new Tuple3<>(t2.f0, t2.f1, topic);
+		}
+
+		@Override
+		public boolean isEndOfStream(Tuple3<Integer, Integer, String> nextElement) {
+			return false;
+		}
+
+		@Override
+		public TypeInformation<Tuple3<Integer, Integer, String>> getProducedType() {
+			return TypeInfoParser.parse("Tuple3<Integer, Integer, String>");
+		}
+
+		@Override
+		public byte[] serializeKey(Tuple3<Integer, Integer, String> element) {
+			return null;
+		}
+
+		@Override
+		public byte[] serializeValue(Tuple3<Integer, Integer, String> element) {
+			ByteArrayOutputStream by = new ByteArrayOutputStream();
+			DataOutputView out = new DataOutputViewStreamWrapper(by);
+			try {
+				ts.serialize(new Tuple2<>(element.f0, element.f1), out);
+			} catch (IOException e) {
+				throw new RuntimeException("Error", e);
+			}
+			return by.toByteArray();
+		}
+
+		@Override
+		public String getTargetTopic(Tuple3<Integer, Integer, String> element) {
+			return element.f2;
+		}
+	}
 }
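
For reference, the relocated Tuple2WithTopicSchema is the test helper that routes records by topic: only the (f0, f1) pair is encoded on the wire, f2 selects the target topic on write, and the source topic is re-attached as f2 on read. A rough roundtrip sketch (the schema is private to the test class, so this is illustrative only):

    ExecutionConfig ec = new ExecutionConfig();
    Tuple2WithTopicSchema schema = new Tuple2WithTopicSchema(ec);

    Tuple3<Integer, Integer, String> in = new Tuple3<>(1, 42, "topic-a");
    byte[] payload = schema.serializeValue(in);  // encodes only (1, 42)
    String target = schema.getTargetTopic(in);   // "topic-a"

    // the consuming side re-attaches the topic as the third field
    Tuple3<Integer, Integer, String> out =
        schema.deserialize(null, payload, target, 0, 0L); // may throw IOException
    // out is (1, 42, "topic-a")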