Posted to issues@beam.apache.org by "ASF GitHub Bot (Jira)" <ji...@apache.org> on 2020/08/03 18:12:00 UTC

[jira] [Work logged] (BEAM-10341) Support drain in SDF

     [ https://issues.apache.org/jira/browse/BEAM-10341?focusedWorklogId=465849&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-465849 ]

ASF GitHub Bot logged work on BEAM-10341:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 03/Aug/20 18:11
            Start Date: 03/Aug/20 18:11
    Worklog Time Spent: 10m 
      Work Description: lukecwik commented on a change in pull request #12371:
URL: https://github.com/apache/beam/pull/12371#discussion_r464580334



##########
File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/SplittableParDoExpander.java
##########
@@ -214,39 +222,124 @@ public MessageWithComponents getReplacement(
             generateUniqueId(
                 transformId + "/ProcessSizedElementsAndRestrictions",
                 existingComponents::containsTransforms);
-        {
-          PTransform.Builder processSizedElementsAndRestrictions = PTransform.newBuilder();
-          processSizedElementsAndRestrictions.putInputs(mainInputName, splitAndSizeOutId);
-          processSizedElementsAndRestrictions.putAllInputs(sideInputs);
-          processSizedElementsAndRestrictions.putAllOutputs(splittableParDo.getOutputsMap());
-          processSizedElementsAndRestrictions.setUniqueName(
-              generateUniquePCollectonName(
-                  splittableParDo.getUniqueName() + "/ProcessSizedElementsAndRestrictions",
-                  existingComponents));
-          processSizedElementsAndRestrictions.setSpec(
-              FunctionSpec.newBuilder()
-                  .setUrn(
-                      PTransformTranslation.SPLITTABLE_PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS_URN)
-                  .setPayload(splittableParDo.getSpec().getPayload()));
-          processSizedElementsAndRestrictions.setEnvironmentId(splittableParDo.getEnvironmentId());
+        if (!isDrain) {

Review comment:
       Could we simplify this further, with something like the following?
   ```
   PTransform.Builder newCompositeRoot = ... add pair w/ restriction and split and size ...
   String processSizedElementsInputPCollectionId = splitAndSizeOutId;
   if (isDrain) {
     ... add drain transform ...
     newCompositeRoot.add drain transform
     processSizedElementsInputPCollectionId = truncateAndSizeOutId;
   }
   ... add process sized elements transform ...
   newCompositeRoot.add process sized transform
   ```
   
   I don't think we should need two copies of the code that adds `ProcessSizedElementsAndRestrictions`.
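
   To make the suggested shape concrete, here is a minimal standalone sketch. It is not the Beam code: the class `ExpansionSketch`, the method `expand`, and the string labels are hypothetical stand-ins for the real `PTransform.Builder` calls. The only point it illustrates is that the drain branch adds one step and redirects an input id, so the final transform is added exactly once.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the expander; plain strings replace the real protos.
final class ExpansionSketch {

  /** Returns the sub-transform wiring as "Name(inputPCollectionId)" labels. */
  static List<String> expand(boolean isDrain) {
    List<String> subTransforms = new ArrayList<>();
    subTransforms.add("PairWithRestriction(mainInput)");
    subTransforms.add("SplitAndSize(pairOut)");

    // By default the final step reads the split-and-size output.
    String processInputId = "splitAndSizeOut";
    if (isDrain) {
      // Drain adds one extra step and redirects the final step's input.
      subTransforms.add("TruncateAndSize(splitAndSizeOut)");
      processInputId = "truncateAndSizeOut";
    }

    // Added exactly once, whichever branch ran.
    subTransforms.add("ProcessSizedElementsAndRestrictions(" + processInputId + ")");
    return subTransforms;
  }
}
```

   Calling `ExpansionSketch.expand(true)` yields four entries with the last one reading from `truncateAndSizeOut`; `expand(false)` yields three, with the last one reading from `splitAndSizeOut`.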
   

##########
File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/SplittableParDoExpander.java
##########
@@ -91,13 +91,21 @@ public static TransformReplacement createSizedReplacement() {
    * .
    */
   public static TransformReplacement createTruncateReplacement() {
-    return TruncateReplacement.INSTANCE;
+    return SizedReplacement.DRAIN_INSTANCE;
   }
 
   /** See {@link #createSizedReplacement()} for details. */
   private static class SizedReplacement implements TransformReplacement {
 
     private static final SizedReplacement INSTANCE = new SizedReplacement();
+    private static final SizedReplacement DRAIN_INSTANCE = new SizedReplacement().withDrain();
+
+    private boolean isDrain = false;

Review comment:
       It would be nice if we could make this final by using `@AutoBuilder`, allowing for:
   ```suggestion
       private final boolean isDrain;
   ```
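
   For illustration only, the sketch below shows one way the field could end up `final`. It swaps in a plain private constructor rather than a generated builder, so it is not the approach named above, and `SizedReplacementSketch` and its accessor are hypothetical names rather than anything in the PR.

```java
// Hypothetical stand-in for SizedReplacement; a private constructor fixes the
// flag at creation time, so no mutating withDrain() step is needed.
final class SizedReplacementSketch {
  static final SizedReplacementSketch INSTANCE = new SizedReplacementSketch(false);
  static final SizedReplacementSketch DRAIN_INSTANCE = new SizedReplacementSketch(true);

  private final boolean isDrain;

  private SizedReplacementSketch(boolean isDrain) {
    this.isDrain = isDrain;
  }

  boolean isDrain() {
    return isDrain;
  }
}
```

   With this shape both singletons are immutable after class initialization, which is the property the `final` suggestion is after.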




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 465849)
    Time Spent: 11h 20m  (was: 11h 10m)

> Support drain in SDF
> --------------------
>
>                 Key: BEAM-10341
>                 URL: https://issues.apache.org/jira/browse/BEAM-10341
>             Project: Beam
>          Issue Type: New Feature
>          Components: sdk-java-harness, sdk-py-harness
>            Reporter: Boyuan Zhang
>            Priority: P2
>          Time Spent: 11h 20m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)