Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/11/21 14:57:39 UTC

[GitHub] [beam] mosche opened a new pull request, #24288: [Spark dataset runner] Make sure PCollection views get only broadcasted once if reused

mosche opened a new pull request, #24288:
URL: https://github.com/apache/beam/pull/24288

   The Spark dataset runner currently materializes and broadcasts `PCollections` used as side-inputs once per consuming `ParDo`.
   If a `PCollection` is used as a side-input for multiple `ParDos`, it is evaluated, collected (on the driver), and finally broadcast again for every such `ParDo` (#24035).
   
   To avoid this, the broadcast variable is added to `TranslationResult` and reused where needed.
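
The reuse described above can be sketched roughly as follows. This is a minimal illustration and not the code from the PR: `BroadcastRegistry`, `getOrBroadcast`, the `String` key, and the `List<String>` payload are hypothetical stand-ins for `TranslationResult`, the `PCollection`, and a Spark broadcast variable.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

/** Hypothetical stand-in for translation state that remembers broadcast side inputs. */
public class BroadcastRegistry {
  // Keyed by an identifier of the PCollection backing the view (assumed to be a String here).
  private final Map<String, List<String>> broadcasts = new HashMap<>();
  private int materializations = 0;

  /** Returns the existing broadcast for the key, or materializes and registers it once. */
  public List<String> getOrBroadcast(String pCollectionId, Supplier<List<String>> materialize) {
    return broadcasts.computeIfAbsent(
        pCollectionId,
        id -> {
          materializations++; // stands for the costly evaluate + collect + broadcast step
          return materialize.get();
        });
  }

  /** Number of times a side input actually had to be materialized. */
  public int materializations() {
    return materializations;
  }

  public static void main(String[] args) {
    BroadcastRegistry registry = new BroadcastRegistry();
    // Two consumers of the same side input share one broadcast.
    List<String> first = registry.getOrBroadcast("sideInput", () -> Arrays.asList("a", "b"));
    List<String> second = registry.getOrBroadcast("sideInput", () -> Arrays.asList("a", "b"));
    System.out.println("same instance: " + (first == second));
    System.out.println("materializations: " + registry.materializations());
  }
}
```

The point of the sketch is only that the lookup and the registration happen in one place, so a second consuming `ParDo` never reaches the materialization step.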
   
   Additionally, on the executor side, memory was wasted because both the broadcast binary data and a cached representation of the deserialized values were kept in memory.
   
   This implementation uses Kryo to broadcast the binary data. During deserialization, the binary form is dropped and only the deserialized values are kept. Caching is done per partition for `multimap` materialization (using an LRU cache). `iterable` materialization is a no-op, as serialization was already done.
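
As a rough illustration of the LRU caching mentioned above, such a cache can be built on `java.util.LinkedHashMap` in access order. This is only a sketch under assumptions: the imports of `CachedSideInputReader` quoted later in this thread reference Guava's `Cache` and `CacheBuilder`, not `LinkedHashMap`, and the class `LruCache`, its capacity, and the key and value types below are invented for the example.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical LRU cache: evicts the least recently accessed entry once capacity is exceeded. */
public class LruCache<K, V> {
  private final Map<K, V> entries;

  public LruCache(int capacity) {
    // accessOrder = true keeps iteration order sorted by recency of access.
    this.entries =
        new LinkedHashMap<K, V>(16, 0.75f, true) {
          @Override
          protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
            return size() > capacity;
          }
        };
  }

  /** Returns the cached value, computing and storing it on a miss. */
  public V get(K key, Function<K, V> loader) {
    V value = entries.get(key);
    if (value == null) {
      value = loader.apply(key);
      entries.put(key, value);
    }
    return value;
  }

  /** Checks for presence without changing the access order. */
  public boolean contains(K key) {
    return entries.containsKey(key);
  }

  public static void main(String[] args) {
    LruCache<String, Integer> cache = new LruCache<>(2);
    cache.get("window-1", String::length);
    cache.get("window-2", String::length);
    cache.get("window-1", String::length); // refreshes window-1
    cache.get("window-3", String::length); // evicts window-2
    System.out.println("window-2 cached: " + cache.contains("window-2"));
  }
}
```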
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] mosche commented on a diff in pull request #24288: [Spark dataset runner] Make sure PCollection views get only broadcasted once if reused

Posted by GitBox <gi...@apache.org>.
mosche commented on code in PR #24288:
URL: https://github.com/apache/beam/pull/24288#discussion_r1037806111


##########
runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java:
##########
@@ -83,61 +86,74 @@
       ClassTag.apply(Tuple2.class);
 
   @Override
-  public void translate(ParDo.MultiOutput<InputT, OutputT> transform, Context cxt)
-      throws IOException {
-    String stepName = cxt.getCurrentTransform().getFullName();
-
-    SparkCommonPipelineOptions opts = cxt.getOptions().as(SparkCommonPipelineOptions.class);
-    StorageLevel storageLevel = StorageLevel.fromString(opts.getStorageLevel());
+  public boolean canTranslate(ParDo.MultiOutput<InputT, OutputT> transform) {
+    DoFn<InputT, OutputT> doFn = transform.getFn();
+    DoFnSignature signature = DoFnSignatures.signatureForDoFn(doFn);
 
-    // Check for not supported advanced features
     // TODO: add support of Splittable DoFn
-    DoFn<InputT, OutputT> doFn = transform.getFn();
     checkState(
-        !DoFnSignatures.isSplittable(doFn),
+        !signature.processElement().isSplittable(),
         "Not expected to directly translate splittable DoFn, should have been overridden: %s",
         doFn);
 
     // TODO: add support of states and timers
     checkState(
-        !DoFnSignatures.isStateful(doFn), "States and timers are not supported for the moment.");
+        !signature.usesState() && !signature.usesTimers(),
+        "States and timers are not supported for the moment.");
 
     checkState(
-        !DoFnSignatures.requiresTimeSortedInput(doFn),
+        signature.onWindowExpiration() == null, "onWindowExpiration is not supported: %s", doFn);
+
+    checkState(
+        !signature.processElement().requiresTimeSortedInput(),
         "@RequiresTimeSortedInput is not supported for the moment");
 
+    SparkSideInputReader.validateMaterializations(transform.getSideInputs().values());
+    return true;

Review Comment:
   For ParDos, yes 👍. `false` is used to tell the translator not to translate a composite as is, but rather to explode it further.
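
The convention described in this comment can be sketched as a recursive walk. This is a simplified illustration and not the runner's actual traversal: `Node`, `Translator`, and `translateOrExplode` are hypothetical names, and a transform is reduced to a name plus a list of parts.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ExplodeComposites {
  /** Hypothetical transform node: primitives have no parts, composites have one or more. */
  public static final class Node {
    public final String name;
    public final List<Node> parts;

    public Node(String name, Node... parts) {
      this.name = name;
      this.parts = Arrays.asList(parts);
    }
  }

  /** Hypothetical translator: returning false means "do not translate as is, explode further". */
  public interface Translator {
    boolean canTranslate(Node node);
  }

  /** Collects the names of the nodes that end up being translated directly. */
  public static void translateOrExplode(Node node, Translator translator, List<String> translated) {
    if (translator.canTranslate(node)) {
      translated.add(node.name);
      return;
    }
    if (node.parts.isEmpty()) {
      // Assumption for this sketch: a primitive that cannot be translated is an error.
      throw new IllegalStateException("Unsupported primitive: " + node.name);
    }
    for (Node part : node.parts) {
      translateOrExplode(part, translator, translated);
    }
  }

  public static void main(String[] args) {
    Node composite = new Node("Composite", new Node("ParDo"), new Node("GroupByKey"));
    Translator onlyPrimitives = node -> node.parts.isEmpty();
    List<String> translated = new ArrayList<>();
    translateOrExplode(composite, onlyPrimitives, translated);
    System.out.println(translated);
  }
}
```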





[GitHub] [beam] aromanenko-dev commented on a diff in pull request #24288: [Spark dataset runner] Make sure PCollection views get only broadcasted once if reused

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on code in PR #24288:
URL: https://github.com/apache/beam/pull/24288#discussion_r1036906888


##########
runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/SparkSessionFactory.java:
##########
@@ -69,4 +133,75 @@ private static SparkSession.Builder sessionBuilder(
     }
     return SparkSession.builder().config(sparkConf);
   }
+
+  public static class SparkKryoRegistrator implements KryoRegistrator {

Review Comment:
   Should it be `public`? Please add Javadoc for this class.



##########
runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java:
##########
@@ -83,61 +86,74 @@
       ClassTag.apply(Tuple2.class);
 
   @Override
-  public void translate(ParDo.MultiOutput<InputT, OutputT> transform, Context cxt)
-      throws IOException {
-    String stepName = cxt.getCurrentTransform().getFullName();
-
-    SparkCommonPipelineOptions opts = cxt.getOptions().as(SparkCommonPipelineOptions.class);
-    StorageLevel storageLevel = StorageLevel.fromString(opts.getStorageLevel());
+  public boolean canTranslate(ParDo.MultiOutput<InputT, OutputT> transform) {
+    DoFn<InputT, OutputT> doFn = transform.getFn();
+    DoFnSignature signature = DoFnSignatures.signatureForDoFn(doFn);
 
-    // Check for not supported advanced features
     // TODO: add support of Splittable DoFn
-    DoFn<InputT, OutputT> doFn = transform.getFn();
     checkState(
-        !DoFnSignatures.isSplittable(doFn),
+        !signature.processElement().isSplittable(),
         "Not expected to directly translate splittable DoFn, should have been overridden: %s",
         doFn);
 
     // TODO: add support of states and timers
     checkState(
-        !DoFnSignatures.isStateful(doFn), "States and timers are not supported for the moment.");
+        !signature.usesState() && !signature.usesTimers(),
+        "States and timers are not supported for the moment.");
 
     checkState(
-        !DoFnSignatures.requiresTimeSortedInput(doFn),
+        signature.onWindowExpiration() == null, "onWindowExpiration is not supported: %s", doFn);
+
+    checkState(
+        !signature.processElement().requiresTimeSortedInput(),
         "@RequiresTimeSortedInput is not supported for the moment");
 
+    SparkSideInputReader.validateMaterializations(transform.getSideInputs().values());
+    return true;

Review Comment:
   Does it always return `true` if all the checks above pass?



##########
runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TransformTranslator.java:
##########
@@ -135,6 +135,12 @@ public <T> Dataset<WindowedValue<T>> getDataset(PCollection<T> pCollection) {
       return state.getDataset(pCollection);
     }
 
+    @Override
+    public <T> Broadcast<SideInputValues<T>> getSideInputBroadcast(

Review Comment:
   Should it be `public`?



##########
runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java:
##########
@@ -83,61 +86,74 @@
       ClassTag.apply(Tuple2.class);
 
   @Override
-  public void translate(ParDo.MultiOutput<InputT, OutputT> transform, Context cxt)
-      throws IOException {
-    String stepName = cxt.getCurrentTransform().getFullName();
-
-    SparkCommonPipelineOptions opts = cxt.getOptions().as(SparkCommonPipelineOptions.class);
-    StorageLevel storageLevel = StorageLevel.fromString(opts.getStorageLevel());
+  public boolean canTranslate(ParDo.MultiOutput<InputT, OutputT> transform) {

Review Comment:
   I'd call it something like `isTranslationSupported()`



##########
runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TransformTranslator.java:
##########
@@ -56,13 +57,13 @@
  */
 @Internal
 public abstract class TransformTranslator<
-    InT extends PInput, OutT extends POutput, TransformT extends PTransform<? extends InT, OutT>> {
+    InT extends PInput, OutT extends POutput, TransformT extends PTransform<? super InT, OutT>> {

Review Comment:
   Why was it changed?



##########
runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/functions/CachedSideInputReader.java:
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.structuredstreaming.translation.batch.functions;
+
+import static org.apache.beam.sdk.transforms.Materializations.MULTIMAP_MATERIALIZATION_URN;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import org.apache.beam.runners.core.SideInputReader;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.transforms.Materialization;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.Cache;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheBuilder;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheStats;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * SideInputReader that caches results for costly {@link Materialization Materializations}.
+ * Concurrent access is not expected, but it won't impact correctness.
+ */
+@Internal
+public class CachedSideInputReader implements SideInputReader {

Review Comment:
   Does it support dynamic changes of the side input?



##########
runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/SparkSessionFactory.java:
##########
@@ -53,6 +107,16 @@ private static SparkSession.Builder sessionBuilder(
       sparkConf.setJars(jars.toArray(new String[0]));
     }
 
+    if (!sparkConf.contains("spark.serializer")) {

Review Comment:
   Please, add log messages about configuration changes.
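
One way to read this request is sketched below. It is an illustration under assumptions, not the change made in the PR: a plain `Map` stands in for `SparkConf`, `setIfMissing` here is a hypothetical helper, and the choice of `org.apache.spark.serializer.KryoSerializer` as the default is inferred from the PR description rather than shown in the quoted diff.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Logger;

public class SerializerDefaults {
  private static final Logger LOG = Logger.getLogger(SerializerDefaults.class.getName());

  /**
   * Sets the key to the default only if the user has not configured it, and logs what happened.
   * Returns true if the default was applied.
   */
  public static boolean setIfMissing(Map<String, String> conf, String key, String defaultValue) {
    if (conf.containsKey(key)) {
      LOG.info("Keeping user provided " + key + "=" + conf.get(key));
      return false;
    }
    conf.put(key, defaultValue);
    LOG.info("Setting " + key + "=" + defaultValue + " (not configured by user)");
    return true;
  }

  public static void main(String[] args) {
    // The map is a stand-in for SparkConf; the default value is an assumption.
    Map<String, String> conf = new HashMap<>();
    setIfMissing(conf, "spark.serializer", "org.apache.spark.serializer.KryoSerializer");
    System.out.println(conf);
  }
}
```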





[GitHub] [beam] aromanenko-dev commented on pull request #24288: [Spark dataset runner] Make sure PCollection views get only broadcasted once if reused

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on PR #24288:
URL: https://github.com/apache/beam/pull/24288#issuecomment-1335145563

   > Though, there's a few other places where visibility can be reduced ...
   
   Sounds good, though, please, take into account the potential breaking changes.




[GitHub] [beam] aromanenko-dev commented on pull request #24288: [Spark dataset runner] Make sure PCollection views get only broadcasted once if reused

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on PR #24288:
URL: https://github.com/apache/beam/pull/24288#issuecomment-1335249786

   @mosche Agreed. Actually, this runner is missing `@Experimental` annotations, which it is probably still not too late to add.
   
   On the other hand, I'd dispute that it is not ready to be used at all, since 1) it's not broken and 2) it can execute most batch pipelines, despite missing some features and having some performance issues.
   
   > Btw, we've decided to rename the runner ... that's going to be one massive breaking change. In my opinion any other (additional) breaking changes until then simply don't matter.
   
   Formally, it can be done by deprecating the packages with the old name and wrapping all public APIs to use a package with the new name. Though, as you mentioned, it likely won't be needed because this runner is not much used now.




[GitHub] [beam] mosche commented on a diff in pull request #24288: [Spark dataset runner] Make sure PCollection views get only broadcasted once if reused

Posted by GitBox <gi...@apache.org>.
mosche commented on code in PR #24288:
URL: https://github.com/apache/beam/pull/24288#discussion_r1037806734


##########
runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/functions/CachedSideInputReader.java:
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.structuredstreaming.translation.batch.functions;
+
+import static org.apache.beam.sdk.transforms.Materializations.MULTIMAP_MATERIALIZATION_URN;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import org.apache.beam.runners.core.SideInputReader;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.transforms.Materialization;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.Cache;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheBuilder;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheStats;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * SideInputReader that caches results for costly {@link Materialization Materializations}.
+ * Concurrent access is not expected, but it won't impact correctness.
+ */
+@Internal
+public class CachedSideInputReader implements SideInputReader {

Review Comment:
   No, that's not any different from before ... with broadcast variables there are absolutely no changes possible to side-inputs.





[GitHub] [beam] mosche commented on pull request #24288: [Spark dataset runner] Make sure PCollection views get only broadcasted once if reused

Posted by GitBox <gi...@apache.org>.
mosche commented on PR #24288:
URL: https://github.com/apache/beam/pull/24288#issuecomment-1322192961

   R: @aromanenko-dev 
   R: @echauchot 




[GitHub] [beam] mosche commented on a diff in pull request #24288: [Spark dataset runner] Make sure PCollection views get only broadcasted once if reused

Posted by GitBox <gi...@apache.org>.
mosche commented on code in PR #24288:
URL: https://github.com/apache/beam/pull/24288#discussion_r1037806334


##########
runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java:
##########
@@ -83,61 +86,74 @@
       ClassTag.apply(Tuple2.class);
 
   @Override
-  public void translate(ParDo.MultiOutput<InputT, OutputT> transform, Context cxt)
-      throws IOException {
-    String stepName = cxt.getCurrentTransform().getFullName();
-
-    SparkCommonPipelineOptions opts = cxt.getOptions().as(SparkCommonPipelineOptions.class);
-    StorageLevel storageLevel = StorageLevel.fromString(opts.getStorageLevel());
+  public boolean canTranslate(ParDo.MultiOutput<InputT, OutputT> transform) {

Review Comment:
   I personally prefer canTranslate





[GitHub] [beam] aromanenko-dev commented on a diff in pull request #24288: [Spark dataset runner] Make sure PCollection views get only broadcasted once if reused

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on code in PR #24288:
URL: https://github.com/apache/beam/pull/24288#discussion_r1038162905


##########
runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java:
##########
@@ -83,61 +86,74 @@
       ClassTag.apply(Tuple2.class);
 
   @Override
-  public void translate(ParDo.MultiOutput<InputT, OutputT> transform, Context cxt)
-      throws IOException {
-    String stepName = cxt.getCurrentTransform().getFullName();
-
-    SparkCommonPipelineOptions opts = cxt.getOptions().as(SparkCommonPipelineOptions.class);
-    StorageLevel storageLevel = StorageLevel.fromString(opts.getStorageLevel());
+  public boolean canTranslate(ParDo.MultiOutput<InputT, OutputT> transform) {
+    DoFn<InputT, OutputT> doFn = transform.getFn();
+    DoFnSignature signature = DoFnSignatures.signatureForDoFn(doFn);
 
-    // Check for not supported advanced features
     // TODO: add support of Splittable DoFn
-    DoFn<InputT, OutputT> doFn = transform.getFn();
     checkState(
-        !DoFnSignatures.isSplittable(doFn),
+        !signature.processElement().isSplittable(),
         "Not expected to directly translate splittable DoFn, should have been overridden: %s",
         doFn);
 
     // TODO: add support of states and timers
     checkState(
-        !DoFnSignatures.isStateful(doFn), "States and timers are not supported for the moment.");
+        !signature.usesState() && !signature.usesTimers(),
+        "States and timers are not supported for the moment.");
 
     checkState(
-        !DoFnSignatures.requiresTimeSortedInput(doFn),
+        signature.onWindowExpiration() == null, "onWindowExpiration is not supported: %s", doFn);
+
+    checkState(
+        !signature.processElement().requiresTimeSortedInput(),
         "@RequiresTimeSortedInput is not supported for the moment");
 
+    SparkSideInputReader.validateMaterializations(transform.getSideInputs().values());
+    return true;

Review Comment:
   No, I think it's fine since it explains this in Javadoc.
   
   Shouldn't `RuntimeException` be added to the method signature?





[GitHub] [beam] aromanenko-dev merged pull request #24288: [Spark dataset runner] Make sure PCollection views get only broadcasted once if reused

Posted by GitBox <gi...@apache.org>.
aromanenko-dev merged PR #24288:
URL: https://github.com/apache/beam/pull/24288




[GitHub] [beam] mosche commented on a diff in pull request #24288: [Spark dataset runner] Make sure PCollection views get only broadcasted once if reused

Posted by GitBox <gi...@apache.org>.
mosche commented on code in PR #24288:
URL: https://github.com/apache/beam/pull/24288#discussion_r1038173366


##########
runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java:
##########
@@ -83,61 +86,74 @@
       ClassTag.apply(Tuple2.class);
 
   @Override
-  public void translate(ParDo.MultiOutput<InputT, OutputT> transform, Context cxt)
-      throws IOException {
-    String stepName = cxt.getCurrentTransform().getFullName();
-
-    SparkCommonPipelineOptions opts = cxt.getOptions().as(SparkCommonPipelineOptions.class);
-    StorageLevel storageLevel = StorageLevel.fromString(opts.getStorageLevel());
+  public boolean canTranslate(ParDo.MultiOutput<InputT, OutputT> transform) {
+    DoFn<InputT, OutputT> doFn = transform.getFn();
+    DoFnSignature signature = DoFnSignatures.signatureForDoFn(doFn);
 
-    // Check for not supported advanced features
     // TODO: add support of Splittable DoFn
-    DoFn<InputT, OutputT> doFn = transform.getFn();
     checkState(
-        !DoFnSignatures.isSplittable(doFn),
+        !signature.processElement().isSplittable(),
         "Not expected to directly translate splittable DoFn, should have been overridden: %s",
         doFn);
 
     // TODO: add support of states and timers
     checkState(
-        !DoFnSignatures.isStateful(doFn), "States and timers are not supported for the moment.");
+        !signature.usesState() && !signature.usesTimers(),
+        "States and timers are not supported for the moment.");
 
     checkState(
-        !DoFnSignatures.requiresTimeSortedInput(doFn),
+        signature.onWindowExpiration() == null, "onWindowExpiration is not supported: %s", doFn);
+
+    checkState(
+        !signature.processElement().requiresTimeSortedInput(),
         "@RequiresTimeSortedInput is not supported for the moment");
 
+    SparkSideInputReader.validateMaterializations(transform.getSideInputs().values());
+    return true;

Review Comment:
   👍 updated





[GitHub] [beam] github-actions[bot] commented on pull request #24288: [Spark dataset runner] Make sure PCollection views get only broadcasted once if reused

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #24288:
URL: https://github.com/apache/beam/pull/24288#issuecomment-1322194717

   Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control




[GitHub] [beam] mosche commented on pull request #24288: [Spark dataset runner] Make sure PCollection views get only broadcasted once if reused

Posted by GitBox <gi...@apache.org>.
mosche commented on PR #24288:
URL: https://github.com/apache/beam/pull/24288#issuecomment-1335228894

   > Sounds good, though, please, take into account the potential breaking changes.
   
   @aromanenko-dev Let's agree that this runner is currently not used at all; there are also still so many gaps that the runner isn't ready for production usage. And last but not least, the runner is very noisy about being experimental, so breaking changes are to be expected!
   
   Let's not worry so much about breaking any interface for that runner, but focus on how to get the quality / issues fixed without making that effort unnecessarily difficult, so that it can finally be used. Btw, we've decided to rename the runner ... that's going to be one massive breaking change. In my opinion any other (additional) breaking changes until then simply don't matter.




[GitHub] [beam] mosche commented on a diff in pull request #24288: [Spark dataset runner] Make sure PCollection views get only broadcasted once if reused

Posted by GitBox <gi...@apache.org>.
mosche commented on code in PR #24288:
URL: https://github.com/apache/beam/pull/24288#discussion_r1038150214


##########
runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java:
##########
@@ -83,61 +86,74 @@
       ClassTag.apply(Tuple2.class);
 
   @Override
-  public void translate(ParDo.MultiOutput<InputT, OutputT> transform, Context cxt)
-      throws IOException {
-    String stepName = cxt.getCurrentTransform().getFullName();
-
-    SparkCommonPipelineOptions opts = cxt.getOptions().as(SparkCommonPipelineOptions.class);
-    StorageLevel storageLevel = StorageLevel.fromString(opts.getStorageLevel());
+  public boolean canTranslate(ParDo.MultiOutput<InputT, OutputT> transform) {
+    DoFn<InputT, OutputT> doFn = transform.getFn();
+    DoFnSignature signature = DoFnSignatures.signatureForDoFn(doFn);
 
-    // Check for not supported advanced features
     // TODO: add support of Splittable DoFn
-    DoFn<InputT, OutputT> doFn = transform.getFn();
     checkState(
-        !DoFnSignatures.isSplittable(doFn),
+        !signature.processElement().isSplittable(),
         "Not expected to directly translate splittable DoFn, should have been overridden: %s",
         doFn);
 
     // TODO: add support of states and timers
     checkState(
-        !DoFnSignatures.isStateful(doFn), "States and timers are not supported for the moment.");
+        !signature.usesState() && !signature.usesTimers(),
+        "States and timers are not supported for the moment.");
 
     checkState(
-        !DoFnSignatures.requiresTimeSortedInput(doFn),
+        signature.onWindowExpiration() == null, "onWindowExpiration is not supported: %s", doFn);
+
+    checkState(
+        !signature.processElement().requiresTimeSortedInput(),
         "@RequiresTimeSortedInput is not supported for the moment");
 
+    SparkSideInputReader.validateMaterializations(transform.getSideInputs().values());
+    return true;

Review Comment:
   The right place to check in that case is the javadocs of the default implementation I suppose. Should I rephrase / clarify the javadocs further or does it make sense as is?
   ```
     /**
      * Checks if a composite / primitive transform can be translated. Composites that cannot be
      * translated as is, will be exploded further for translation of their parts.
      *
      * <p>This should be overridden where necessary. If a transform is known to be unsupported, this
      * should throw a runtime exception to give early feedback before any part of the pipeline is run.
      */
     protected boolean canTranslate(TransformT transform) 
     ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] mosche commented on pull request #24288: [Spark dataset runner] Make sure PCollection views get only broadcasted once if reused

Posted by GitBox <gi...@apache.org>.
mosche commented on PR #24288:
URL: https://github.com/apache/beam/pull/24288#issuecomment-1334862993

   Though, there are a few other places where visibility can be reduced ... I'll follow up with a separate PR @aromanenko-dev




[GitHub] [beam] aromanenko-dev commented on a diff in pull request #24288: [Spark dataset runner] Make sure PCollection views get only broadcasted once if reused

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on code in PR #24288:
URL: https://github.com/apache/beam/pull/24288#discussion_r1038071514


##########
runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java:
##########
@@ -83,61 +86,74 @@
       ClassTag.apply(Tuple2.class);
 
   @Override
-  public void translate(ParDo.MultiOutput<InputT, OutputT> transform, Context cxt)
-      throws IOException {
-    String stepName = cxt.getCurrentTransform().getFullName();
-
-    SparkCommonPipelineOptions opts = cxt.getOptions().as(SparkCommonPipelineOptions.class);
-    StorageLevel storageLevel = StorageLevel.fromString(opts.getStorageLevel());
+  public boolean canTranslate(ParDo.MultiOutput<InputT, OutputT> transform) {

Review Comment:
   OK, though it sounds a bit confusing, IMHO.





[GitHub] [beam] aromanenko-dev commented on a diff in pull request #24288: [Spark dataset runner] Make sure PCollection views get only broadcasted once if reused

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on code in PR #24288:
URL: https://github.com/apache/beam/pull/24288#discussion_r1038134689


##########
runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java:
##########
@@ -83,61 +86,74 @@
       ClassTag.apply(Tuple2.class);
 
   @Override
-  public void translate(ParDo.MultiOutput<InputT, OutputT> transform, Context cxt)
-      throws IOException {
-    String stepName = cxt.getCurrentTransform().getFullName();
-
-    SparkCommonPipelineOptions opts = cxt.getOptions().as(SparkCommonPipelineOptions.class);
-    StorageLevel storageLevel = StorageLevel.fromString(opts.getStorageLevel());
+  public boolean canTranslate(ParDo.MultiOutput<InputT, OutputT> transform) {
+    DoFn<InputT, OutputT> doFn = transform.getFn();
+    DoFnSignature signature = DoFnSignatures.signatureForDoFn(doFn);
 
-    // Check for not supported advanced features
     // TODO: add support of Splittable DoFn
-    DoFn<InputT, OutputT> doFn = transform.getFn();
     checkState(
-        !DoFnSignatures.isSplittable(doFn),
+        !signature.processElement().isSplittable(),
         "Not expected to directly translate splittable DoFn, should have been overridden: %s",
         doFn);
 
     // TODO: add support of states and timers
     checkState(
-        !DoFnSignatures.isStateful(doFn), "States and timers are not supported for the moment.");
+        !signature.usesState() && !signature.usesTimers(),
+        "States and timers are not supported for the moment.");
 
     checkState(
-        !DoFnSignatures.requiresTimeSortedInput(doFn),
+        signature.onWindowExpiration() == null, "onWindowExpiration is not supported: %s", doFn);
+
+    checkState(
+        !signature.processElement().requiresTimeSortedInput(),
         "@RequiresTimeSortedInput is not supported for the moment");
 
+    SparkSideInputReader.validateMaterializations(transform.getSideInputs().values());
+    return true;

Review Comment:
   Yes, I missed the `@Override` in this git diff representation, since I thought it belonged to the `translate()` method.
   Then it's OK, though still a bit confusing, since it's not clear that it throws an exception without looking into the code.





[GitHub] [beam] aromanenko-dev commented on pull request #24288: [Spark dataset runner] Make sure PCollection views get only broadcasted once if reused

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on PR #24288:
URL: https://github.com/apache/beam/pull/24288#issuecomment-1323405177

   @echauchot Sure, I'll take a look, thanks 




[GitHub] [beam] echauchot commented on pull request #24288: [Spark dataset runner] Make sure PCollection views get only broadcasted once if reused

Posted by GitBox <gi...@apache.org>.
echauchot commented on PR #24288:
URL: https://github.com/apache/beam/pull/24288#issuecomment-1322203786

   @aromanenko-dev would you be available to review this one ?




[GitHub] [beam] aromanenko-dev commented on pull request #24288: [Spark dataset runner] Make sure PCollection views get only broadcasted once if reused

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on PR #24288:
URL: https://github.com/apache/beam/pull/24288#issuecomment-1323407166

   Run Spark StructuredStreaming ValidatesRunner




[GitHub] [beam] mosche commented on a diff in pull request #24288: [Spark dataset runner] Make sure PCollection views get only broadcasted once if reused

Posted by GitBox <gi...@apache.org>.
mosche commented on code in PR #24288:
URL: https://github.com/apache/beam/pull/24288#discussion_r1037811427


##########
runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TransformTranslator.java:
##########
@@ -56,13 +57,13 @@
  */
 @Internal
 public abstract class TransformTranslator<
-    InT extends PInput, OutT extends POutput, TransformT extends PTransform<? extends InT, OutT>> {
+    InT extends PInput, OutT extends POutput, TransformT extends PTransform<? super InT, OutT>> {

Review Comment:
   That was to better support the signature of `AppliedPTransform` (shown below), though I have since simplified this to just `InT` to match either direction.
   ```
   AppliedPTransform<
       InputT extends PInput,
       OutputT extends POutput,
       TransformT extends PTransform<? super InputT, OutputT>>
   ```
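
   To illustrate what the `? super` bound in that signature permits, here is a minimal sketch using simplified stand-in types. `Transform`, `Applied` and `Describe` are hypothetical names invented for this example and are not Beam classes; the point is only that a transform declared for a broader input type can be paired with a more specific input.

   ```java
   public class BoundSketch {

     /** Hypothetical stand-in for PTransform: consumes InT, produces OutT. */
     interface Transform<InT, OutT> {
       OutT expand(InT input);
     }

     /**
      * Hypothetical stand-in for AppliedPTransform. The bound `? super InT`
      * accepts any transform whose declared input type is InT or a supertype.
      */
     static final class Applied<InT, OutT, TransformT extends Transform<? super InT, OutT>> {
       final InT input;
       final TransformT transform;

       Applied(InT input, TransformT transform) {
         this.input = input;
         this.transform = transform;
       }

       OutT run() {
         return transform.expand(input);
       }
     }

     /** A transform declared on the broad type Object. */
     static final class Describe implements Transform<Object, String> {
       @Override
       public String expand(Object input) {
         return "value=" + input;
       }
     }

     public static void main(String[] args) {
       // Compiles because Object is a supertype of Integer.
       // With `? extends InT` in the bound, this pairing would be rejected.
       Applied<Integer, String, Describe> applied = new Applied<>(42, new Describe());
       System.out.println(applied.run()); // prints "value=42"
     }
   }
   ```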





[GitHub] [beam] mosche commented on a diff in pull request #24288: [Spark dataset runner] Make sure PCollection views get only broadcasted once if reused

Posted by GitBox <gi...@apache.org>.
mosche commented on code in PR #24288:
URL: https://github.com/apache/beam/pull/24288#discussion_r1038122605


##########
runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java:
##########
@@ -83,61 +86,74 @@
       ClassTag.apply(Tuple2.class);
 
   @Override
-  public void translate(ParDo.MultiOutput<InputT, OutputT> transform, Context cxt)
-      throws IOException {
-    String stepName = cxt.getCurrentTransform().getFullName();
-
-    SparkCommonPipelineOptions opts = cxt.getOptions().as(SparkCommonPipelineOptions.class);
-    StorageLevel storageLevel = StorageLevel.fromString(opts.getStorageLevel());
+  public boolean canTranslate(ParDo.MultiOutput<InputT, OutputT> transform) {
+    DoFn<InputT, OutputT> doFn = transform.getFn();
+    DoFnSignature signature = DoFnSignatures.signatureForDoFn(doFn);
 
-    // Check for not supported advanced features
     // TODO: add support of Splittable DoFn
-    DoFn<InputT, OutputT> doFn = transform.getFn();
     checkState(
-        !DoFnSignatures.isSplittable(doFn),
+        !signature.processElement().isSplittable(),
         "Not expected to directly translate splittable DoFn, should have been overridden: %s",
         doFn);
 
     // TODO: add support of states and timers
     checkState(
-        !DoFnSignatures.isStateful(doFn), "States and timers are not supported for the moment.");
+        !signature.usesState() && !signature.usesTimers(),
+        "States and timers are not supported for the moment.");
 
     checkState(
-        !DoFnSignatures.requiresTimeSortedInput(doFn),
+        signature.onWindowExpiration() == null, "onWindowExpiration is not supported: %s", doFn);
+
+    checkState(
+        !signature.processElement().requiresTimeSortedInput(),
         "@RequiresTimeSortedInput is not supported for the moment");
 
+    SparkSideInputReader.validateMaterializations(transform.getSideInputs().values());
+    return true;

Review Comment:
   Not sure I understand your points @aromanenko-dev... Have you missed the `@Override` ?
   ```
   @Override
   public boolean canTranslate(ParDo.MultiOutput<InputT, OutputT> transform) { ... }
   ```
   
   This overrides a default implementation that has to support 3 different cases:
   - `true` if the PTransform can be translated (that's the return value of the default implementation)
   - `false` if a composite `PTransform` cannot be translated and should be further expanded into its parts (see [this example](https://github.com/apache/beam/blob/master/runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombineGroupedValuesTranslatorBatch.java#L58-L60) in case of `CombineWithContext`)
   - throw an exception if usage of unsupported features is detected
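
   The three cases above can be sketched roughly as follows. This is a simplified illustration with made-up types, not the runner's actual code: the `Transform` class, its fields, and the `plan` / `visit` helpers are all hypothetical, and the real traversal in the runner may differ.

   ```java
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.List;

   public class CanTranslateSketch {

     /** Hypothetical stand-in for a pipeline transform; not a Beam class. */
     static final class Transform {
       final String name;
       final boolean translatableAsWhole;
       final boolean usesUnsupportedFeature;
       final List<Transform> parts;

       Transform(
           String name,
           boolean translatableAsWhole,
           boolean usesUnsupportedFeature,
           Transform... parts) {
         this.name = name;
         this.translatableAsWhole = translatableAsWhole;
         this.usesUnsupportedFeature = usesUnsupportedFeature;
         this.parts = Arrays.asList(parts);
       }
     }

     /** Three outcomes: return true, return false, or throw for early feedback. */
     static boolean canTranslate(Transform t) {
       if (t.usesUnsupportedFeature) {
         throw new IllegalStateException(t.name + " uses an unsupported feature");
       }
       return t.translatableAsWhole;
     }

     /** Returns the names of the transforms that would be translated directly. */
     static List<String> plan(Transform root) {
       List<String> translated = new ArrayList<>();
       visit(root, translated);
       return translated;
     }

     static void visit(Transform t, List<String> translated) {
       if (canTranslate(t)) {
         translated.add(t.name); // translate as is
       } else {
         for (Transform part : t.parts) { // expand the composite into its parts
           visit(part, translated);
         }
       }
     }

     public static void main(String[] args) {
       Transform composite =
           new Transform(
               "Composite",
               false,
               false,
               new Transform("Left", true, false),
               new Transform("Right", true, false));
       System.out.println(plan(composite)); // prints "[Left, Right]"
     }
   }
   ```

   In this sketch, a transform flagged as using an unsupported feature makes `plan` fail before anything is "translated", which mirrors the early-feedback intent described above.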
   





[GitHub] [beam] aromanenko-dev commented on a diff in pull request #24288: [Spark dataset runner] Make sure PCollection views get only broadcasted once if reused

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on code in PR #24288:
URL: https://github.com/apache/beam/pull/24288#discussion_r1037201066


##########
runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/functions/CachedSideInputReader.java:
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.structuredstreaming.translation.batch.functions;
+
+import static org.apache.beam.sdk.transforms.Materializations.MULTIMAP_MATERIALIZATION_URN;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import org.apache.beam.runners.core.SideInputReader;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.transforms.Materialization;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.Cache;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheBuilder;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheStats;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * SideInputReader that caches results for costly {@link Materialization Materializations}.
+ * Concurrent access is not expected, but it won't impact correctness.
+ */
+@Internal
+public class CachedSideInputReader implements SideInputReader {

Review Comment:
   Does it support dynamic changes of side input values?





[GitHub] [beam] aromanenko-dev commented on a diff in pull request #24288: [Spark dataset runner] Make sure PCollection views get only broadcasted once if reused

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on code in PR #24288:
URL: https://github.com/apache/beam/pull/24288#discussion_r1038073895


##########
runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java:
##########
@@ -83,61 +86,74 @@
       ClassTag.apply(Tuple2.class);
 
   @Override
-  public void translate(ParDo.MultiOutput<InputT, OutputT> transform, Context cxt)
-      throws IOException {
-    String stepName = cxt.getCurrentTransform().getFullName();
-
-    SparkCommonPipelineOptions opts = cxt.getOptions().as(SparkCommonPipelineOptions.class);
-    StorageLevel storageLevel = StorageLevel.fromString(opts.getStorageLevel());
+  public boolean canTranslate(ParDo.MultiOutput<InputT, OutputT> transform) {
+    DoFn<InputT, OutputT> doFn = transform.getFn();
+    DoFnSignature signature = DoFnSignatures.signatureForDoFn(doFn);
 
-    // Check for not supported advanced features
     // TODO: add support of Splittable DoFn
-    DoFn<InputT, OutputT> doFn = transform.getFn();
     checkState(
-        !DoFnSignatures.isSplittable(doFn),
+        !signature.processElement().isSplittable(),
         "Not expected to directly translate splittable DoFn, should have been overridden: %s",
         doFn);
 
     // TODO: add support of states and timers
     checkState(
-        !DoFnSignatures.isStateful(doFn), "States and timers are not supported for the moment.");
+        !signature.usesState() && !signature.usesTimers(),
+        "States and timers are not supported for the moment.");
 
     checkState(
-        !DoFnSignatures.requiresTimeSortedInput(doFn),
+        signature.onWindowExpiration() == null, "onWindowExpiration is not supported: %s", doFn);
+
+    checkState(
+        !signature.processElement().requiresTimeSortedInput(),
         "@RequiresTimeSortedInput is not supported for the moment");
 
+    SparkSideInputReader.validateMaterializations(transform.getSideInputs().values());
+    return true;

Review Comment:
   Yes, that is why it should not return a boolean, since that makes it confusing. Instead, it should either pass and return void or throw an exception.





[GitHub] [beam] aromanenko-dev commented on a diff in pull request #24288: [Spark dataset runner] Make sure PCollection views get only broadcasted once if reused

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on code in PR #24288:
URL: https://github.com/apache/beam/pull/24288#discussion_r1038074689


##########
runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java:
##########
@@ -83,61 +86,74 @@
       ClassTag.apply(Tuple2.class);
 
   @Override
-  public void translate(ParDo.MultiOutput<InputT, OutputT> transform, Context cxt)
-      throws IOException {
-    String stepName = cxt.getCurrentTransform().getFullName();
-
-    SparkCommonPipelineOptions opts = cxt.getOptions().as(SparkCommonPipelineOptions.class);
-    StorageLevel storageLevel = StorageLevel.fromString(opts.getStorageLevel());
+  public boolean canTranslate(ParDo.MultiOutput<InputT, OutputT> transform) {

Review Comment:
   It looks like this method should not return `boolean` at all (see my other comment below)





[GitHub] [beam] aromanenko-dev commented on a diff in pull request #24288: [Spark dataset runner] Make sure PCollection views get only broadcasted once if reused

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on code in PR #24288:
URL: https://github.com/apache/beam/pull/24288#discussion_r1038135133


##########
runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java:
##########
@@ -83,61 +86,74 @@
       ClassTag.apply(Tuple2.class);
 
   @Override
-  public void translate(ParDo.MultiOutput<InputT, OutputT> transform, Context cxt)
-      throws IOException {
-    String stepName = cxt.getCurrentTransform().getFullName();
-
-    SparkCommonPipelineOptions opts = cxt.getOptions().as(SparkCommonPipelineOptions.class);
-    StorageLevel storageLevel = StorageLevel.fromString(opts.getStorageLevel());
+  public boolean canTranslate(ParDo.MultiOutput<InputT, OutputT> transform) {

Review Comment:
   Yes, I answered there but it's only half-used here.





[GitHub] [beam] mosche commented on a diff in pull request #24288: [Spark dataset runner] Make sure PCollection views get only broadcasted once if reused

Posted by GitBox <gi...@apache.org>.
mosche commented on code in PR #24288:
URL: https://github.com/apache/beam/pull/24288#discussion_r1037806334


##########
runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java:
##########
@@ -83,61 +86,74 @@
       ClassTag.apply(Tuple2.class);
 
   @Override
-  public void translate(ParDo.MultiOutput<InputT, OutputT> transform, Context cxt)
-      throws IOException {
-    String stepName = cxt.getCurrentTransform().getFullName();
-
-    SparkCommonPipelineOptions opts = cxt.getOptions().as(SparkCommonPipelineOptions.class);
-    StorageLevel storageLevel = StorageLevel.fromString(opts.getStorageLevel());
+  public boolean canTranslate(ParDo.MultiOutput<InputT, OutputT> transform) {

Review Comment:
   I personally prefer `canTranslate` over `isTranslationSupported`





[GitHub] [beam] mosche commented on a diff in pull request #24288: [Spark dataset runner] Make sure PCollection views get only broadcasted once if reused

Posted by GitBox <gi...@apache.org>.
mosche commented on code in PR #24288:
URL: https://github.com/apache/beam/pull/24288#discussion_r1037803011


##########
runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TransformTranslator.java:
##########
@@ -135,6 +135,12 @@ public <T> Dataset<WindowedValue<T>> getDataset(PCollection<T> pCollection) {
       return state.getDataset(pCollection);
     }
 
+    @Override
+    public <T> Broadcast<SideInputValues<T>> getSideInputBroadcast(

Review Comment:
   yes, concrete impls are in the `batch` package





[GitHub] [beam] mosche commented on a diff in pull request #24288: [Spark dataset runner] Make sure PCollection views get only broadcasted once if reused

Posted by GitBox <gi...@apache.org>.
mosche commented on code in PR #24288:
URL: https://github.com/apache/beam/pull/24288#discussion_r1037803912


##########
runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/SparkSessionFactory.java:
##########
@@ -69,4 +133,75 @@ private static SparkSession.Builder sessionBuilder(
     }
     return SparkSession.builder().config(sparkConf);
   }
+
+  public static class SparkKryoRegistrator implements KryoRegistrator {

Review Comment:
   👍 must be accessible to Spark/Kryo, hence public





[GitHub] [beam] mosche commented on pull request #24288: [Spark dataset runner] Make sure PCollection views get only broadcasted once if reused

Posted by GitBox <gi...@apache.org>.
mosche commented on PR #24288:
URL: https://github.com/apache/beam/pull/24288#issuecomment-1334858349

   > Also, there were added a bunch of new classes/methods that are public. Do we really need them be public? if not then I'd suggest to check them and reduce the scope if possible.
   
   Yes, in these cases public is necessary... different packages or implementing methods of an interface




[GitHub] [beam] mosche commented on pull request #24288: [Spark dataset runner] Make sure PCollection views get only broadcasted once if reused

Posted by GitBox <gi...@apache.org>.
mosche commented on PR #24288:
URL: https://github.com/apache/beam/pull/24288#issuecomment-1334857634

   Run Spark StructuredStreaming ValidatesRunner




[GitHub] [beam] mosche commented on a diff in pull request #24288: [Spark dataset runner] Make sure PCollection views get only broadcasted once if reused

Posted by GitBox <gi...@apache.org>.
mosche commented on code in PR #24288:
URL: https://github.com/apache/beam/pull/24288#discussion_r1038122934


##########
runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java:
##########
@@ -83,61 +86,74 @@
       ClassTag.apply(Tuple2.class);
 
   @Override
-  public void translate(ParDo.MultiOutput<InputT, OutputT> transform, Context cxt)
-      throws IOException {
-    String stepName = cxt.getCurrentTransform().getFullName();
-
-    SparkCommonPipelineOptions opts = cxt.getOptions().as(SparkCommonPipelineOptions.class);
-    StorageLevel storageLevel = StorageLevel.fromString(opts.getStorageLevel());
+  public boolean canTranslate(ParDo.MultiOutput<InputT, OutputT> transform) {

Review Comment:
   Supporting `false` is absolutely required, pls see above





[GitHub] [beam] mosche commented on pull request #24288: [Spark dataset runner] Make sure PCollection views get only broadcasted once if reused

Posted by GitBox <gi...@apache.org>.
mosche commented on PR #24288:
URL: https://github.com/apache/beam/pull/24288#issuecomment-1335255623

   @aromanenko-dev There's still a couple of things massively broken ... current master is only working in local mode, just noticed this the last few days. That bug originates from some badly broken metrics interfaces. One of the next things to work on... :/




[GitHub] [beam] mosche commented on pull request #24288: [Spark dataset runner] Make sure PCollection views get only broadcasted once if reused

Posted by GitBox <gi...@apache.org>.
mosche commented on PR #24288:
URL: https://github.com/apache/beam/pull/24288#issuecomment-1335256115

   Good point on `@Experimental`, I'll add that!




[GitHub] [beam] aromanenko-dev commented on pull request #24288: [Spark dataset runner] Make sure PCollection views get only broadcasted once if reused

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on PR #24288:
URL: https://github.com/apache/beam/pull/24288#issuecomment-1335262718

   > There's still a couple of things massively broken ... current master is only working in local mode, just noticed this the last few days. 
   
   Ouch... It would be great to have some kind of regression tests for this to detect it earlier (for the RDD runner as well). Though it's a different topic for sure.




[GitHub] [beam] mosche commented on pull request #24288: [Spark dataset runner] Make sure PCollection views get only broadcasted once if reused

Posted by GitBox <gi...@apache.org>.
mosche commented on PR #24288:
URL: https://github.com/apache/beam/pull/24288#issuecomment-1335265098

   > Ouch... It would be great to have some kind of regression tests for this to detect it earlier (for the RDD runner as well). Though it's a different topic for sure.
   
   Yes :( Let's discuss this as one of the next things, just relying on the local runner is really bad.

