Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/06/17 17:48:48 UTC

[GitHub] [beam] johnjcasey commented on a diff in pull request #21899: Beam/21742 add warning to risky kafka config

johnjcasey commented on code in PR #21899:
URL: https://github.com/apache/beam/pull/21899#discussion_r900390329


##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java:
##########
@@ -1328,6 +1331,40 @@ && runnerPrefersLegacyRead(input.getPipeline().getOptions()))) {
       return input.apply(new ReadFromKafkaViaSDF<>(this, keyCoder, valueCoder));
     }
 
+    private void warnAboutUnsafeConfigurations(PBegin input) {
+      Long checkpointingInterval =
+          input
+              .getPipeline()
+              .getOptions()
+              .as(FakeFlinkPipelineOptions.class)
+              .getCheckpointingInterval();
+      String autoOffsetReset = (String) getConsumerConfig().get(AUTO_OFFSET_RESET_CONFIG);
+      if (checkpointingInterval != null
+          && checkpointingInterval != -1
+          && Boolean.TRUE.equals(getConsumerConfig().get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG))
+          && !isCommitOffsetsInFinalizeEnabled()
+          && (autoOffsetReset == null || "latest".equals(autoOffsetReset))) {
+        LOG.warn(
+            "When using the Flink runner with checkpointingInterval enabled,"
+                + " Kafka enable.auto.commit enabled,"
+                + " and Kafka auto.offset.reset set to latest or unset,"
+                + " there is a chance for every checkpoint to time out,"
+                + " which will cause data loss."
+                + " We recommend setting commitOffsetInFinalize to true in ReadFromKafka,"
+                + " enable.auto.commit to false, and auto.offset.reset to none");
+      }
+    }
+
+    // This class is designed to mimic the Flink pipeline options, so we can check for the
+    // checkpointingInterval property, but without needing to depend on the Flink runner
+    // Do not use this

Review Comment:
   I want this to be private, but the compiler doesn't like private interfaces
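
The guard in `warnAboutUnsafeConfigurations` combines four conditions. The sketch below assumes the consumer config is a `Map<String, Object>`, as in the diff, and pulls the same check out into a pure function that can be unit-tested without building a pipeline. `UnsafeKafkaConfigCheck` and `isRisky` are hypothetical names. The Kafka keys are inlined as the string values of `ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG` and `ConsumerConfig.AUTO_OFFSET_RESET_CONFIG`, so the sketch has no kafka-clients dependency.

```java
import java.util.Map;

// Hypothetical helper isolating the condition checked by warnAboutUnsafeConfigurations.
final class UnsafeKafkaConfigCheck {
  // String values of Kafka's ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG and
  // ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, inlined to avoid a kafka-clients dependency.
  static final String ENABLE_AUTO_COMMIT = "enable.auto.commit";
  static final String AUTO_OFFSET_RESET = "auto.offset.reset";

  private UnsafeKafkaConfigCheck() {}

  /**
   * Returns true for the combination the diff warns about: checkpointing enabled
   * (non-null and not -1), auto-commit set to Boolean.TRUE, commitOffsetsInFinalize
   * off, and auto.offset.reset either unset or "latest".
   */
  static boolean isRisky(
      Long checkpointingInterval,
      Map<String, Object> consumerConfig,
      boolean commitOffsetsInFinalize) {
    boolean checkpointingEnabled =
        checkpointingInterval != null && checkpointingInterval != -1L;
    boolean autoCommit = Boolean.TRUE.equals(consumerConfig.get(ENABLE_AUTO_COMMIT));
    Object reset = consumerConfig.get(AUTO_OFFSET_RESET);
    boolean resetLatestOrUnset = reset == null || "latest".equals(reset);
    return checkpointingEnabled && autoCommit && !commitOffsetsInFinalize && resetLatestOrUnset;
  }
}
```

The sketch mirrors the diff's use of `Boolean.TRUE.equals(...)`, which matches only a `Boolean` value. If `enable.auto.commit` is supplied as the string `"true"`, which Kafka also accepts, neither this sketch nor the guard it mirrors treats auto-commit as enabled.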


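The mimic interface works because Beam's `PipelineOptions.as(...)` returns another typed view of the same underlying options. An interface that only declares `getCheckpointingInterval()` can therefore read a value that the Flink runner's own options interface would set. The sketch below reproduces that idea with `java.lang.reflect.Proxy` over a plain `Map`. It is a JDK analogy, not Beam's implementation. `OptionsViewSketch`, `RunnerOptions`, `MimicOptions` and `view` are hypothetical names.

```java
import java.lang.reflect.Proxy;
import java.util.Map;

// Plain-JDK analogy of typed option views; not Beam's PipelineOptions implementation.
final class OptionsViewSketch {
  // Hypothetical stand-in for a runner's own options interface.
  interface RunnerOptions {
    Long getCheckpointingInterval();
  }

  // Hypothetical "mimic" interface declared without depending on the runner.
  interface MimicOptions {
    Long getCheckpointingInterval();
  }

  private OptionsViewSketch() {}

  // Returns a view of `backing` typed as `iface`; each zero-arg getX() reads property "x".
  // Any other method, including toString/equals/hashCode, throws UnsupportedOperationException.
  @SuppressWarnings("unchecked")
  static <T> T view(Map<String, Object> backing, Class<T> iface) {
    return (T) Proxy.newProxyInstance(
        iface.getClassLoader(),
        new Class<?>[] {iface},
        (proxy, method, args) -> {
          String name = method.getName();
          if (name.startsWith("get") && name.length() > 3 && (args == null || args.length == 0)) {
            String property = Character.toLowerCase(name.charAt(3)) + name.substring(4);
            return backing.get(property);
          }
          throw new UnsupportedOperationException(name);
        });
  }
}
```

One JDK detail may be relevant to the review comment. The `Proxy` documentation requires all non-public interfaces implemented by a single proxy class to be in the same package. A proxy-based options framework that merges interfaces from several packages into one proxy therefore has a reason to insist that they be public. This sketch does not confirm that this is the exact restriction behind the error the reviewer hit.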

-- 
This is an automated message from the Apache Git Service.