Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/07/20 22:58:40 UTC

[GitHub] [beam] nbali commented on a diff in pull request #22261: Disable KafkaIO SDF while it is tested and fixed

nbali commented on code in PR #22261:
URL: https://github.com/apache/beam/pull/22261#discussion_r926116045


##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java:
##########
@@ -1336,28 +1335,34 @@ public PCollection<KafkaRecord<K, V>> expand(PBegin input) {
       Coder<K> keyCoder = getKeyCoder(coderRegistry);
       Coder<V> valueCoder = getValueCoder(coderRegistry);
 
-      final KafkaIOReadImplementationCompatibilityResult compatibility =
-          KafkaIOReadImplementationCompatibility.getCompatibility(this);
-
-      // For a number of cases, we prefer using the UnboundedSource Kafka over the new SDF-based
-      // Kafka source, for example,
-      // * Experiments 'beam_fn_api_use_deprecated_read' and 'use_deprecated_read' will result in
-      // legacy UnboundedSource being used.
-      // * Experiment 'use_unbounded_sdf_wrapper' will result in legacy UnboundedSource being used
-      // but will be wrapped by an SDF.
-      // * Some runners or selected features may not be compatible with SDF-based Kafka.
-      if (ExperimentalOptions.hasExperiment(
-              input.getPipeline().getOptions(), "beam_fn_api_use_deprecated_read")
-          || ExperimentalOptions.hasExperiment(
-              input.getPipeline().getOptions(), "use_deprecated_read")
-          || ExperimentalOptions.hasExperiment(
-              input.getPipeline().getOptions(), "use_unbounded_sdf_wrapper")
-          || compatibility.supportsOnly(KafkaIOReadImplementation.LEGACY)
-          || (compatibility.supports(KafkaIOReadImplementation.LEGACY)
-              && runnerPrefersLegacyRead(input.getPipeline().getOptions()))) {
-        return input.apply(new ReadFromKafkaViaUnbounded<>(this, keyCoder, valueCoder));
-      }
-      return input.apply(new ReadFromKafkaViaSDF<>(this, keyCoder, valueCoder));
+      // Reading from Kafka SDF is currently broken, as re-starting the pipeline will cause the

Review Comment:
   Basically, just add a check like this to the whole chunk of code you commented on here (a full sketch of the resulting guard follows the snippet):
   
   ```
             || ExperimentalOptions.hasExperiment(
                 input.getPipeline().getOptions(), "use_unbounded_sdf_wrapper")
             || getConsumerConfig().get(ConsumerConfig.GROUP_ID_CONFIG) != null
             || compatibility.supportsOnly(KafkaIOReadImplementation.LEGACY)
   ```
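   
   For context, here is a sketch of how the full guard from the deleted block might read with that check merged in. Everything except the `group.id` clause is taken from the diff above, so treat it as illustrative rather than as the final patch:
   
   ```
         // Prefer the legacy UnboundedSource read when an experiment requests it,
         // when the configuration is only compatible with legacy, or (suggested
         // addition) when a group.id is set and committed offsets therefore matter.
         if (ExperimentalOptions.hasExperiment(
                 input.getPipeline().getOptions(), "beam_fn_api_use_deprecated_read")
             || ExperimentalOptions.hasExperiment(
                 input.getPipeline().getOptions(), "use_deprecated_read")
             || ExperimentalOptions.hasExperiment(
                 input.getPipeline().getOptions(), "use_unbounded_sdf_wrapper")
             || getConsumerConfig().get(ConsumerConfig.GROUP_ID_CONFIG) != null
             || compatibility.supportsOnly(KafkaIOReadImplementation.LEGACY)
             || (compatibility.supports(KafkaIOReadImplementation.LEGACY)
                 && runnerPrefersLegacyRead(input.getPipeline().getOptions()))) {
           return input.apply(new ReadFromKafkaViaUnbounded<>(this, keyCoder, valueCoder));
         }
         return input.apply(new ReadFromKafkaViaSDF<>(this, keyCoder, valueCoder));
   ```
   
   With that extra clause, only pipelines that actually commit offsets under a `group.id` would be forced onto the legacy read path.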



##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java:
##########
@@ -1336,28 +1335,34 @@ public PCollection<KafkaRecord<K, V>> expand(PBegin input) {
+      // Reading from Kafka SDF is currently broken, as re-starting the pipeline will cause the

Review Comment:
   This isn't a valid reason to _completely_ disable Kafka SDF. What if the consumer is totally fine with starting from scratch? I have a business need that requires scanning time ranges (which is only supported by SDF) without caring about any previous consumer offset. Disable it ONLY if `group.id` is provided; see the sketch after the link below.
   
   https://github.com/apache/beam/blob/367173f6245f8df514b2e0526ab659665344f5a6/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOReadImplementationCompatibility.java#L93-L94
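   
   For illustration, a minimal sketch of that use case, assuming `withStartReadTime`/`withStopReadTime` as the time-range bounds (broker address, topic name, and the instants are placeholders):
   
   ```
      // Scan a fixed time window with no group.id in the consumer config,
      // so there are no committed offsets for a pipeline restart to lose.
      PCollection<KafkaRecord<byte[], byte[]>> records =
          pipeline.apply(
              KafkaIO.<byte[], byte[]>read()
                  .withBootstrapServers("broker:9092") // placeholder
                  .withTopic("events") // placeholder
                  .withKeyDeserializer(ByteArrayDeserializer.class)
                  .withValueDeserializer(ByteArrayDeserializer.class)
                  // Time-range scan: only supported by the SDF implementation.
                  .withStartReadTime(Instant.parse("2022-07-01T00:00:00Z"))
                  .withStopReadTime(Instant.parse("2022-07-02T00:00:00Z")));
   ```
   
   A restart of such a pipeline simply re-scans the window, which is exactly the "starting from scratch" case described above.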


