Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/06/17 17:30:12 UTC

[GitHub] [beam] Abacn commented on a diff in pull request #21899: Beam/21742 add warning to risky kafka config

Abacn commented on code in PR #21899:
URL: https://github.com/apache/beam/pull/21899#discussion_r900377853


##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java:
##########
@@ -1328,6 +1331,40 @@ && runnerPrefersLegacyRead(input.getPipeline().getOptions()))) {
       return input.apply(new ReadFromKafkaViaSDF<>(this, keyCoder, valueCoder));
     }
 
+    private void warnAboutUnsafeConfigurations(PBegin input) {
+      Long checkpointingInterval =
+          input
+              .getPipeline()
+              .getOptions()
+              .as(FakeFlinkPipelineOptions.class)
+              .getCheckpointingInterval();
+      String autoOffsetReset = (String) getConsumerConfig().get(AUTO_OFFSET_RESET_CONFIG);
+      if (checkpointingInterval != null
+          && checkpointingInterval != -1
+          && Boolean.TRUE.equals(getConsumerConfig().get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG))
+          && !isCommitOffsetsInFinalizeEnabled()
+          && (autoOffsetReset == null || "latest".equals(autoOffsetReset))) {
+        LOG.warn(
+            "When using the Flink runner with checkpointingInterval enabled,"
+                + " Kafka enable.auto.commit enabled,"
+                + " and Kafka auto.offset.reset set to latest or unset,"
+                + " there is a chance for every checkpoint to time out,"
+                + " which will cause data loss."
+                + " We recommend setting commitOffsetInFinalize to true in ReadFromKafka,"
+                + " enable.auto.commit to false, and auto.offset.reset to none");
+      }
+    }
+
+    // This class is designed to mimic the Flink pipeline options, so we can check for the
+    // checkpointingInterval property, but without needing to depend on the Flink runner
+    // Do not use this

Review Comment:
   It would be nice to have a pattern to follow when verifying the compatibility of a pipeline option with a PTransform. Does this kind of check already exist somewhere in the code base? It seems a little odd to declare a public interface (which PipelineOptions requires) and then say "do not use this".
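
   One way to keep the check easy to verify, regardless of how the option is read, is to isolate the warning condition as a pure predicate. The following is only a sketch under assumptions: the class name `UnsafeKafkaConfigCheck` and the method `shouldWarn` are hypothetical and not part of KafkaIO, and the logic simply mirrors the condition in the diff above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of Beam: mirrors the condition from the diff
// so it can be exercised without any runner-specific pipeline options.
public class UnsafeKafkaConfigCheck {
  // Kafka consumer property names.
  static final String ENABLE_AUTO_COMMIT = "enable.auto.commit";
  static final String AUTO_OFFSET_RESET = "auto.offset.reset";

  /**
   * Returns true when the combination flagged in the diff is present:
   * checkpointing is enabled, Kafka auto-commit is on, offsets are not
   * committed on finalize, and auto.offset.reset is "latest" or unset.
   */
  static boolean shouldWarn(
      Long checkpointingInterval,
      Map<String, Object> consumerConfig,
      boolean commitOffsetsInFinalize) {
    // As in the diff, null or -1 is treated as "checkpointing not enabled".
    boolean checkpointingEnabled =
        checkpointingInterval != null && checkpointingInterval != -1L;
    boolean autoCommit = Boolean.TRUE.equals(consumerConfig.get(ENABLE_AUTO_COMMIT));
    Object reset = consumerConfig.get(AUTO_OFFSET_RESET);
    boolean resetsToLatest = reset == null || "latest".equals(reset);
    return checkpointingEnabled && autoCommit && !commitOffsetsInFinalize && resetsToLatest;
  }

  public static void main(String[] args) {
    Map<String, Object> config = new HashMap<>();
    config.put(ENABLE_AUTO_COMMIT, true);
    // Checkpointing every 60 s, auto-commit on, auto.offset.reset unset.
    System.out.println(shouldWarn(60_000L, config, false));
    // Following the recommendation in the warning text removes the risk.
    config.put(ENABLE_AUTO_COMMIT, false);
    config.put(AUTO_OFFSET_RESET, "none");
    System.out.println(shouldWarn(60_000L, config, true));
  }
}
```

   With the predicate separated like this, only the lookup of the checkpointing interval remains tied to pipeline options, which is the part the question above is about.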



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org