You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2022/06/17 02:17:51 UTC
[beam] branch release-2.40.0 updated: Merge pull request #21895: Drops usage of setWindowingStrategyInternal in favour of direct use of WindowFn
This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch release-2.40.0
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/release-2.40.0 by this push:
new 07891e67aed Merge pull request #21895: Drops usage of setWindowingStrategyInternal in favour of direct use of WindowFn
new f78453fdd58 Merge pull request #21914 from [cherry-pick][release-2.40.0] Merge pull request #21895: Drops usage of setWindowingStrategyInternal in favour of direct use of WindowFn
07891e67aed is described below
commit 07891e67aeda33f7a2959b4e8fb41edf563e1fc1
Author: Kenneth Knowles <ke...@apache.org>
AuthorDate: Thu Jun 16 07:39:37 2022 -0700
Merge pull request #21895: Drops usage of setWindowingStrategyInternal in favour of direct use of WindowFn
---
.../apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java | 12 ++++++++----
1 file changed, 8 insertions(+), 4 deletions(-)
diff --git a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
index 6b3b40a2b5c..c3619a9674a 100644
--- a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
+++ b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
@@ -75,6 +75,7 @@ import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.BackOff;
import org.apache.beam.sdk.util.BackOffUtils;
import org.apache.beam.sdk.util.FluentBackoff;
@@ -85,7 +86,6 @@ import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
-import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Streams;
@@ -2279,10 +2279,13 @@ public class ElasticsearchIO {
ConnectionConfiguration connectionConfiguration = getConnectionConfiguration();
checkState(connectionConfiguration != null, "withConnectionConfiguration() is required");
- WindowingStrategy<?, ?> originalStrategy = input.getWindowingStrategy();
+ @SuppressWarnings("unchecked")
+ WindowFn<Document, ?> originalWindowFn =
+ (WindowFn<Document, ?>) input.getWindowingStrategy().getWindowFn();
PCollection<Document> docResults;
- PCollection<Document> globalDocs = input.apply(Window.into(new GlobalWindows()));
+ PCollection<Document> globalDocs =
+ input.apply("Window inputs globally", Window.into(new GlobalWindows()));
if (getUseStatefulBatches()) {
docResults =
@@ -2294,7 +2297,8 @@ public class ElasticsearchIO {
}
return docResults
- .setWindowingStrategyInternal(originalStrategy)
+ // Restore windowing of input
+ .apply("Restore original windows", Window.into(originalWindowFn))
.apply(
ParDo.of(new ResultFilteringFn())
.withOutputTags(Write.SUCCESSFUL_WRITES, TupleTagList.of(Write.FAILED_WRITES)));