Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/07/13 15:17:15 UTC

[GitHub] [beam] johnjcasey opened a new pull request, #22261: Add comments and logs to warn about Kafka sdf not properly restarting

johnjcasey opened a new pull request, #22261:
URL: https://github.com/apache/beam/pull/22261

   Also added workaround steps
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
    - [ ] Mention the appropriate issue in your description (for example: `addresses #123`), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment `fixes #<ISSUE NUMBER>` instead.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   To check the build health, please visit [https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md](https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md)
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] chamikaramj commented on a diff in pull request #22261: Disable KafkaIO SDF while it is tested and fixed

Posted by GitBox <gi...@apache.org>.
chamikaramj commented on code in PR #22261:
URL: https://github.com/apache/beam/pull/22261#discussion_r922409814


##########
sdks/python/apache_beam/transforms/external.py:
##########
@@ -706,27 +706,12 @@ class JavaJarExpansionService(object):
   This can be passed into an ExternalTransform as the expansion_service
   argument which will spawn a subprocess using this jar to expand the
   transform.
-
-  Args:

Review Comment:
   Please don't revert the changes to external.py, since this feature and the doc updates are useful in general.





[GitHub] [beam] Abacn commented on pull request #22261: Add comments and logs to warn about Kafka sdf not properly restarting

Posted by GitBox <gi...@apache.org>.
Abacn commented on PR #22261:
URL: https://github.com/apache/beam/pull/22261#issuecomment-1184733415

   The pydoc precommit failure is due to:
   ```
   packages/apache_beam/io/kafka.py:docstring of apache_beam.io.kafka:67: WARNING: Unexpected indentation.
   ```




[GitHub] [beam] johnjcasey commented on pull request #22261: Disable KafkaIO SDF while it is tested and fixed

Posted by GitBox <gi...@apache.org>.
johnjcasey commented on PR #22261:
URL: https://github.com/apache/beam/pull/22261#issuecomment-1190824355

   Run Java PreCommit




[GitHub] [beam] johnjcasey closed pull request #22261: Disable KafkaIO SDF while it is tested and fixed

Posted by GitBox <gi...@apache.org>.
johnjcasey closed pull request #22261: Disable KafkaIO SDF while it is tested and fixed
URL: https://github.com/apache/beam/pull/22261




[GitHub] [beam] chamikaramj commented on a diff in pull request #22261: Add comments and logs to warn about Kafka sdf not properly restarting

Posted by GitBox <gi...@apache.org>.
chamikaramj commented on code in PR #22261:
URL: https://github.com/apache/beam/pull/22261#discussion_r922339733


##########
sdks/python/apache_beam/io/kafka.py:
##########
@@ -76,16 +76,33 @@
 
   For more information specific to Flink runner see:
   - https://beam.apache.org/documentation/runners/flink/
+
+  Reading via Kafka SDF is currently broken, and will cause the pipeline

Review Comment:
   I think we can remove the Python updates, since we made the UnboundedSource-wrapped SDF Kafka the default for now: https://github.com/apache/beam/pull/22286



##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java:
##########
@@ -543,6 +543,11 @@
  * For any significant significant updates to this I/O connector, please consider involving
  * corresponding code reviewers mentioned <a
  * href="https://github.com/apache/beam/blob/master/sdks/java/io/kafka/OWNERS">here</a>.
+ *
+ * <h1>Reading from Kafka SDF is currently broken, as re-starting the pipeline will cause the
+ * consumer to start from scratch. See <a
+ * href="https://github.com/apache/beam/issues/21730">this</a>. Current workaround is to use
+ * --experimental_option=use_unbounded_sdf_wrapper to use the Unbounded implementation</h1>

Review Comment:
   "For runners that require SDF, current workaround is to use ..."
   
   Also, please confirm that this works for Java pipelines that use Dataflow Runner v2.





[GitHub] [beam] johnjcasey commented on a diff in pull request #22261: Disable KafkaIO SDF while it is tested and fixed

Posted by GitBox <gi...@apache.org>.
johnjcasey commented on code in PR #22261:
URL: https://github.com/apache/beam/pull/22261#discussion_r926894307


##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java:
##########
@@ -1336,28 +1335,34 @@ public PCollection<KafkaRecord<K, V>> expand(PBegin input) {
       Coder<K> keyCoder = getKeyCoder(coderRegistry);
       Coder<V> valueCoder = getValueCoder(coderRegistry);
 
-      final KafkaIOReadImplementationCompatibilityResult compatibility =
-          KafkaIOReadImplementationCompatibility.getCompatibility(this);
-
-      // For a number of cases, we prefer using the UnboundedSource Kafka over the new SDF-based
-      // Kafka source, for example,
-      // * Experiments 'beam_fn_api_use_deprecated_read' and use_deprecated_read will result in
-      // legacy UnboundeSource being used.
-      // * Experiment 'use_unbounded_sdf_wrapper' will result in legacy UnboundeSource being used
-      // but will be wrapped by an SDF.
-      // * Some runners or selected features may not be compatible with SDF-based Kafka.
-      if (ExperimentalOptions.hasExperiment(
-              input.getPipeline().getOptions(), "beam_fn_api_use_deprecated_read")
-          || ExperimentalOptions.hasExperiment(
-              input.getPipeline().getOptions(), "use_deprecated_read")
-          || ExperimentalOptions.hasExperiment(
-              input.getPipeline().getOptions(), "use_unbounded_sdf_wrapper")
-          || compatibility.supportsOnly(KafkaIOReadImplementation.LEGACY)
-          || (compatibility.supports(KafkaIOReadImplementation.LEGACY)
-              && runnerPrefersLegacyRead(input.getPipeline().getOptions()))) {
-        return input.apply(new ReadFromKafkaViaUnbounded<>(this, keyCoder, valueCoder));
-      }
-      return input.apply(new ReadFromKafkaViaSDF<>(this, keyCoder, valueCoder));
+      // Reading from Kafka SDF is currently broken, as re-starting the pipeline will cause the

Review Comment:
   If you have this use case, there are probably others. We won't disable this at all, then.
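For reference, the conditional removed in the hunk above can be modeled without Beam as a minimal sketch. This is not KafkaIO code: `KafkaReadPathSketch`, `ReadPath`, and `choose` are hypothetical names, the compatibility and `runnerPrefersLegacyRead` checks are collapsed into a single assumed boolean, and the SDF wrapping that the original comment attributes to `use_unbounded_sdf_wrapper` is not modeled. The experiment check uses exact string matching against the experiments list.

```java
import java.util.List;

public class KafkaReadPathSketch {
  // Hypothetical stand-ins for the two branches: ReadFromKafkaViaUnbounded vs ReadFromKafkaViaSDF.
  enum ReadPath { UNBOUNDED_SOURCE, SDF }

  // Exact-match lookup in the list of experiment names (null list means no experiments).
  static boolean hasExperiment(List<String> experiments, String experiment) {
    return experiments != null && experiments.contains(experiment);
  }

  // legacyPreferred is an assumed flag standing in for the compatibility and
  // runner-preference checks in the original condition.
  static ReadPath choose(List<String> experiments, boolean legacyPreferred) {
    if (hasExperiment(experiments, "beam_fn_api_use_deprecated_read")
        || hasExperiment(experiments, "use_deprecated_read")
        || hasExperiment(experiments, "use_unbounded_sdf_wrapper")
        || legacyPreferred) {
      return ReadPath.UNBOUNDED_SOURCE;
    }
    return ReadPath.SDF;
  }

  public static void main(String[] args) {
    System.out.println(choose(List.of(), false)); // SDF
    System.out.println(choose(List.of("use_unbounded_sdf_wrapper"), false)); // UNBOUNDED_SOURCE
    System.out.println(choose(List.of("some_other_experiment"), true)); // UNBOUNDED_SOURCE
  }
}
```

Under this model, removing the conditional entirely (as the hunk does) means every read takes a single path regardless of experiments, which is why the reviewers in this thread weigh whether to disable SDF outright or only in narrower cases.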





[GitHub] [beam] johnjcasey commented on pull request #22261: Add comments and logs to warn about Kafka sdf not properly restarting

Posted by GitBox <gi...@apache.org>.
johnjcasey commented on PR #22261:
URL: https://github.com/apache/beam/pull/22261#issuecomment-1184656831

   Run PythonDocs PreCommit




[GitHub] [beam] chamikaramj commented on a diff in pull request #22261: Disable KafkaIO SDF while it is tested and fixed

Posted by GitBox <gi...@apache.org>.
chamikaramj commented on code in PR #22261:
URL: https://github.com/apache/beam/pull/22261#discussion_r922402119


##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java:
##########
@@ -1345,6 +1339,12 @@ public PCollection<KafkaRecord<K, V>> expand(PBegin input) {
       final KafkaIOReadImplementationCompatibilityResult compatibility =
           KafkaIOReadImplementationCompatibility.getCompatibility(this);
 
+      // Reading from Kafka SDF is currently broken, as re-starting the pipeline will cause the
+      // consumer to start from scratch. See https://github.com/apache/beam/issues/21730.

Review Comment:
   Please create a new GitHub issue that covers the problems with SDF Kafka and its test coverage more generally, and reference it here.





[GitHub] [beam] nbali commented on a diff in pull request #22261: Disable KafkaIO SDF while it is tested and fixed

Posted by GitBox <gi...@apache.org>.
nbali commented on code in PR #22261:
URL: https://github.com/apache/beam/pull/22261#discussion_r926116045


##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java:
##########
@@ -1336,28 +1335,34 @@ public PCollection<KafkaRecord<K, V>> expand(PBegin input) {
       Coder<K> keyCoder = getKeyCoder(coderRegistry);
       Coder<V> valueCoder = getValueCoder(coderRegistry);
 
-      final KafkaIOReadImplementationCompatibilityResult compatibility =
-          KafkaIOReadImplementationCompatibility.getCompatibility(this);
-
-      // For a number of cases, we prefer using the UnboundedSource Kafka over the new SDF-based
-      // Kafka source, for example,
-      // * Experiments 'beam_fn_api_use_deprecated_read' and use_deprecated_read will result in
-      // legacy UnboundeSource being used.
-      // * Experiment 'use_unbounded_sdf_wrapper' will result in legacy UnboundeSource being used
-      // but will be wrapped by an SDF.
-      // * Some runners or selected features may not be compatible with SDF-based Kafka.
-      if (ExperimentalOptions.hasExperiment(
-              input.getPipeline().getOptions(), "beam_fn_api_use_deprecated_read")
-          || ExperimentalOptions.hasExperiment(
-              input.getPipeline().getOptions(), "use_deprecated_read")
-          || ExperimentalOptions.hasExperiment(
-              input.getPipeline().getOptions(), "use_unbounded_sdf_wrapper")
-          || compatibility.supportsOnly(KafkaIOReadImplementation.LEGACY)
-          || (compatibility.supports(KafkaIOReadImplementation.LEGACY)
-              && runnerPrefersLegacyRead(input.getPipeline().getOptions()))) {
-        return input.apply(new ReadFromKafkaViaUnbounded<>(this, keyCoder, valueCoder));
-      }
-      return input.apply(new ReadFromKafkaViaSDF<>(this, keyCoder, valueCoder));
+      // Reading from Kafka SDF is currently broken, as re-starting the pipeline will cause the

Review Comment:
   Basically, just add a check like this to the whole chunk of code you commented out here:
   
   ```
             || ExperimentalOptions.hasExperiment(
                 input.getPipeline().getOptions(), "use_unbounded_sdf_wrapper")
             || getConsumerConfig().get(ConsumerConfig.GROUP_ID_CONFIG) != null
             || compatibility.supportsOnly(KafkaIOReadImplementation.LEGACY)
   ```
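A minimal, Beam-free sketch of the narrower gate proposed above, assuming the consumer configuration is available as a plain `Map`: only a non-null `group.id` forces the UnboundedSource path, so SDF-only features such as time-range scans stay available when no consumer group is set. `GroupIdGateSketch` and `forcesLegacyRead` are hypothetical names; the key `"group.id"` is the value of Kafka's `ConsumerConfig.GROUP_ID_CONFIG`.

```java
import java.util.Map;

public class GroupIdGateSketch {
  // Kafka consumer property key; same value as ConsumerConfig.GROUP_ID_CONFIG.
  static final String GROUP_ID_CONFIG = "group.id";

  // Hypothetical helper for the proposed extra condition: true routes the read to the
  // UnboundedSource-based implementation, false leaves the SDF-based read available.
  static boolean forcesLegacyRead(Map<String, ?> consumerConfig) {
    return consumerConfig.get(GROUP_ID_CONFIG) != null;
  }

  public static void main(String[] args) {
    System.out.println(forcesLegacyRead(Map.of())); // false
    System.out.println(forcesLegacyRead(Map.of("group.id", "my-group"))); // true
    System.out.println(forcesLegacyRead(Map.of("auto.offset.reset", "earliest"))); // false
  }
}
```

In the real condition this check would be one more `||` term alongside the experiment checks, as in the snippet above.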



##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java:
##########
@@ -1336,28 +1335,34 @@ public PCollection<KafkaRecord<K, V>> expand(PBegin input) {
       Coder<K> keyCoder = getKeyCoder(coderRegistry);
       Coder<V> valueCoder = getValueCoder(coderRegistry);
 
-      final KafkaIOReadImplementationCompatibilityResult compatibility =
-          KafkaIOReadImplementationCompatibility.getCompatibility(this);
-
-      // For a number of cases, we prefer using the UnboundedSource Kafka over the new SDF-based
-      // Kafka source, for example,
-      // * Experiments 'beam_fn_api_use_deprecated_read' and use_deprecated_read will result in
-      // legacy UnboundeSource being used.
-      // * Experiment 'use_unbounded_sdf_wrapper' will result in legacy UnboundeSource being used
-      // but will be wrapped by an SDF.
-      // * Some runners or selected features may not be compatible with SDF-based Kafka.
-      if (ExperimentalOptions.hasExperiment(
-              input.getPipeline().getOptions(), "beam_fn_api_use_deprecated_read")
-          || ExperimentalOptions.hasExperiment(
-              input.getPipeline().getOptions(), "use_deprecated_read")
-          || ExperimentalOptions.hasExperiment(
-              input.getPipeline().getOptions(), "use_unbounded_sdf_wrapper")
-          || compatibility.supportsOnly(KafkaIOReadImplementation.LEGACY)
-          || (compatibility.supports(KafkaIOReadImplementation.LEGACY)
-              && runnerPrefersLegacyRead(input.getPipeline().getOptions()))) {
-        return input.apply(new ReadFromKafkaViaUnbounded<>(this, keyCoder, valueCoder));
-      }
-      return input.apply(new ReadFromKafkaViaSDF<>(this, keyCoder, valueCoder));
+      // Reading from Kafka SDF is currently broken, as re-starting the pipeline will cause the

Review Comment:
   This isn't a valid reason to _completely_ disable Kafka SDF. What if the consumer is totally fine with starting from scratch? I have a business need that requires scanning time ranges, which only SDF supports, without caring about any previous consumer offset. Disable it ONLY if `group.id` is provided.
   
   https://github.com/apache/beam/blob/367173f6245f8df514b2e0526ab659665344f5a6/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOReadImplementationCompatibility.java#L93-L94





[GitHub] [beam] johnjcasey commented on a diff in pull request #22261: Disable KafkaIO SDF while it is tested and fixed

Posted by GitBox <gi...@apache.org>.
johnjcasey commented on code in PR #22261:
URL: https://github.com/apache/beam/pull/22261#discussion_r926867277


##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java:
##########
@@ -1336,28 +1335,34 @@ public PCollection<KafkaRecord<K, V>> expand(PBegin input) {
       Coder<K> keyCoder = getKeyCoder(coderRegistry);
       Coder<V> valueCoder = getValueCoder(coderRegistry);
 
-      final KafkaIOReadImplementationCompatibilityResult compatibility =
-          KafkaIOReadImplementationCompatibility.getCompatibility(this);
-
-      // For a number of cases, we prefer using the UnboundedSource Kafka over the new SDF-based
-      // Kafka source, for example,
-      // * Experiments 'beam_fn_api_use_deprecated_read' and use_deprecated_read will result in
-      // legacy UnboundeSource being used.
-      // * Experiment 'use_unbounded_sdf_wrapper' will result in legacy UnboundeSource being used
-      // but will be wrapped by an SDF.
-      // * Some runners or selected features may not be compatible with SDF-based Kafka.
-      if (ExperimentalOptions.hasExperiment(
-              input.getPipeline().getOptions(), "beam_fn_api_use_deprecated_read")
-          || ExperimentalOptions.hasExperiment(
-              input.getPipeline().getOptions(), "use_deprecated_read")
-          || ExperimentalOptions.hasExperiment(
-              input.getPipeline().getOptions(), "use_unbounded_sdf_wrapper")
-          || compatibility.supportsOnly(KafkaIOReadImplementation.LEGACY)
-          || (compatibility.supports(KafkaIOReadImplementation.LEGACY)
-              && runnerPrefersLegacyRead(input.getPipeline().getOptions()))) {
-        return input.apply(new ReadFromKafkaViaUnbounded<>(this, keyCoder, valueCoder));
-      }
-      return input.apply(new ReadFromKafkaViaSDF<>(this, keyCoder, valueCoder));
+      // Reading from Kafka SDF is currently broken, as re-starting the pipeline will cause the

Review Comment:
   This is temporary. We primarily want to make sure that new users won't run into this problem, and we intend to fix it as quickly as possible. If you have a less typical use case, it will still work on existing versions of Beam while we get this fixed.





[GitHub] [beam] johnjcasey commented on pull request #22261: Disable KafkaIO SDF while it is tested and fixed

Posted by GitBox <gi...@apache.org>.
johnjcasey commented on PR #22261:
URL: https://github.com/apache/beam/pull/22261#issuecomment-1190807802

   Run Python_PVR_Flink PreCommit




[GitHub] [beam] johnjcasey commented on pull request #22261: Add comments and logs to warn about Kafka sdf not properly restarting

Posted by GitBox <gi...@apache.org>.
johnjcasey commented on PR #22261:
URL: https://github.com/apache/beam/pull/22261#issuecomment-1184656605

   Run Portable_Python PreCommit
   




[GitHub] [beam] chamikaramj commented on a diff in pull request #22261: Add comments and logs to warn about Kafka sdf not properly restarting

Posted by GitBox <gi...@apache.org>.
chamikaramj commented on code in PR #22261:
URL: https://github.com/apache/beam/pull/22261#discussion_r920226998


##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java:
##########
@@ -543,6 +543,11 @@
  * For any significant significant updates to this I/O connector, please consider involving
  * corresponding code reviewers mentioned <a
  * href="https://github.com/apache/beam/blob/master/sdks/java/io/kafka/OWNERS">here</a>.
+ *
+ * <h1>Reading from Kafka SDF is currently broken, as re-starting the pipeline will cause the
+ * consumer to start from scratch. See <a
+ * href="https://github.com/apache/beam/issues/21730">this</a>. Current workaround is to use
+ * --experimental_option=use_deprecated_read to use the Unbounded implementation</h1>

Review Comment:
   I don't think this will work for Dataflow Runner v2 Java pipelines. Can you try?
   
   "use_unbounded_sdf_wrapper" should work but I only tried it for x-lang.





[GitHub] [beam] github-actions[bot] commented on pull request #22261: Add comments and logs to warn about Kafka sdf not properly restarting

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #22261:
URL: https://github.com/apache/beam/pull/22261#issuecomment-1184624456

   Assigning reviewers. If you would like to opt out of this review, comment `assign to next reviewer`:
   
   R: @pabloem for label python.
   
   Available commands:
   - `stop reviewer notifications` - opt out of the automated review tooling
   - `remind me after tests pass` - tag the comment author after tests pass
   - `waiting on author` - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)
   
   The PR bot will only process comments in the main thread (not review comments).




[GitHub] [beam] johnjcasey commented on pull request #22261: Add comments and logs to warn about Kafka sdf not properly restarting

Posted by GitBox <gi...@apache.org>.
johnjcasey commented on PR #22261:
URL: https://github.com/apache/beam/pull/22261#issuecomment-1184656714

   Run Python PreCommit




[GitHub] [beam] johnjcasey commented on pull request #22261: Add comments and logs to warn about Kafka sdf not properly restarting

Posted by GitBox <gi...@apache.org>.
johnjcasey commented on PR #22261:
URL: https://github.com/apache/beam/pull/22261#issuecomment-1183403708

   Run Spotless Precommit




[GitHub] [beam] johnjcasey commented on pull request #22261: Add comments and logs to warn about Kafka sdf not properly restarting

Posted by GitBox <gi...@apache.org>.
johnjcasey commented on PR #22261:
URL: https://github.com/apache/beam/pull/22261#issuecomment-1183457372

   Run PythonDocs PreCommit




[GitHub] [beam] johnjcasey commented on pull request #22261: Add comments and logs to warn about Kafka sdf not properly restarting

Posted by GitBox <gi...@apache.org>.
johnjcasey commented on PR #22261:
URL: https://github.com/apache/beam/pull/22261#issuecomment-1183456324

   Run Java PreCommit




[GitHub] [beam] johnjcasey commented on pull request #22261: Add comments and logs to warn about Kafka sdf not properly restarting

Posted by GitBox <gi...@apache.org>.
johnjcasey commented on PR #22261:
URL: https://github.com/apache/beam/pull/22261#issuecomment-1185643963

   R: @chamikaramj 




[GitHub] [beam] johnjcasey commented on pull request #22261: Add comments and logs to warn about Kafka sdf not properly restarting

Posted by GitBox <gi...@apache.org>.
johnjcasey commented on PR #22261:
URL: https://github.com/apache/beam/pull/22261#issuecomment-1184864096

   The Codecov report shows only unrelated changes, so there is no need to act on it.




[GitHub] [beam] johnjcasey commented on pull request #22261: Disable KafkaIO SDF while it is tested and fixed

Posted by GitBox <gi...@apache.org>.
johnjcasey commented on PR #22261:
URL: https://github.com/apache/beam/pull/22261#issuecomment-1190834354

   Run Python_PVR_Flink PreCommit




[GitHub] [beam] codecov[bot] commented on pull request #22261: Add comments and logs to warn about Kafka sdf not properly restarting

Posted by GitBox <gi...@apache.org>.
codecov[bot] commented on PR #22261:
URL: https://github.com/apache/beam/pull/22261#issuecomment-1184518328

   # [Codecov](https://codecov.io/gh/apache/beam/pull/22261?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#22261](https://codecov.io/gh/apache/beam/pull/22261?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (aeb6b6e) into [master](https://codecov.io/gh/apache/beam/commit/e6de18c4d63e556b911b596e3884dd2a6e0450e4?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (e6de18c) will **decrease** coverage by `0.01%`.
   > The diff coverage is `66.66%`.
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #22261      +/-   ##
   ==========================================
   - Coverage   74.26%   74.24%   -0.02%     
   ==========================================
     Files         702      702              
     Lines       93019    93022       +3     
   ==========================================
   - Hits        69076    69066      -10     
   - Misses      22676    22689      +13     
     Partials     1267     1267              
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | python | `83.62% <66.66%> (-0.02%)` | :arrow_down: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/22261?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [sdks/python/apache\_beam/io/kafka.py](https://codecov.io/gh/apache/beam/pull/22261/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8va2Fma2EucHk=) | `78.57% <66.66%> (-1.43%)` | :arrow_down: |
   | [.../python/apache\_beam/testing/test\_stream\_service.py](https://codecov.io/gh/apache/beam/pull/22261/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdGVzdGluZy90ZXN0X3N0cmVhbV9zZXJ2aWNlLnB5) | `88.09% <0.00%> (-4.77%)` | :arrow_down: |
   | [sdks/python/apache\_beam/io/source\_test\_utils.py](https://codecov.io/gh/apache/beam/pull/22261/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vc291cmNlX3Rlc3RfdXRpbHMucHk=) | `88.01% <0.00%> (-1.39%)` | :arrow_down: |
   | [...che\_beam/runners/interactive/interactive\_runner.py](https://codecov.io/gh/apache/beam/pull/22261/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9pbnRlcmFjdGl2ZS9pbnRlcmFjdGl2ZV9ydW5uZXIucHk=) | `90.06% <0.00%> (-1.33%)` | :arrow_down: |
   | [...eam/runners/portability/fn\_api\_runner/execution.py](https://codecov.io/gh/apache/beam/pull/22261/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9wb3J0YWJpbGl0eS9mbl9hcGlfcnVubmVyL2V4ZWN1dGlvbi5weQ==) | `92.44% <0.00%> (-0.65%)` | :arrow_down: |
   | [sdks/python/apache\_beam/runners/direct/executor.py](https://codecov.io/gh/apache/beam/pull/22261/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9kaXJlY3QvZXhlY3V0b3IucHk=) | `96.46% <0.00%> (-0.55%)` | :arrow_down: |
   | [...hon/apache\_beam/runners/worker/bundle\_processor.py](https://codecov.io/gh/apache/beam/pull/22261/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvYnVuZGxlX3Byb2Nlc3Nvci5weQ==) | `93.42% <0.00%> (ø)` | |
   | [sdks/python/apache\_beam/runners/common.py](https://codecov.io/gh/apache/beam/pull/22261/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9jb21tb24ucHk=) | `88.71% <0.00%> (+0.12%)` | :arrow_up: |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/beam/pull/22261?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/beam/pull/22261?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [e6de18c...aeb6b6e](https://codecov.io/gh/apache/beam/pull/22261?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] johnjcasey commented on a diff in pull request #22261: Add comments and logs to warn about Kafka sdf not properly restarting

Posted by GitBox <gi...@apache.org>.
johnjcasey commented on code in PR #22261:
URL: https://github.com/apache/beam/pull/22261#discussion_r920231692


##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java:
##########
@@ -543,6 +543,11 @@
  * For any significant significant updates to this I/O connector, please consider involving
  * corresponding code reviewers mentioned <a
  * href="https://github.com/apache/beam/blob/master/sdks/java/io/kafka/OWNERS">here</a>.
+ *
+ * <h1>Reading from Kafka SDF is currently broken, as re-starting the pipeline will cause the
+ * consumer to start from scratch. See <a
+ * href="https://github.com/apache/beam/issues/21730">this</a>. Current workaround is to use
+ * --experimental_option=use_deprecated_read to use the Unbounded implementation</h1>

Review Comment:
   will do once your change is merged
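   A note on the workaround in the Javadoc above: Beam's experiments option, defined by `ExperimentalOptions`, is normally passed as `--experiments=use_deprecated_read`. The `--experimental_option` spelling in the proposed text does not appear to be a Beam flag. The dependency-free sketch below only imitates how experiment names reach such a check. The class and method names are hypothetical and are not Beam code. Beam does its own parsing in `PipelineOptionsFactory` and tests for an experiment with `ExperimentalOptions.hasExperiment`.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, dependency-free sketch: collect the names passed through
 * "--experiments=..." arguments and check for "use_deprecated_read". Beam does its
 * own parsing in PipelineOptionsFactory and checks with ExperimentalOptions.hasExperiment;
 * this class only imitates the idea and is not Beam code.
 */
class ExperimentFlagSketch {
  static final String PREFIX = "--experiments=";

  /** Returns experiment names from every --experiments=... argument, splitting on commas. */
  static List<String> experiments(String[] args) {
    List<String> result = new ArrayList<>();
    for (String arg : args) {
      if (!arg.startsWith(PREFIX)) {
        continue; // not an experiments flag, e.g. --runner=...
      }
      for (String name : arg.substring(PREFIX.length()).split(",")) {
        String trimmed = name.trim();
        if (!trimmed.isEmpty()) {
          result.add(trimmed);
        }
      }
    }
    return result;
  }

  /** True when the legacy UnboundedSource-based read has been requested. */
  static boolean usesDeprecatedRead(String[] args) {
    return experiments(args).contains("use_deprecated_read");
  }

  public static void main(String[] args) {
    String[] correct = {"--runner=DirectRunner", "--experiments=use_deprecated_read"};
    String[] otherName = {"--experimental_option=use_deprecated_read"};
    System.out.println(usesDeprecatedRead(correct)); // true
    System.out.println(usesDeprecatedRead(otherName)); // false
  }
}
```

   In this sketch, a flag passed under any other name leaves the experiment unset, so the legacy read would not be selected.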





[GitHub] [beam] github-actions[bot] commented on pull request #22261: Add comments and logs to warn about Kafka sdf not properly restarting

github-actions[bot] commented on PR #22261:
URL: https://github.com/apache/beam/pull/22261#issuecomment-1183476395

   Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment `assign set of reviewers`




[GitHub] [beam] github-actions[bot] commented on pull request #22261: Add comments and logs to warn about Kafka sdf not properly restarting

github-actions[bot] commented on PR #22261:
URL: https://github.com/apache/beam/pull/22261#issuecomment-1185687995

   Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control




[GitHub] [beam] nbali commented on a diff in pull request #22261: Disable KafkaIO SDF while it is tested and fixed

nbali commented on code in PR #22261:
URL: https://github.com/apache/beam/pull/22261#discussion_r926883593


##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java:
##########
@@ -1336,28 +1335,34 @@ public PCollection<KafkaRecord<K, V>> expand(PBegin input) {
       Coder<K> keyCoder = getKeyCoder(coderRegistry);
       Coder<V> valueCoder = getValueCoder(coderRegistry);
 
-      final KafkaIOReadImplementationCompatibilityResult compatibility =
-          KafkaIOReadImplementationCompatibility.getCompatibility(this);
-
-      // For a number of cases, we prefer using the UnboundedSource Kafka over the new SDF-based
-      // Kafka source, for example,
-      // * Experiments 'beam_fn_api_use_deprecated_read' and use_deprecated_read will result in
-      // legacy UnboundeSource being used.
-      // * Experiment 'use_unbounded_sdf_wrapper' will result in legacy UnboundeSource being used
-      // but will be wrapped by an SDF.
-      // * Some runners or selected features may not be compatible with SDF-based Kafka.
-      if (ExperimentalOptions.hasExperiment(
-              input.getPipeline().getOptions(), "beam_fn_api_use_deprecated_read")
-          || ExperimentalOptions.hasExperiment(
-              input.getPipeline().getOptions(), "use_deprecated_read")
-          || ExperimentalOptions.hasExperiment(
-              input.getPipeline().getOptions(), "use_unbounded_sdf_wrapper")
-          || compatibility.supportsOnly(KafkaIOReadImplementation.LEGACY)
-          || (compatibility.supports(KafkaIOReadImplementation.LEGACY)
-              && runnerPrefersLegacyRead(input.getPipeline().getOptions()))) {
-        return input.apply(new ReadFromKafkaViaUnbounded<>(this, keyCoder, valueCoder));
-      }
-      return input.apply(new ReadFromKafkaViaSDF<>(this, keyCoder, valueCoder));
+      // Reading from Kafka SDF is currently broken, as re-starting the pipeline will cause the

Review Comment:
   If this gets merged into master and released, it is not temporary. Removing SDF support entirely is a breaking change, so it should be as minimally breaking as possible. I have shown one precondition under which it works just fine even in this bugged state, and there could be more.
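   For reference, the block removed in this hunk already routed reads to the legacy `UnboundedSource` implementation under several conditions and used the SDF only otherwise. The dependency-free sketch below restates that decision rule. `Compat` is a hypothetical stand-in for `KafkaIOReadImplementationCompatibilityResult`, and `runnerPrefersLegacyRead` is passed in as a plain boolean. None of the names below are Beam's API.

```java
import java.util.Set;

/**
 * Hypothetical, dependency-free restatement of the read-implementation choice made by
 * the KafkaIO code removed in this diff. Names are illustrative only.
 */
class KafkaReadChoiceSketch {
  enum Impl { LEGACY, SDF }

  /** Simplified compatibility result: which implementations the configured features allow. */
  static final class Compat {
    final boolean legacy;
    final boolean sdf;

    Compat(boolean legacy, boolean sdf) {
      this.legacy = legacy;
      this.sdf = sdf;
    }

    boolean supports(Impl impl) {
      return impl == Impl.LEGACY ? legacy : sdf;
    }

    boolean supportsOnly(Impl impl) {
      Impl other = impl == Impl.LEGACY ? Impl.SDF : Impl.LEGACY;
      return supports(impl) && !supports(other);
    }
  }

  static Impl choose(Set<String> experiments, Compat compat, boolean runnerPrefersLegacyRead) {
    boolean legacyRequested =
        experiments.contains("beam_fn_api_use_deprecated_read")
            || experiments.contains("use_deprecated_read")
            // Per the removed comment, this still uses UnboundedSource, wrapped by an SDF.
            || experiments.contains("use_unbounded_sdf_wrapper");
    if (legacyRequested
        || compat.supportsOnly(Impl.LEGACY)
        || (compat.supports(Impl.LEGACY) && runnerPrefersLegacyRead)) {
      return Impl.LEGACY;
    }
    return Impl.SDF;
  }

  public static void main(String[] args) {
    Compat both = new Compat(true, true);
    System.out.println(choose(Set.of("use_deprecated_read"), both, false)); // LEGACY
    System.out.println(choose(Set.of(), both, false)); // SDF
    System.out.println(choose(Set.of(), new Compat(true, false), false)); // LEGACY
  }
}
```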


