Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/03/18 17:56:48 UTC

[GitHub] [beam] egalpin commented on a change in pull request #17112: [BEAM-14064] fix es io windowing

egalpin commented on a change in pull request #17112:
URL: https://github.com/apache/beam/pull/17112#discussion_r830232322



##########
File path: sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
##########
@@ -2246,27 +2286,43 @@ public static StatefulBatching fromSpec(BulkIO spec) {
         }
 
         return input
-            .apply(ParDo.of(new Reshuffle.AssignShardFn<>(spec.getMaxParallelRequestsPerWindow())))
+            .apply(ParDo.of(new Reshuffle.AssignShardFn<>(spec.getMaxParallelRequests())))
             .apply(groupIntoBatches);
       }
     }
 
     @Override
     public PCollectionTuple expand(PCollection<Document> input) {
       ConnectionConfiguration connectionConfiguration = getConnectionConfiguration();
-
       checkState(connectionConfiguration != null, "withConnectionConfiguration() is required");
 
+      PCollection<Document> docResults;
+      PCollection<Document> globalDocs = input.apply(Window.into(new GlobalWindows()));
+
       if (getUseStatefulBatches()) {
-        return input
-            .apply(StatefulBatching.fromSpec(this))
-            .apply(
-                ParDo.of(new BulkIOStatefulFn(this))
-                    .withOutputTags(Write.SUCCESSFUL_WRITES, TupleTagList.of(Write.FAILED_WRITES)));
+        docResults =
+            globalDocs
+                .apply(StatefulBatching.fromSpec(this))
+                .apply(ParDo.of(new BulkIOStatefulFn(this)));
       } else {
-        return input.apply(
-            ParDo.of(new BulkIOBundleFn(this))
-                .withOutputTags(Write.SUCCESSFUL_WRITES, TupleTagList.of(Write.FAILED_WRITES)));
+        docResults = globalDocs.apply(ParDo.of(new BulkIOBundleFn(this)));
+      }
+
+      return docResults
+          .setWindowingStrategyInternal(input.getWindowingStrategy())
+          .apply(
+              ParDo.of(new ResultFilteringFn())
+                  .withOutputTags(Write.SUCCESSFUL_WRITES, TupleTagList.of(Write.FAILED_WRITES)));
+    }
+
+    private static class ResultFilteringFn extends DoFn<Document, Document> {
+      @ProcessElement
+      public void processElement(@Element Document doc, MultiOutputReceiver out) {
+        if (doc.getHasError()) {
+          out.get(Write.FAILED_WRITES).outputWithTimestamp(doc, doc.getTimestamp());
+        } else {
+          out.get(Write.SUCCESSFUL_WRITES).outputWithTimestamp(doc, doc.getTimestamp());

Review comment:
      You might be completely correct; I definitely don't feel I have a solid grasp of this yet. Still, I'll give my interpretation of the root cause of the error that prompted this change, and my understanding of how this PR might solve it. I'm also operating under the assumption that the watermark is not advanced between elements of a single bundle, but only between bundles.
   
  My understanding of the root cause is that it stems from bundles containing elements that belong to different windows. This can happen even when using GIB (GroupIntoBatches), because a bundle could contain 2 elements, each of which is a batch output by GIB. The entities from each element in the bundle are then buffered. If an element contains fewer entities than the maximum number that can be buffered, `@ProcessElement` completes with no output. Then another element from the same bundle begins processing in `@ProcessElement`, this time one belonging to a window later in time than the window of the prior element. At this point the buffered entities cannot be output using `outputWithTimestamp`. Their timestamps are invalid for the window of the element currently being processed by `@ProcessElement`. In other words, in the original errors the timestamps are not invalid for the pipeline with respect to the watermark. They are invalid because data from one window is being output in the context of another, arbitrary window.
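To make the scenario above concrete, here is a toy model in plain Java (not Beam code). It assumes fixed one-minute windows and a bundle of two batches. It also assumes a simplified validity rule: a flushed document is valid only if its timestamp falls inside the window of the element being processed when the flush happens. The class, record, and method names are hypothetical, and the end-of-bundle flush is left out.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model (not Beam code) of buffering documents across the elements of one bundle. */
final class CrossWindowBufferingModel {

  /** A half-open event-time window [start, end) in milliseconds. */
  record Window(long start, long end) {
    boolean contains(long timestamp) {
      return timestamp >= start && timestamp < end;
    }
  }

  /** One bundle element: a batch of document timestamps that all belong to one window. */
  record Element(Window window, List<Long> docTimestamps) {}

  /**
   * Buffers documents across elements and flushes once maxBuffered is reached. Returns the
   * timestamps rejected by the simplified rule assumed here (not Beam's actual check): a flushed
   * timestamp must lie inside the window of the element being processed during the flush.
   * Documents still buffered at the end of the bundle are ignored.
   */
  static List<Long> rejectedOnFlush(List<Element> bundle, int maxBuffered) {
    List<Long> buffer = new ArrayList<>();
    List<Long> rejected = new ArrayList<>();
    for (Element element : bundle) {
      buffer.addAll(element.docTimestamps());
      if (buffer.size() >= maxBuffered) {
        // The flush runs in the context of the *current* element's window.
        for (long ts : buffer) {
          if (!element.window().contains(ts)) {
            rejected.add(ts);
          }
        }
        buffer.clear();
      }
    }
    return rejected;
  }

  public static void main(String[] args) {
    Window first = new Window(0L, 60_000L);
    Window second = new Window(60_000L, 120_000L);
    List<Element> bundle = List.of(
        new Element(first, List.of(10_000L, 20_000L)),   // 2 < maxBuffered: buffered, no output
        new Element(second, List.of(70_000L, 80_000L))); // buffer reaches 4: flush happens here
    System.out.println(rejectedOnFlush(bundle, 3)); // prints [10000, 20000]
  }
}
```

With `maxBuffered = 3`, the two first-window documents are only flushed while the second-window element is being processed, so both are rejected. With `maxBuffered = 2`, each batch is flushed within its own window and nothing is rejected.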
   
  (This is my understanding, not necessarily fact/reality.)
   
  This PR might fix that issue by eliminating the invalid-timestamp error. _If_ the watermark is held until an entire bundle is processed, then the existing error does not arise because the timestamps are invalid relative to the watermark. It arises strictly because of the window context in which they are being output. By rewindowing into the GlobalWindow before any other processing, the window context should always be the GlobalWindow for every element in a bundle. Then, when we output with a timestamp, the timestamps should no longer fail validation, because they would be within the allowable bounds of [watermark, GlobalWindow.MAX_TIMESTAMP). Finally, once everything has been handled in the global window, we re-window back into the original input windowing strategy before outputting the results.
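Here is a similarly hedged toy model of the proposed fix, again in plain Java rather than Beam. Every element is treated as belonging to a single global window before buffering, so a flush never crosses a window boundary. The flushed documents are then split on their error flag, the way `ResultFilteringFn` in the diff does, and each document keeps its own timestamp. `Doc`, `Results`, `process`, and the global-window bounds are hypothetical stand-ins. The model does not reproduce Beam's real GlobalWindow bounds, and it does not model the step that restores the input windowing strategy.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model (not Beam code) of buffering in a global window, then splitting results. */
final class GlobalWindowFixModel {

  /** Stand-in for a Document: event timestamp (ms) and whether its bulk write failed. */
  record Doc(long timestamp, boolean hasError) {}

  /** The two tagged outputs produced by the result-filtering step. */
  record Results(List<Doc> successful, List<Doc> failed) {}

  // Simplified global window (assumption of this model): any ordinary timestamp is inside it.
  static boolean inGlobalWindow(long timestamp) {
    return timestamp > Long.MIN_VALUE && timestamp < Long.MAX_VALUE;
  }

  /**
   * Buffers documents across the batches of one bundle, flushing in the global window context,
   * then routes each flushed document by its error flag. Documents still buffered at the end
   * are flushed too (a stand-in for an end-of-bundle flush).
   */
  static Results process(List<List<Doc>> bundle, int maxBuffered) {
    List<Doc> buffer = new ArrayList<>();
    List<Doc> flushed = new ArrayList<>();
    for (List<Doc> batch : bundle) {
      buffer.addAll(batch);
      if (buffer.size() >= maxBuffered) {
        flush(buffer, flushed);
      }
    }
    flush(buffer, flushed); // end-of-bundle flush
    List<Doc> successful = new ArrayList<>();
    List<Doc> failed = new ArrayList<>();
    for (Doc doc : flushed) {
      if (doc.hasError()) {
        failed.add(doc);     // like FAILED_WRITES, original timestamp kept
      } else {
        successful.add(doc); // like SUCCESSFUL_WRITES, original timestamp kept
      }
    }
    return new Results(successful, failed);
  }

  private static void flush(List<Doc> buffer, List<Doc> flushed) {
    for (Doc doc : buffer) {
      // Every element shares the global window, so no buffered timestamp is out of context.
      if (!inGlobalWindow(doc.timestamp())) {
        throw new IllegalStateException("timestamp outside global window: " + doc.timestamp());
      }
      flushed.add(doc);
    }
    buffer.clear();
  }

  public static void main(String[] args) {
    List<List<Doc>> bundle = List.of(
        List.of(new Doc(10_000L, false), new Doc(20_000L, true)),   // earlier fixed window
        List.of(new Doc(70_000L, false), new Doc(80_000L, false))); // later fixed window
    Results results = process(bundle, 3);
    System.out.println(
        results.successful().size() + " succeeded, " + results.failed().size() + " failed");
  }
}
```

For the two-batch bundle in `main`, all four documents are flushed together in the global window without a validation failure, and the program prints `3 succeeded, 1 failed`.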
   
  Thoughts? Please correct any of the above assumptions that I have wrong. CC @lukecwik, as they deserve credit for the idea of reifying timestamps and of global windowing followed by rewindowing as a potential solution.




-- 
This is an automated message from the Apache Git Service.