You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2022/06/17 02:17:51 UTC

[beam] branch release-2.40.0 updated: Merge pull request #21895: Drops usage of setWindowingStrategyInternal in favour of direct use of WindowFn

This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch release-2.40.0
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/release-2.40.0 by this push:
     new 07891e67aed Merge pull request #21895: Drops usage of setWindowingStrategyInternal in favour of direct use of WindowFn
     new f78453fdd58 Merge pull request #21914 from [cherry-pick][release-2.40.0] Merge pull request #21895: Drops usage of setWindowingStrategyInterna…
07891e67aed is described below

commit 07891e67aeda33f7a2959b4e8fb41edf563e1fc1
Author: Kenneth Knowles <ke...@apache.org>
AuthorDate: Thu Jun 16 07:39:37 2022 -0700

    Merge pull request #21895: Drops usage of setWindowingStrategyInternal in favour of direct use of WindowFn
---
 .../apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java    | 12 ++++++++----
 1 file changed, 8 insertions(+), 4 deletions(-)

diff --git a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
index 6b3b40a2b5c..c3619a9674a 100644
--- a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
+++ b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
@@ -75,6 +75,7 @@ import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.BackOff;
 import org.apache.beam.sdk.util.BackOffUtils;
 import org.apache.beam.sdk.util.FluentBackoff;
@@ -85,7 +86,6 @@ import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
-import org.apache.beam.sdk.values.WindowingStrategy;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Streams;
@@ -2279,10 +2279,13 @@ public class ElasticsearchIO {
       ConnectionConfiguration connectionConfiguration = getConnectionConfiguration();
       checkState(connectionConfiguration != null, "withConnectionConfiguration() is required");
 
-      WindowingStrategy<?, ?> originalStrategy = input.getWindowingStrategy();
+      @SuppressWarnings("unchecked")
+      WindowFn<Document, ?> originalWindowFn =
+          (WindowFn<Document, ?>) input.getWindowingStrategy().getWindowFn();
 
       PCollection<Document> docResults;
-      PCollection<Document> globalDocs = input.apply(Window.into(new GlobalWindows()));
+      PCollection<Document> globalDocs =
+          input.apply("Window inputs globally", Window.into(new GlobalWindows()));
 
       if (getUseStatefulBatches()) {
         docResults =
@@ -2294,7 +2297,8 @@ public class ElasticsearchIO {
       }
 
       return docResults
-          .setWindowingStrategyInternal(originalStrategy)
+          // Restore windowing of input
+          .apply("Restore original windows", Window.into(originalWindowFn))
           .apply(
               ParDo.of(new ResultFilteringFn())
                   .withOutputTags(Write.SUCCESSFUL_WRITES, TupleTagList.of(Write.FAILED_WRITES)));