Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/03/17 17:01:33 UTC

[GitHub] [beam] egalpin opened a new pull request #17112: [BEAM-14064] fix es io windowing

egalpin opened a new pull request #17112:
URL: https://github.com/apache/beam/pull/17112


   Ensure ElasticsearchIO#write output maintains input windows.
   
   ------------------------
   
   Supersedes #17097
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
    - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   To check the build health, please visit [https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md](https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md)
   
   GitHub Actions Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   [![Build python source distribution and wheels](https://github.com/apache/beam/workflows/Build%20python%20source%20distribution%20and%20wheels/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Build+python+source+distribution+and+wheels%22+branch%3Amaster+event%3Aschedule)
   [![Python tests](https://github.com/apache/beam/workflows/Python%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Python+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Java tests](https://github.com/apache/beam/workflows/Java%20Tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Java+Tests%22+branch%3Amaster+event%3Aschedule)
   
   See [CI.md](https://github.com/apache/beam/blob/master/CI.md) for more information about GitHub Actions CI.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] je-ik commented on a change in pull request #17112: [BEAM-14064] fix es io windowing

Posted by GitBox <gi...@apache.org>.
je-ik commented on a change in pull request #17112:
URL: https://github.com/apache/beam/pull/17112#discussion_r831886429



##########
File path: sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
##########
@@ -2246,27 +2286,43 @@ public static StatefulBatching fromSpec(BulkIO spec) {
         }
 
         return input
-            .apply(ParDo.of(new Reshuffle.AssignShardFn<>(spec.getMaxParallelRequestsPerWindow())))
+            .apply(ParDo.of(new Reshuffle.AssignShardFn<>(spec.getMaxParallelRequests())))
             .apply(groupIntoBatches);
       }
     }
 
     @Override
     public PCollectionTuple expand(PCollection<Document> input) {
       ConnectionConfiguration connectionConfiguration = getConnectionConfiguration();
-
       checkState(connectionConfiguration != null, "withConnectionConfiguration() is required");
 
+      PCollection<Document> docResults;
+      PCollection<Document> globalDocs = input.apply(Window.into(new GlobalWindows()));
+
       if (getUseStatefulBatches()) {
-        return input
-            .apply(StatefulBatching.fromSpec(this))
-            .apply(
-                ParDo.of(new BulkIOStatefulFn(this))
-                    .withOutputTags(Write.SUCCESSFUL_WRITES, TupleTagList.of(Write.FAILED_WRITES)));
+        docResults =
+            globalDocs
+                .apply(StatefulBatching.fromSpec(this))
+                .apply(ParDo.of(new BulkIOStatefulFn(this)));
       } else {
-        return input.apply(
-            ParDo.of(new BulkIOBundleFn(this))
-                .withOutputTags(Write.SUCCESSFUL_WRITES, TupleTagList.of(Write.FAILED_WRITES)));
+        docResults = globalDocs.apply(ParDo.of(new BulkIOBundleFn(this)));
+      }
+
+      return docResults
+          .setWindowingStrategyInternal(input.getWindowingStrategy())
+          .apply(
+              ParDo.of(new ResultFilteringFn())
+                  .withOutputTags(Write.SUCCESSFUL_WRITES, TupleTagList.of(Write.FAILED_WRITES)));
+    }
+
+    private static class ResultFilteringFn extends DoFn<Document, Document> {
+      @ProcessElement
+      public void processElement(@Element Document doc, MultiOutputReceiver out) {
+        if (doc.getHasError()) {
+          out.get(Write.FAILED_WRITES).outputWithTimestamp(doc, doc.getTimestamp());
+        } else {
+          out.get(Write.SUCCESSFUL_WRITES).outputWithTimestamp(doc, doc.getTimestamp());

Review comment:
       Yeah, those are my doubts as well. Flink does not enforce a bundle size of 1; it instead (optionally) aligns bundles with checkpoint barriers (and maxBundleSize). Watermarks flow with the data, and for a stateless DoFn I really don't see a way it could guarantee holding the watermark between bundles.







[GitHub] [beam] je-ik commented on a change in pull request #17112: [BEAM-14064] fix es io windowing

Posted by GitBox <gi...@apache.org>.
je-ik commented on a change in pull request #17112:
URL: https://github.com/apache/beam/pull/17112#discussion_r830875900



##########
File path: sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
##########
(diff hunk identical to the one quoted in the first review comment above)
Review comment:
       Looking at the code in `SimpleDoFnRunner` I think that:
    a) `DoFnProcessContext.outputWithTimestamp` results in a call to `checkTimestamp` (https://github.com/apache/beam/blob/096aeeff833679af3068eea2f00b280a589256e8/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java#L409)
    b) the check will throw an exception if the output timestamp is before the current element's timestamp (https://github.com/apache/beam/blob/096aeeff833679af3068eea2f00b280a589256e8/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java#L444)
    
   I *think* you can fix that by overriding `getAllowedTimestampSkew()` (returning some large value), and *provided* that - as you write - the output watermark is held between bundles, this should be correct. But I really doubt that is the case; I don't know how the Flink runner, for instance, would ensure it: elements and watermarks are completely interleaved in that runner, because it does not have a "natural" concept of a bundle.
   
   @robertwb maybe you could give some more insights?
   
   > Are there testing utilities that can synthesize a bundle to test this?
   
   You could probably use `TestStream` to verify the behavior on DirectRunner. I _believe_ that all elements you add using `addElements` will end up in the same bundle.
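   
   For illustration, a rough sketch (hypothetical test, not part of this PR) of what such a `TestStream`-based check could look like on DirectRunner:
   
   ```java
   import org.apache.beam.sdk.coders.StringUtf8Coder;
   import org.apache.beam.sdk.testing.PAssert;
   import org.apache.beam.sdk.testing.TestPipeline;
   import org.apache.beam.sdk.testing.TestStream;
   import org.apache.beam.sdk.values.PCollection;
   import org.apache.beam.sdk.values.TimestampedValue;
   import org.joda.time.Instant;
   import org.junit.Rule;
   import org.junit.Test;
   
   public class SameBundleSketchTest {
     @Rule public final transient TestPipeline pipeline = TestPipeline.create();
   
     @Test
     public void elementsAddedTogetherShouldLandInOneBundle() {
       TestStream<String> docs =
           TestStream.create(StringUtf8Coder.of())
               // Elements added via a single addElements call are expected to be
               // delivered to downstream DoFns in one bundle on DirectRunner.
               .addElements(
                   TimestampedValue.of("doc-1", new Instant(0)),
                   TimestampedValue.of("doc-2", new Instant(1000)))
               .advanceWatermarkToInfinity();
   
       PCollection<String> result = pipeline.apply(docs);
       PAssert.that(result).containsInAnyOrder("doc-1", "doc-2");
       pipeline.run().waitUntilFinish();
     }
   }
   ```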



[GitHub] [beam] je-ik commented on a change in pull request #17112: [BEAM-14064] fix es io windowing

Posted by GitBox <gi...@apache.org>.
je-ik commented on a change in pull request #17112:
URL: https://github.com/apache/beam/pull/17112#discussion_r829808452



##########
File path: sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
##########
(diff hunk identical to the one quoted in the first review comment above)
Review comment:
       This looks like we still have the same issue with "outputWithTimestamp being before the current element timestamp", no? This can be worked around using the deprecated `getAllowedTimestampSkew`, but it essentially demonstrates the same deficiency.
   
   Outputting with a timestamp that might be (at least in theory) arbitrarily far behind the output watermark means that if the user applies a stateful (e.g. GBK) transform downstream, some elements might get randomly dropped (although they have actually been written).
   
   I think we could work around this by modifying the ElasticsearchIO transform so that if stateful batching is not used, we return PDone instead. That way the user cannot rely on any guarantees about what has been written. Alternatively, we would have to ensure nothing gets dropped (setting allowedLateness to some high value, but that looks like a highly suspicious side-effect of the transform).
   
   I don't know what the correct solution is here; it seems to me that the root cause is the logical gap between stateless DoFns and how we work with watermarks there.
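   
   For illustration only, a rough sketch (hypothetical helper, not part of this PR) of what the "high allowedLateness" work-around mentioned above might look like downstream of the write:
   
   ```java
   import org.apache.beam.sdk.transforms.windowing.Window;
   import org.apache.beam.sdk.values.PCollection;
   import org.joda.time.Duration;
   
   class LateWriteResultsSketch {
     // Hypothetical helper: keep write results from being dropped as late data by
     // a downstream stateful transform (e.g. GBK). The large value is arbitrary.
     static <T> PCollection<T> tolerateLateWriteResults(PCollection<T> writeResults) {
       return writeResults.apply(
           "TolerateLateWriteResults",
           Window.<T>configure()
               .withAllowedLateness(Duration.standardDays(3650))
               .discardingFiredPanes());
     }
   }
   ```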







[GitHub] [beam] egalpin commented on pull request #17112: [BEAM-14064] fix es io windowing

Posted by GitBox <gi...@apache.org>.
egalpin commented on pull request #17112:
URL: https://github.com/apache/beam/pull/17112#issuecomment-1071331739


   R: @je-ik @echauchot 
   
   This PR is an alternative implementation to https://github.com/apache/beam/pull/17097 which maintains bundle-based batching while also ensuring windows are maintained across ElasticsearchIO#write.





[GitHub] [beam] egalpin commented on a change in pull request #17112: [BEAM-14064] fix es io windowing

Posted by GitBox <gi...@apache.org>.
egalpin commented on a change in pull request #17112:
URL: https://github.com/apache/beam/pull/17112#discussion_r830232322



##########
File path: sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
##########
(diff hunk identical to the one quoted in the first review comment above)
Review comment:
       You might be completely correct; I definitely don't feel I have a solid grasp just yet, but I'll give my interpretation of the root cause of the error that prompted this change and my understanding of how this PR might solve it.  I'm also operating under the assumption that the watermark is not advanced between elements of a single bundle, but only between bundles.
   
   My understanding of the root cause is that it stems from a bundle containing elements which belong to different windows.  This can happen even when using GIB, because a bundle could have 2 elements where each element is a batch output from GIB.  Entities from each element in the bundle are then buffered.  If there were fewer entities in a bundle element than the maximum number that can be buffered, `@ProcessElement` completes with no output.  Then another element from the bundle begins processing in `@ProcessElement`, this time belonging to a window "ahead" in time of the prior element's window.  Now any buffered entities cannot be output using `outputWithTimestamp`, because they carry timestamps which are invalid for the window of the element currently being processed.  In other words, in the original errors the timestamps are not invalid for the pipeline with respect to the watermark, but are invalid because data from one window is output from the context of another, arbitrary window.
   
   (this is my understanding, not necessarily fact/reality).
   
   This PR might fix that issue by removing the error of outputting with invalid timestamps.  _If_ the watermark is held until an entire bundle is processed, then the existing error does not arise because the timestamps are invalid compared to the watermark, but strictly because of the window context from which they are output.  By rewindowing into the GlobalWindow before all other processing, the window context should always be the GlobalWindow for every element in a bundle.  Then when we output with timestamp, the timestamps should no longer fail validation because they would be within the allowable bounds of [watermark, GlobalWindow.MAX_TIMESTAMP).  Finally, we re-window back into the original/input windowing strategy after dealing with everything in the global window.
   
   Thoughts? Please correct above assumptions where I have things wrong.  CC @lukecwik as they deserve credit for the idea of reifying timestamps and global windowing -> rewindowing as a potential solution.
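   
   For reference, a condensed sketch of the flow described above, mirroring the non-stateful branch of this PR's diff (comments are explanatory only):
   
   ```java
   // Re-window into the GlobalWindow before any bulk work, so that buffered entities
   // flushed from @FinishBundle are never emitted from the context of a narrower window.
   PCollection<Document> globalDocs =
       input.apply("ToGlobalWindow", Window.into(new GlobalWindows()));
   
   PCollection<Document> docResults = globalDocs.apply(ParDo.of(new BulkIOBundleFn(this)));
   
   // Restore the caller's windowing strategy, then re-emit each result with its
   // original timestamp, split into success/failure outputs.
   return docResults
       .setWindowingStrategyInternal(input.getWindowingStrategy())
       .apply(
           ParDo.of(new ResultFilteringFn())
               .withOutputTags(Write.SUCCESSFUL_WRITES, TupleTagList.of(Write.FAILED_WRITES)));
   ```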



[GitHub] [beam] egalpin commented on a change in pull request #17112: [BEAM-14064] fix es io windowing

Posted by GitBox <gi...@apache.org>.
egalpin commented on a change in pull request #17112:
URL: https://github.com/apache/beam/pull/17112#discussion_r831402503



##########
File path: sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
##########
(diff hunk identical to the one quoted in the first review comment above)
Review comment:
       I'll look into creating a test which fails against the previous implementation with the same error as reported in BEAM-14064, and then see whether this change set makes it pass.







[GitHub] [beam] egalpin commented on a change in pull request #17112: [BEAM-14064] fix es io windowing

Posted by GitBox <gi...@apache.org>.
egalpin commented on a change in pull request #17112:
URL: https://github.com/apache/beam/pull/17112#discussion_r831401872



##########
File path: sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
##########
(diff hunk identical to the one quoted in the first review comment above)
Review comment:
       @je-ik thanks for the follow-up, I really appreciate your time and input as I want to make the best fix possible here.  Agreed that I would need to add something to `ResultFilteringFn` like:
   
   ```java
         @Override
         public Duration getAllowedTimestampSkew() {
           return Duration.millis(Long.MAX_VALUE);
         }
   ```
   
   But I still wonder, as you point out, whether watermarks are updated during the processing of bundles or only between them.  In the case of a runner like Flink with no notion of bundles, or in an imaginary runner where bundles were always of size 1, I suppose the logic might still hold: only after every bundle, i.e. after every element, has been processed can the watermark be updated.  ElasticsearchIO would be OK in that case as well, because `@FinishBundle` would output all buffered elements before processing of the element was considered done, which would also imply they are output before the watermark is updated?  Thinking out loud here...
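   
   For completeness, a sketch of `ResultFilteringFn` with that override in place (based on the version in this PR's diff; `Duration` here is `org.joda.time.Duration`):
   
   ```java
   private static class ResultFilteringFn extends DoFn<Document, Document> {
     @ProcessElement
     public void processElement(@Element Document doc, MultiOutputReceiver out) {
       // Re-emit each write result with the document's original timestamp; without the
       // skew override below this can trip checkTimestamp when that timestamp precedes
       // the current element's timestamp.
       if (doc.getHasError()) {
         out.get(Write.FAILED_WRITES).outputWithTimestamp(doc, doc.getTimestamp());
       } else {
         out.get(Write.SUCCESSFUL_WRITES).outputWithTimestamp(doc, doc.getTimestamp());
       }
     }
   
     @Override
     public Duration getAllowedTimestampSkew() {
       // Deprecated API: effectively disables the backwards-skew check.
       return Duration.millis(Long.MAX_VALUE);
     }
   }
   ```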







[GitHub] [beam] je-ik commented on a change in pull request #17112: [BEAM-14064] fix es io windowing

Posted by GitBox <gi...@apache.org>.
je-ik commented on a change in pull request #17112:
URL: https://github.com/apache/beam/pull/17112#discussion_r834096613



##########
File path: sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
##########
(diff hunk identical to the one quoted in the first review comment above)
Review comment:
       I created a thread for this: https://lists.apache.org/thread/10db7l9bhnhmo484myps723sfxtjwwmv







[GitHub] [beam] je-ik commented on pull request #17112: [BEAM-14064] fix es io windowing

Posted by GitBox <gi...@apache.org>.
je-ik commented on pull request #17112:
URL: https://github.com/apache/beam/pull/17112#issuecomment-1081568512


   Based on the discussion https://lists.apache.org/thread/10db7l9bhnhmo484myps723sfxtjwwmv I think that if we add the `getAllowedTimestampSkew()` override we can safely merge this. There will be some fix needed in FlinkRunner (and a `@ValidatesRunner` test). I created https://issues.apache.org/jira/browse/BEAM-14196 to track this and will look into that.





[GitHub] [beam] codecov[bot] commented on pull request #17112: [BEAM-14064] fix es io windowing

Posted by GitBox <gi...@apache.org>.
codecov[bot] commented on pull request #17112:
URL: https://github.com/apache/beam/pull/17112#issuecomment-1073837956


   # [Codecov](https://codecov.io/gh/apache/beam/pull/17112?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#17112](https://codecov.io/gh/apache/beam/pull/17112?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (096aeef) into [master](https://codecov.io/gh/apache/beam/commit/d08cef67418b351db15b355b59785a37cda63fef?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (d08cef6) will **increase** coverage by `0.01%`.
   > The diff coverage is `70.58%`.
   
   > :exclamation: Current head 096aeef differs from pull request most recent head e318f7d. Consider uploading reports for the commit e318f7d to get more accurate results
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #17112      +/-   ##
   ==========================================
   + Coverage   73.95%   73.97%   +0.01%     
   ==========================================
     Files         669      671       +2     
     Lines       88159    88227      +68     
   ==========================================
   + Hits        65201    65267      +66     
   - Misses      21846    21848       +2     
     Partials     1112     1112              
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | go | `49.45% <100.00%> (+0.10%)` | :arrow_up: |
   | python | `83.66% <23.07%> (+0.01%)` | :arrow_up: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/17112?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [sdks/go/pkg/beam/core/util/reflectx/tags.go](https://codecov.io/gh/apache/beam/pull/17112/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9nby9wa2cvYmVhbS9jb3JlL3V0aWwvcmVmbGVjdHgvdGFncy5nbw==) | `0.00% <ø> (ø)` | |
   | [sdks/go/pkg/beam/forward.go](https://codecov.io/gh/apache/beam/pull/17112/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9nby9wa2cvYmVhbS9mb3J3YXJkLmdv) | `40.00% <ø> (ø)` | |
   | [sdks/python/apache\_beam/io/gcp/bigquery.py](https://codecov.io/gh/apache/beam/pull/17112/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2JpZ3F1ZXJ5LnB5) | `63.70% <18.18%> (+0.07%)` | :arrow_up: |
   | [...apache\_beam/runners/dataflow/internal/apiclient.py](https://codecov.io/gh/apache/beam/pull/17112/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9kYXRhZmxvdy9pbnRlcm5hbC9hcGljbGllbnQucHk=) | `77.27% <50.00%> (-0.13%)` | :arrow_down: |
   | [sdks/go/pkg/beam/core/runtime/pipelinex/replace.go](https://codecov.io/gh/apache/beam/pull/17112/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9nby9wa2cvYmVhbS9jb3JlL3J1bnRpbWUvcGlwZWxpbmV4L3JlcGxhY2UuZ28=) | `66.66% <100.00%> (+0.71%)` | :arrow_up: |
   | [sdks/go/pkg/beam/options/jobopts/options.go](https://codecov.io/gh/apache/beam/pull/17112/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9nby9wa2cvYmVhbS9vcHRpb25zL2pvYm9wdHMvb3B0aW9ucy5nbw==) | `87.27% <100.00%> (ø)` | |
   | [...hon/apache\_beam/runners/worker/bundle\_processor.py](https://codecov.io/gh/apache/beam/pull/17112/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvYnVuZGxlX3Byb2Nlc3Nvci5weQ==) | `93.64% <0.00%> (ø)` | |
   | [sdks/go/pkg/beam/options/jobopts/stringSlice.go](https://codecov.io/gh/apache/beam/pull/17112/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9nby9wa2cvYmVhbS9vcHRpb25zL2pvYm9wdHMvc3RyaW5nU2xpY2UuZ28=) | `71.42% <0.00%> (ø)` | |
   | ... and [4 more](https://codecov.io/gh/apache/beam/pull/17112/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/beam/pull/17112?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/beam/pull/17112?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [d08cef6...e318f7d](https://codecov.io/gh/apache/beam/pull/17112?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   





[GitHub] [beam] je-ik commented on a change in pull request #17112: [BEAM-14064] fix es io windowing

Posted by GitBox <gi...@apache.org>.
je-ik commented on a change in pull request #17112:
URL: https://github.com/apache/beam/pull/17112#discussion_r834063058



##########
File path: sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
##########
(diff hunk identical to the one quoted in the first review comment above)
Review comment:
       I think we are going in circles here. :) Maybe we could start a new dev@ thread to help clarify things. FlinkRunner definitely does not have bundles of size 1 (which is what it would mean to call @StartBundle and @FinishBundle before/after each element); that would make all "in-memory" batching void on such a runner.
   I think this implies one of two possibilities:
    a) either updating the watermark only on bundle boundaries is *not* part of the model, or
    b) FlinkRunner does not adhere to this model.







[GitHub] [beam] egalpin commented on a change in pull request #17112: [BEAM-14064] fix es io windowing

Posted by GitBox <gi...@apache.org>.
egalpin commented on a change in pull request #17112:
URL: https://github.com/apache/beam/pull/17112#discussion_r833670820



##########
File path: sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
##########
(diff hunk identical to the one quoted in the first review comment above)
Review comment:
       @je-ik What I meant to convey was that I think the solution as laid out in this PR may be resilient to runners where the watermark is updated after every element _iff_ we assume that this implies calling `@StartBundle` and `@FinishBundle` for each element (i.e. as if the bundle size were always enforced to be 1).  Such an environment would cause all buffered elements to be flushed (via `@FinishBundle`) in the context of the window from which they originated, before the watermark was updated.
   
   It sounds like this notion is supported based on comments in the related dev@ thread[1].
   
   [1] https://lists.apache.org/thread/7foy455spg43xo77zhrs62gc1m383t50 



[GitHub] [beam] codecov[bot] edited a comment on pull request #17112: [BEAM-14064] fix es io windowing

Posted by GitBox <gi...@apache.org>.
codecov[bot] edited a comment on pull request #17112:
URL: https://github.com/apache/beam/pull/17112#issuecomment-1073837956


   # [Codecov](https://codecov.io/gh/apache/beam/pull/17112?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#17112](https://codecov.io/gh/apache/beam/pull/17112?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (096aeef) into [master](https://codecov.io/gh/apache/beam/commit/d08cef67418b351db15b355b59785a37cda63fef?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (d08cef6) will **increase** coverage by `0.01%`.
   > The diff coverage is `70.58%`.
   
   > :exclamation: Current head 096aeef differs from pull request most recent head c080e7f. Consider uploading reports for the commit c080e7f to get more accurate results
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #17112      +/-   ##
   ==========================================
   + Coverage   73.95%   73.97%   +0.01%     
   ==========================================
     Files         669      671       +2     
     Lines       88159    88227      +68     
   ==========================================
   + Hits        65201    65267      +66     
   - Misses      21846    21848       +2     
     Partials     1112     1112              
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | go | `49.45% <100.00%> (+0.10%)` | :arrow_up: |
   | python | `83.66% <23.07%> (+0.01%)` | :arrow_up: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/17112?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [sdks/go/pkg/beam/core/util/reflectx/tags.go](https://codecov.io/gh/apache/beam/pull/17112/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9nby9wa2cvYmVhbS9jb3JlL3V0aWwvcmVmbGVjdHgvdGFncy5nbw==) | `0.00% <ø> (ø)` | |
   | [sdks/go/pkg/beam/forward.go](https://codecov.io/gh/apache/beam/pull/17112/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9nby9wa2cvYmVhbS9mb3J3YXJkLmdv) | `40.00% <ø> (ø)` | |
   | [sdks/python/apache\_beam/io/gcp/bigquery.py](https://codecov.io/gh/apache/beam/pull/17112/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2JpZ3F1ZXJ5LnB5) | `63.70% <18.18%> (+0.07%)` | :arrow_up: |
   | [...apache\_beam/runners/dataflow/internal/apiclient.py](https://codecov.io/gh/apache/beam/pull/17112/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9kYXRhZmxvdy9pbnRlcm5hbC9hcGljbGllbnQucHk=) | `77.27% <50.00%> (-0.13%)` | :arrow_down: |
   | [sdks/go/pkg/beam/core/runtime/pipelinex/replace.go](https://codecov.io/gh/apache/beam/pull/17112/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9nby9wa2cvYmVhbS9jb3JlL3J1bnRpbWUvcGlwZWxpbmV4L3JlcGxhY2UuZ28=) | `66.66% <100.00%> (+0.71%)` | :arrow_up: |
   | [sdks/go/pkg/beam/options/jobopts/options.go](https://codecov.io/gh/apache/beam/pull/17112/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9nby9wa2cvYmVhbS9vcHRpb25zL2pvYm9wdHMvb3B0aW9ucy5nbw==) | `87.27% <100.00%> (ø)` | |
   | [...hon/apache\_beam/runners/worker/bundle\_processor.py](https://codecov.io/gh/apache/beam/pull/17112/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvYnVuZGxlX3Byb2Nlc3Nvci5weQ==) | `93.64% <0.00%> (ø)` | |
   | [sdks/go/pkg/beam/options/jobopts/stringSlice.go](https://codecov.io/gh/apache/beam/pull/17112/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9nby9wa2cvYmVhbS9vcHRpb25zL2pvYm9wdHMvc3RyaW5nU2xpY2UuZ28=) | `71.42% <0.00%> (ø)` | |
   | ... and [4 more](https://codecov.io/gh/apache/beam/pull/17112/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/beam/pull/17112?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/beam/pull/17112?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [d08cef6...c080e7f](https://codecov.io/gh/apache/beam/pull/17112?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   





[GitHub] [beam] egalpin commented on a change in pull request #17112: [BEAM-14064] fix es io windowing

Posted by GitBox <gi...@apache.org>.
egalpin commented on a change in pull request #17112:
URL: https://github.com/apache/beam/pull/17112#discussion_r830249718



##########
File path: sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
##########
(diff hunk identical to the one quoted in the first review comment above)
Review comment:
       Are there testing utilities that can synthesize a bundle to test this?



