Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/08/25 05:11:29 UTC

[GitHub] [beam] nehsyc opened a new pull request #12678: [BEAM-10703] Add a step property for shardable states during Dataflow graph translation (Java)

nehsyc opened a new pull request #12678:
URL: https://github.com/apache/beam/pull/12678


   To enable optimization for the transform `GroupIntoBatches`, add a new step property when translating stateful DoFns to inform the backend that the special transform allows shardable states.
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] boyuanzz commented on pull request #12678: [BEAM-10703] Add a step property for shardable states during Dataflow graph translation (Java)

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on pull request #12678:
URL: https://github.com/apache/beam/pull/12678#issuecomment-691312842


   Run Java PreCommit


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] lukecwik commented on a change in pull request #12678: [BEAM-10703] Add a step property for shardable states during Dataflow graph translation (Java)

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #12678:
URL: https://github.com/apache/beam/pull/12678#discussion_r477492609



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
##########
@@ -100,8 +99,7 @@ public long getBatchSize() {
         ParDo.of(new GroupIntoBatchesDoFn<>(batchSize, allowedLateness, keyCoder, valueCoder)));
   }
 
-  @VisibleForTesting
-  static class GroupIntoBatchesDoFn<K, InputT>
+  public static class GroupIntoBatchesDoFn<K, InputT>

Review comment:
       There have been a few ways this has been done in the past:
   * (easiest) Record which transforms need this property within the DataflowRunner, then look up this information during translation (e.g. [doesPCollectionRequireIndexedFormat](https://github.com/apache/beam/blob/b1849ed09fb236906ff0b83b0f394c08b05d4b3c/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java#L1260))
   * replace the public GroupIntoBatches transform with a Dataflow specific `primitive` that makes any additional information visible that is needed during translation (e.g. [DataflowRunner.CombineGroupedValues](https://github.com/apache/beam/blob/b1849ed09fb236906ff0b83b0f394c08b05d4b3c/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java#L799))
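
A minimal sketch of the first option (record in the runner, look up during translation), under stated assumptions: `FakePCollection`, `FakeRunner`, and `FakeTranslator` are hypothetical stand-ins rather than Beam classes, and the property keys simply reuse the constant names from the diff because the thread does not show their string values.

```java
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for a PCollection; not the Beam class.
final class FakePCollection {
  final String name;

  FakePCollection(String name) {
    this.name = name;
  }
}

// Hypothetical stand-in for the runner: remembers which inputs feed a shardable transform.
final class FakeRunner {
  // Identity-based set: the same object is seen when recording and when translating.
  private final Set<FakePCollection> shardableInputs =
      Collections.newSetFromMap(new IdentityHashMap<>());

  // Called while the composite transform is still visible, e.g. from a transform override.
  void markAllowsShardableState(FakePCollection input) {
    shardableInputs.add(input);
  }

  // Called later, while translating the expanded stateful ParDo.
  boolean allowsShardableState(FakePCollection input) {
    return shardableInputs.contains(input);
  }
}

final class FakeTranslator {
  // Builds the step properties for a stateful ParDo that reads from `input`.
  // The keys are placeholders named after the constants in the diff.
  static Map<String, String> translateStatefulParDo(FakeRunner runner, FakePCollection input) {
    Map<String, String> props = new LinkedHashMap<>();
    props.put("USES_KEYED_STATE", "true");
    if (runner.allowsShardableState(input)) {
      props.put("ALLOWS_SHARDABLE_STATE", "true");
    }
    return props;
  }
}
```

With this shape the DoFn class itself never needs to be visible to the translator; only the recorded input is consulted.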







[GitHub] [beam] boyuanzz merged pull request #12678: [BEAM-10703] Add a step property for shardable states during Dataflow graph translation (Java)

Posted by GitBox <gi...@apache.org>.
boyuanzz merged pull request #12678:
URL: https://github.com/apache/beam/pull/12678


   





[GitHub] [beam] nehsyc commented on pull request #12678: [BEAM-10703] Add a step property for shardable states during Dataflow graph translation (Java)

Posted by GitBox <gi...@apache.org>.
nehsyc commented on pull request #12678:
URL: https://github.com/apache/beam/pull/12678#issuecomment-679669283


   R: @boyuanzz 





[GitHub] [beam] nehsyc commented on a change in pull request #12678: [BEAM-10703] Add a step property for shardable states during Dataflow graph translation (Java)

Posted by GitBox <gi...@apache.org>.
nehsyc commented on a change in pull request #12678:
URL: https://github.com/apache/beam/pull/12678#discussion_r476909404



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
##########
@@ -100,8 +99,7 @@ public long getBatchSize() {
         ParDo.of(new GroupIntoBatchesDoFn<>(batchSize, allowedLateness, keyCoder, valueCoder)));
   }
 
-  @VisibleForTesting
-  static class GroupIntoBatchesDoFn<K, InputT>
+  public static class GroupIntoBatchesDoFn<K, InputT>

Review comment:
       I assume we would still need access to `GroupIntoBatchesDoFn` if we register a separate translator, for example to fill in USER_FN: https://github.com/apache/beam/blob/master/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java#L1241.
   
   Can the same transform go through multiple translators? Is that what you were suggesting?







[GitHub] [beam] boyuanzz commented on a change in pull request #12678: [BEAM-10703] Add a step property for shardable states during Dataflow graph translation (Java)

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on a change in pull request #12678:
URL: https://github.com/apache/beam/pull/12678#discussion_r476618504



##########
File path: runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
##########
@@ -1264,6 +1268,10 @@ private static void translateFn(
     // in streaming but does not work in batch
     if (context.getPipelineOptions().isStreaming() && isStateful) {
       stepContext.addInput(PropertyNames.USES_KEYED_STATE, "true");

Review comment:
       Is `ALLOWS_SHARDABLE_STATE` supported by both Streaming Engine and Windmill Appliance? If it's only for Streaming Engine, is it safe to populate this field for both?

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
##########
@@ -100,8 +99,7 @@ public long getBatchSize() {
         ParDo.of(new GroupIntoBatchesDoFn<>(batchSize, allowedLateness, keyCoder, valueCoder)));
   }
 
-  @VisibleForTesting
-  static class GroupIntoBatchesDoFn<K, InputT>
+  public static class GroupIntoBatchesDoFn<K, InputT>

Review comment:
       I don't think we want to make `GroupIntoBatchesDoFn` `public`. Instead of adding the property in `translateFn`, we can do it at the transform level, where you can check `transform instanceof GroupIntoBatches`, for example: https://github.com/apache/beam/blob/master/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java#L752







[GitHub] [beam] lukecwik commented on a change in pull request #12678: [BEAM-10703] Add a step property for shardable states during Dataflow graph translation (Java)

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #12678:
URL: https://github.com/apache/beam/pull/12678#discussion_r477492609



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
##########
@@ -100,8 +99,7 @@ public long getBatchSize() {
         ParDo.of(new GroupIntoBatchesDoFn<>(batchSize, allowedLateness, keyCoder, valueCoder)));
   }
 
-  @VisibleForTesting
-  static class GroupIntoBatchesDoFn<K, InputT>
+  public static class GroupIntoBatchesDoFn<K, InputT>

Review comment:
       There have been a few ways this has been done in the past:
   * (easiest) Record which objects need this property within the DataflowRunner, then look up this information during translation (e.g. [doesPCollectionRequireIndexedFormat](https://github.com/apache/beam/blob/b1849ed09fb236906ff0b83b0f394c08b05d4b3c/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java#L1260))
   * replace the public GroupIntoBatches transform with a Dataflow specific `primitive` that makes any additional information visible that is needed during translation (e.g. [DataflowRunner.CombineGroupedValues](https://github.com/apache/beam/blob/b1849ed09fb236906ff0b83b0f394c08b05d4b3c/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java#L799))










[GitHub] [beam] nehsyc commented on a change in pull request #12678: [BEAM-10703] Add a step property for shardable states during Dataflow graph translation (Java)

Posted by GitBox <gi...@apache.org>.
nehsyc commented on a change in pull request #12678:
URL: https://github.com/apache/beam/pull/12678#discussion_r485991869



##########
File path: runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
##########
@@ -1264,6 +1268,10 @@ private static void translateFn(
     // in streaming but does not work in batch
     if (context.getPipelineOptions().isStreaming() && isStateful) {
       stepContext.addInput(PropertyNames.USES_KEYED_STATE, "true");

Review comment:
       Update on this:
   
   I added an experiment to gate the auto-sharding so this can be merged without waiting for the backend. It will also make the testing easier.
   
   I also added a check for the experiment, "beam_fn_api". My intention was to disable the feature for unified worker but I guess this way we would disable auto-sharding for both unified worker and java worker using fn api - I remember that we are not going to support the latter so it seems fine to me. But let me know if my understanding is incorrect.
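
A rough sketch of the gating described above, assuming the experiments are available as a list of strings. `AutoShardingGate` and the auto-sharding experiment name are hypothetical (the thread does not name the flag); only `beam_fn_api` comes from the comment itself.

```java
import java.util.List;

final class AutoShardingGate {
  // Hypothetical experiment name; the thread does not say what the real flag is called.
  static final String AUTO_SHARDING_EXPERIMENT = "hypothetical_enable_auto_sharding";
  // Experiment named in the comment above; when present, auto-sharding stays off.
  static final String FN_API_EXPERIMENT = "beam_fn_api";

  // Returns true only for streaming pipelines that opted in and do not use the Fn API.
  static boolean shouldAllowShardableState(boolean streaming, List<String> experiments) {
    if (!streaming || experiments == null) {
      return false;
    }
    return experiments.contains(AUTO_SHARDING_EXPERIMENT)
        && !experiments.contains(FN_API_EXPERIMENT);
  }
}
```

Because the property is emitted only when the opt-in experiment is present, pipelines that do not set it are translated exactly as before, which is what allows the change to merge ahead of backend support.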










[GitHub] [beam] nehsyc commented on a change in pull request #12678: [BEAM-10703] Add a step property for shardable states during Dataflow graph translation (Java)

Posted by GitBox <gi...@apache.org>.
nehsyc commented on a change in pull request #12678:
URL: https://github.com/apache/beam/pull/12678#discussion_r476835000



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
##########
@@ -100,8 +99,7 @@ public long getBatchSize() {
         ParDo.of(new GroupIntoBatchesDoFn<>(batchSize, allowedLateness, keyCoder, valueCoder)));
   }
 
-  @VisibleForTesting
-  static class GroupIntoBatchesDoFn<K, InputT>
+  public static class GroupIntoBatchesDoFn<K, InputT>

Review comment:
       Yeah, I did try matching the transform with `transform instanceof GroupIntoBatches` but got an error:
   ```
   Inconvertible types; cannot cast 'org.apache.beam.runners.dataflow.PrimitiveParDoSingleFactory.ParDoSingle<InputT,OutputT>' to 'org.apache.beam.sdk.transforms.GroupIntoBatches'
   ```
   Any ideas?







[GitHub] [beam] aaltay commented on pull request #12678: [BEAM-10703] Add a step property for shardable states during Dataflow graph translation (Java)

Posted by GitBox <gi...@apache.org>.
aaltay commented on pull request #12678:
URL: https://github.com/apache/beam/pull/12678#issuecomment-694465118


   Is this ready to be merged?








[GitHub] [beam] nehsyc commented on a change in pull request #12678: [BEAM-10703] Add a step property for shardable states during Dataflow graph translation (Java)

Posted by GitBox <gi...@apache.org>.
nehsyc commented on a change in pull request #12678:
URL: https://github.com/apache/beam/pull/12678#discussion_r485993132



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
##########
@@ -100,8 +99,7 @@ public long getBatchSize() {
         ParDo.of(new GroupIntoBatchesDoFn<>(batchSize, allowedLateness, keyCoder, valueCoder)));
   }
 
-  @VisibleForTesting
-  static class GroupIntoBatchesDoFn<K, InputT>
+  public static class GroupIntoBatchesDoFn<K, InputT>

Review comment:
       Thanks! I applied the first solution recommended above and updated the PR accordingly - I added an override for streaming GroupIntoBatches which records the input PCollection, and also a transform matcher to pass the override validation. PTAL.







[GitHub] [beam] boyuanzz commented on a change in pull request #12678: [BEAM-10703] Add a step property for shardable states during Dataflow graph translation (Java)

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on a change in pull request #12678:
URL: https://github.com/apache/beam/pull/12678#discussion_r476991127



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
##########
@@ -100,8 +99,7 @@ public long getBatchSize() {
         ParDo.of(new GroupIntoBatchesDoFn<>(batchSize, allowedLateness, keyCoder, valueCoder)));
   }
 
-  @VisibleForTesting
-  static class GroupIntoBatchesDoFn<K, InputT>
+  public static class GroupIntoBatchesDoFn<K, InputT>

Review comment:
       I was thinking we could check `GroupIntoBatches` directly, but it turns out that the translation happens after transform expansion, where we can only get `ParDo(GroupIntoBatchesDoFn)`. One thing we can do to avoid exposing `GroupIntoBatchesDoFn` is to make `DoFnSignature` understand that this is a `shardable stateful DoFn`, like what we do for `RESTRICTION_ENCODING`. @lukecwik Do you have any suggestions here?
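
A toy illustration of why the composite is no longer visible once expansion has happened. All types here (`FakeTransform`, `FakeParDo`, `FakeGroupIntoBatches`, `FakePipelineTranslator`) are hypothetical and much simpler than Beam's; the sketch only models "composites expand to primitives, and the translator walks primitives".

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical transform model: primitives expand to themselves.
interface FakeTransform {
  default List<FakeTransform> expand() {
    return Collections.singletonList(this);
  }
}

// Primitive: the only kind of node the translator gets to see.
final class FakeParDo implements FakeTransform {
  final String fnName;

  FakeParDo(String fnName) {
    this.fnName = fnName;
  }
}

// Composite: expands into a ParDo wrapping its internal DoFn.
final class FakeGroupIntoBatches implements FakeTransform {
  @Override
  public List<FakeTransform> expand() {
    return Collections.singletonList(new FakeParDo("GroupIntoBatchesDoFn"));
  }
}

final class FakePipelineTranslator {
  // Recursively expands a transform down to its primitive leaves.
  static List<FakeTransform> leaves(FakeTransform root) {
    List<FakeTransform> out = new ArrayList<>();
    List<FakeTransform> parts = root.expand();
    if (parts.size() == 1 && parts.get(0) == root) {
      out.add(root); // primitive: nothing further to expand
    } else {
      for (FakeTransform part : parts) {
        out.addAll(leaves(part));
      }
    }
    return out;
  }

  // An instanceof check over the leaves never finds the composite.
  static boolean anyLeafIsGroupIntoBatches(FakeTransform root) {
    for (FakeTransform leaf : leaves(root)) {
      if (leaf instanceof FakeGroupIntoBatches) {
        return true;
      }
    }
    return false;
  }
}
```

In this model the information "this ParDo came from GroupIntoBatches" has to be carried some other way, for instance by recording it before expansion or by attaching it to the DoFn's signature.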







[GitHub] [beam] nehsyc edited a comment on pull request #12678: [BEAM-10703] Add a step property for shardable states during Dataflow graph translation (Java)

Posted by GitBox <gi...@apache.org>.
nehsyc edited a comment on pull request #12678:
URL: https://github.com/apache/beam/pull/12678#issuecomment-679680154


   Please advise if there is a better way than simply matching the DoFn with GroupIntoBatchesFn. This should not be merged until the backend can recognize the new property, but I would like to send it out anyway to collect early feedback.





[GitHub] [beam] nehsyc commented on pull request #12678: [BEAM-10703] Add a step property for shardable states during Dataflow graph translation (Java)

Posted by GitBox <gi...@apache.org>.
nehsyc commented on pull request #12678:
URL: https://github.com/apache/beam/pull/12678#issuecomment-679680154


   Please advise if there is a better way than simply matching the DoFn with GroupIntoBatchesFn. This should not be merged before the backend can recognize the option, but I would like to collect early feedback.





[GitHub] [beam] nehsyc commented on a change in pull request #12678: [BEAM-10703] Add a step property for shardable states during Dataflow graph translation (Java)

Posted by GitBox <gi...@apache.org>.
nehsyc commented on a change in pull request #12678:
URL: https://github.com/apache/beam/pull/12678#discussion_r476839321



##########
File path: runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
##########
@@ -1264,6 +1268,10 @@ private static void translateFn(
     // in streaming but does not work in batch
     if (context.getPipelineOptions().isStreaming() && isStateful) {
       stepContext.addInput(PropertyNames.USES_KEYED_STATE, "true");

Review comment:
       As a first step, it's only supported by Streaming Engine. It's fine to set it in both cases, though, since the field will be ignored in Appliance. We will need to wait for the backend to recognize this property anyway; otherwise the pipeline will check-fail.







[GitHub] [beam] boyuanzz commented on a change in pull request #12678: [BEAM-10703] Add a step property for shardable states during Dataflow graph translation (Java)

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on a change in pull request #12678:
URL: https://github.com/apache/beam/pull/12678#discussion_r476862148



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
##########
@@ -100,8 +99,7 @@ public long getBatchSize() {
         ParDo.of(new GroupIntoBatchesDoFn<>(batchSize, allowedLateness, keyCoder, valueCoder)));
   }
 
-  @VisibleForTesting
-  static class GroupIntoBatchesDoFn<K, InputT>
+  public static class GroupIntoBatchesDoFn<K, InputT>

Review comment:
       Have you tried `registerTransformTranslator`?



