You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/11/02 10:00:07 UTC

[09/11] flink git commit: [hotfix][kafka] Move checkpointing enable checking to initializeState

[hotfix][kafka] Move checkpointing enable checking to initializeState

initializeState is called before open and since both of those functions
relay on chosen semantic, that means checkpointing enable check should
happen in initializeState.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/425ffe26
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/425ffe26
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/425ffe26

Branch: refs/heads/master
Commit: 425ffe268f0c5aceac084b522af04736d2298da7
Parents: 856b6ba
Author: Piotr Nowojski <pi...@gmail.com>
Authored: Wed Oct 25 18:08:46 2017 +0200
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Thu Nov 2 12:43:20 2017 +0800

----------------------------------------------------------------------
 .../streaming/connectors/kafka/FlinkKafkaProducer011.java | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/425ffe26/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
index 6242a20..a69c730 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
@@ -524,11 +524,6 @@ public class FlinkKafkaProducer011<IN>
 	 */
 	@Override
 	public void open(Configuration configuration) throws Exception {
-		if (semantic != Semantic.NONE && !((StreamingRuntimeContext) this.getRuntimeContext()).isCheckpointingEnabled()) {
-			LOG.warn("Using {} semantic, but checkpointing is not enabled. Switching to {} semantic.", semantic, Semantic.NONE);
-			semantic = Semantic.NONE;
-		}
-
 		if (logFailuresOnly) {
 			callback = new Callback() {
 				@Override
@@ -787,6 +782,11 @@ public class FlinkKafkaProducer011<IN>
 
 	@Override
 	public void initializeState(FunctionInitializationContext context) throws Exception {
+		if (semantic != Semantic.NONE && !((StreamingRuntimeContext) this.getRuntimeContext()).isCheckpointingEnabled()) {
+			LOG.warn("Using {} semantic, but checkpointing is not enabled. Switching to {} semantic.", semantic, Semantic.NONE);
+			semantic = Semantic.NONE;
+		}
+
 		nextTransactionalIdHintState = context.getOperatorStateStore().getUnionListState(
 			NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR);