Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/08/08 15:36:54 UTC

[GitHub] [beam] mosche opened a new pull request, #22620: Support VR test including TestStream for Spark runner in streaming mode

mosche opened a new pull request, #22620:
URL: https://github.com/apache/beam/pull/22620

   Run VR tests for the Spark streaming runner rather than custom tests (the custom tests are already run as part of the "normal" unit test run).
   
   If `forceStreaming` is set to `true`, the `TestSparkRunner` will replace `Read.Bounded` with `UnboundedReadFromBoundedSource` so tests are run in streaming mode.
   Additionally this PR adds support for `TestStream`.
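   A minimal sketch of what the `forceStreaming` swap amounts to. It uses hypothetical stand-in types rather than Beam's API (Beam itself performs such swaps via `Pipeline.replaceAll` with `PTransformOverride`s); `BoundedRead`, `UnboundedWrapper` and `Other` are illustrative names only:
   
   ```java
   import java.util.ArrayList;
   import java.util.List;

   /** Toy model of the forceStreaming override; not Beam's actual API. */
   public class ForceStreamingSketch {
     /** Hypothetical stand-ins for Read.Bounded, UnboundedReadFromBoundedSource and others. */
     interface Transform {
       String describe();
     }

     record BoundedRead(String source) implements Transform {
       public String describe() {
         return "Read.Bounded(" + source + ")";
       }
     }

     record UnboundedWrapper(String source) implements Transform {
       public String describe() {
         return "UnboundedReadFromBoundedSource(" + source + ")";
       }
     }

     record Other(String name) implements Transform {
       public String describe() {
         return name;
       }
     }

     /** Swaps every bounded read for an unbounded wrapper around the same source. */
     static List<Transform> applyOverrides(List<Transform> transforms, boolean forceStreaming) {
       if (!forceStreaming) {
         return transforms; // batch mode: pipeline left untouched
       }
       List<Transform> result = new ArrayList<>();
       for (Transform t : transforms) {
         result.add(t instanceof BoundedRead read ? new UnboundedWrapper(read.source()) : t);
       }
       return result;
     }

     public static void main(String[] args) {
       List<Transform> pipeline = List.of(new BoundedRead("Create.Values"), new Other("GroupByKey"));
       for (Transform t : applyOverrides(pipeline, true)) {
         System.out.println(t.describe());
       }
     }
   }
   ```
   
   With `forceStreaming=false` the list is returned unchanged, so the same tests keep running in batch mode.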
   
   Closes #22472
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
    - [ ] Mention the appropriate issue in your description (for example: `addresses #123`), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment `fixes #<ISSUE NUMBER>` instead.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/get-started-contributing/#make-the-reviewers-job-easier).
   
   To check the build health, please visit [https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md](https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md)
   
   GitHub Actions Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   [![Build python source distribution and wheels](https://github.com/apache/beam/workflows/Build%20python%20source%20distribution%20and%20wheels/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Build+python+source+distribution+and+wheels%22+branch%3Amaster+event%3Aschedule)
   [![Python tests](https://github.com/apache/beam/workflows/Python%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Python+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Java tests](https://github.com/apache/beam/workflows/Java%20Tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Java+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Go tests](https://github.com/apache/beam/workflows/Go%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Go+tests%22+branch%3Amaster+event%3Aschedule)
   
   See [CI.md](https://github.com/apache/beam/blob/master/CI.md) for more information about GitHub Actions CI.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] github-actions[bot] commented on pull request #22620: Support VR test including TestStream for Spark runner in streaming mode

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #22620:
URL: https://github.com/apache/beam/pull/22620#issuecomment-1235428728

   Assigning a new set of reviewers because the PR has gone too long without review. If you would like to opt out of this review, comment `assign to next reviewer`:
   
   R: @lukecwik for label java.
   
   Available commands:
   - `stop reviewer notifications` - opt out of the automated review tooling
   - `remind me after tests pass` - tag the comment author after tests pass
   - `waiting on author` - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)




[GitHub] [beam] kennknowles commented on pull request #22620: Support VR test including TestStream for Spark runner in streaming mode

Posted by GitBox <gi...@apache.org>.
kennknowles commented on PR #22620:
URL: https://github.com/apache/beam/pull/22620#issuecomment-1245936485

   I don't know if anyone currently around would be familiar with SparkRunner watermark propagation.




[GitHub] [beam] mosche commented on a diff in pull request #22620: Support VR test including TestStream for Spark runner in streaming mode

Posted by GitBox <gi...@apache.org>.
mosche commented on code in PR #22620:
URL: https://github.com/apache/beam/pull/22620#discussion_r968579281


##########
runners/spark/spark_runner.gradle:
##########
@@ -260,21 +265,60 @@ def validatesRunnerStreaming = tasks.register("validatesRunnerStreaming", Test)
   group = "Verification"
   // Disable gradle cache
   outputs.upToDateWhen { false }
-  def pipelineOptions = JsonOutput.toJson([
-    "--runner=TestSparkRunner",
-    "--forceStreaming=true",
-    "--enableSparkMetricSinks=true",
-  ])
-  systemProperty "beamTestPipelineOptions", pipelineOptions
+  systemProperties sparkTestProperties(["--enableSparkMetricSinks": "true", "--forceStreaming": "true"])
 
   classpath = configurations.validatesRunner
-  testClassesDirs += files(project.sourceSets.test.output.classesDirs)
+  testClassesDirs += files(
+    project(":sdks:java:core").sourceSets.test.output.classesDirs,

Review Comment:
   Yes, in fact VR tests were never run/supported for Spark in streaming mode. I only stumbled on this by accident when I started looking into bugs related to onWindowExpiration :(
   Instead, there were some custom tests in the module that try to mimic the VR tests, but they cover only a very small part (and also run as unit tests).
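   For context on the Gradle change above: Beam's `TestPipeline` reads its options from the `beamTestPipelineOptions` system property as a JSON array of strings, which the removed lines built with `JsonOutput.toJson`. The body of `sparkTestProperties` is not shown in this thread, so the following is only a sketch, assuming the helper prepends `--runner=TestSparkRunner` and renders each map entry as `--key=value`:
   
   ```java
   import java.util.LinkedHashMap;
   import java.util.Map;
   import java.util.StringJoiner;

   /** Sketch of a helper in the spirit of sparkTestProperties; its real body is not shown here. */
   public class SparkTestPropertiesSketch {
     static final String PROPERTY = "beamTestPipelineOptions";

     /** Renders "--key=value" pairs as a JSON array of strings, runner first (assumed default). */
     static String toOptionsJson(Map<String, String> extraOptions) {
       Map<String, String> options = new LinkedHashMap<>();
       options.put("--runner", "TestSparkRunner"); // mirrors the removed Gradle lines
       options.putAll(extraOptions);
       StringJoiner json = new StringJoiner(",", "[", "]");
       for (Map.Entry<String, String> option : options.entrySet()) {
         json.add(quote(option.getKey() + "=" + option.getValue()));
       }
       return json.toString();
     }

     /** Minimal JSON string quoting; enough for option flags without control characters. */
     static String quote(String s) {
       return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
     }

     public static void main(String[] args) {
       Map<String, String> extra = new LinkedHashMap<>();
       extra.put("--enableSparkMetricSinks", "true");
       extra.put("--forceStreaming", "true");
       System.setProperty(PROPERTY, toOptionsJson(extra));
       // ["--runner=TestSparkRunner","--enableSparkMetricSinks=true","--forceStreaming=true"]
       System.out.println(System.getProperty(PROPERTY));
     }
   }
   ```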





[GitHub] [beam] kennknowles commented on pull request #22620: Support VR test including TestStream for Spark runner in streaming mode

Posted by GitBox <gi...@apache.org>.
kennknowles commented on PR #22620:
URL: https://github.com/apache/beam/pull/22620#issuecomment-1247251523

   > * Nevertheless, it also showed that the approach is somewhat flawed. Some bounded test cases simply cannot be forced into a streaming execution, e.g. any GroupByKey will fail on the GlobalWindow if there's no trigger set.
   
   In the Beam model, this condition is that a GroupByKey of an _unbounded_ PCollection in global window must have a trigger. But you can still have a bounded PCollection in streaming mode.
   
   So the summary is:
   
    - forcing a run in streaming mode, but leaving bounded PCollections as bounded is OK
    - automatically making all PCollections unbounded is flawed (but still can be useful to find bugs sometimes)
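   The rule above can be stated as a small check. This is a toy model with hypothetical types; Beam performs the real validation when a `GroupByKey` is applied, and the exception message below is made up:
   
   ```java
   /** Toy restatement of the GroupByKey/global-window rule; hypothetical types. */
   public class GroupByKeyRuleSketch {
     enum IsBounded { BOUNDED, UNBOUNDED }

     /** Only the two properties the rule looks at. */
     record Windowing(boolean globalWindow, boolean defaultTrigger) {}

     static void validateGroupByKey(IsBounded boundedness, Windowing windowing) {
       if (boundedness == IsBounded.UNBOUNDED
           && windowing.globalWindow()
           && windowing.defaultTrigger()) {
         throw new IllegalStateException(
             "GroupByKey of an unbounded PCollection in the global window needs a trigger");
       }
     }

     static boolean isValid(IsBounded boundedness, Windowing windowing) {
       try {
         validateGroupByKey(boundedness, windowing);
         return true;
       } catch (IllegalStateException e) {
         return false;
       }
     }

     public static void main(String[] args) {
       Windowing globalWithoutTrigger = new Windowing(true, true);
       // Bounded input: fine, even when the pipeline runs in streaming mode.
       System.out.println(isValid(IsBounded.BOUNDED, globalWithoutTrigger));   // true
       // Same input forced to be unbounded: rejected.
       System.out.println(isValid(IsBounded.UNBOUNDED, globalWithoutTrigger)); // false
     }
   }
   ```
   
   Forcing every read to be unbounded turns the first case into the second, which is why some bounded test cases cannot simply be forced into streaming execution.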




[GitHub] [beam] kennknowles commented on a diff in pull request #22620: Support VR test including TestStream for Spark runner in streaming mode

Posted by GitBox <gi...@apache.org>.
kennknowles commented on code in PR #22620:
URL: https://github.com/apache/beam/pull/22620#discussion_r943767277


##########
sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java:
##########
@@ -105,6 +105,9 @@ public void testLateDataAccumulating() {
             .advanceWatermarkTo(instant.plus(Duration.standardMinutes(6)))
             // These elements are late but within the allowed lateness
             .addElements(TimestampedValue.of(4L, instant), TimestampedValue.of(5L, instant))
+            .advanceWatermarkTo(instant.plus(Duration.standardMinutes(10)))

Review Comment:
   Just responding to let you know I have been on vacation and I will look at this later today.
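   The `testLateDataAccumulating` hunk above advances the watermark to `instant + 6 min` and then adds elements timestamped at `instant`, which the test comment describes as late but within the allowed lateness. A simplified sketch of that classification follows; the 5-minute window and 5-minute allowed lateness in `main` are hypothetical values, not necessarily the ones the test uses:
   
   ```java
   /** Simplified lateness check for a window with an exclusive end timestamp (epoch millis). */
   public class LatenessSketch {
     enum Status { ON_TIME, LATE_BUT_ALLOWED, DROPPABLE }

     static Status classify(long windowEndMillis, long watermarkMillis, long allowedLatenessMillis) {
       long maxTimestamp = windowEndMillis - 1; // last timestamp that belongs to the window
       if (watermarkMillis <= maxTimestamp) {
         return Status.ON_TIME; // watermark has not yet passed the window
       }
       return watermarkMillis <= maxTimestamp + allowedLatenessMillis
           ? Status.LATE_BUT_ALLOWED // late, but the window's state is still kept
           : Status.DROPPABLE;       // past allowed lateness: data would be dropped
     }

     public static void main(String[] args) {
       long minute = 60_000L;
       long windowEnd = 5 * minute;       // hypothetical window [0, 5 min)
       long allowedLateness = 5 * minute; // hypothetical allowed lateness
       System.out.println(classify(windowEnd, 6 * minute, allowedLateness));  // LATE_BUT_ALLOWED
       System.out.println(classify(windowEnd, 11 * minute, allowedLateness)); // DROPPABLE
     }
   }
   ```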





[GitHub] [beam] mosche commented on pull request #22620: Support VR test including TestStream for Spark runner in streaming mode

Posted by GitBox <gi...@apache.org>.
mosche commented on PR #22620:
URL: https://github.com/apache/beam/pull/22620#issuecomment-1246691653

   Run Spark ValidatesRunner




[GitHub] [beam] mosche commented on pull request #22620: Support VR test including TestStream for Spark runner in streaming mode

Posted by GitBox <gi...@apache.org>.
mosche commented on PR #22620:
URL: https://github.com/apache/beam/pull/22620#issuecomment-1250763077

   @kennknowles fine to merge this?




[GitHub] [beam] kennknowles commented on pull request #22620: Support VR test including TestStream for Spark runner in streaming mode

Posted by GitBox <gi...@apache.org>.
kennknowles commented on PR #22620:
URL: https://github.com/apache/beam/pull/22620#issuecomment-1245939772

   I think it is valuable to get these tests running and disable the ones that fail. The tests and the list of disabled tests can then be a real representation of the current state. That way, things that are green can stay green.




[GitHub] [beam] kennknowles commented on pull request #22620: Support VR test including TestStream for Spark runner in streaming mode

Posted by GitBox <gi...@apache.org>.
kennknowles commented on PR #22620:
URL: https://github.com/apache/beam/pull/22620#issuecomment-1245939875

   run spark validatesrunner




[GitHub] [beam] mosche commented on a diff in pull request #22620: Support VR test including TestStream for Spark runner in streaming mode

Posted by GitBox <gi...@apache.org>.
mosche commented on code in PR #22620:
URL: https://github.com/apache/beam/pull/22620#discussion_r969534303


##########
runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java:
##########
@@ -153,4 +192,84 @@ private static void awaitWatermarksOrTimeout(
     } while ((timeoutMillis -= batchDurationMillis) > 0
         && globalWatermark.isBefore(stopPipelineWatermark));
   }
+
+  /**
+   * Override factory to replace {@link Read.Bounded} with {@link UnboundedReadFromBoundedSource}
+   * to force streaming mode.
+   */
+  private static class UnboundedReadFromBoundedSourceOverrideFactory<T>

Review Comment:
   Happy to do that, though I'm not entirely sure the factory is of much value by itself. I also had to fix the outputs in a non-trivial way using a visitor after the replacement, see https://github.com/apache/beam/pull/22620/files#diff-d81f49eb0330230bd03ce6cd33b5f70f59c443aac57741e877ececbada32b16bR246-R274. I couldn't find a way to achieve this in `mapOutputs` of the override factory itself.
   Let me know what you think.
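   The visitor referred to above lives in the linked diff and is not reproduced here. Assuming that "fixing the outputs" means updating the boundedness of collections downstream of a replaced read, a toy version of such a post-replacement pass could look like this (hypothetical types, not Beam's `Pipeline.PipelineVisitor` API):
   
   ```java
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;

   /** Toy post-replacement pass: propagate unboundedness downstream. Not Beam's visitor API. */
   public class BoundednessPassSketch {
     /** A collection-producing step; unboundedSource marks e.g. a replaced read. */
     record Node(String name, List<String> inputs, boolean unboundedSource) {}

     /** Nodes must be given in topological order; returns name -> isUnbounded. */
     static Map<String, Boolean> propagate(List<Node> topologicalOrder) {
       Map<String, Boolean> unbounded = new HashMap<>();
       for (Node node : topologicalOrder) {
         boolean result = node.unboundedSource();
         for (String input : node.inputs()) {
           // A collection is unbounded as soon as any of its inputs is.
           result |= unbounded.getOrDefault(input, false);
         }
         unbounded.put(node.name(), result);
       }
       return unbounded;
     }

     public static void main(String[] args) {
       Map<String, Boolean> result = propagate(List.of(
           new Node("read", List.of(), true),
           new Node("parDo", List.of("read"), false),
           new Node("side", List.of(), false),
           new Node("flatten", List.of("parDo", "side"), false)));
       System.out.println(result.get("flatten")); // true
       System.out.println(result.get("side"));    // false
     }
   }
   ```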





[GitHub] [beam] mosche commented on pull request #22620: Support VR test including TestStream for Spark runner in streaming mode

Posted by GitBox <gi...@apache.org>.
mosche commented on PR #22620:
URL: https://github.com/apache/beam/pull/22620#issuecomment-1243952655

   @kennknowles FYI, a significant number of VR tests consistently fail here. If I run them independently, they usually succeed. It looks like there's some nondeterminism around watermark propagation in the runner, see https://github.com/apache/beam/issues/23129.
   Do you know anyone who's familiar with that code?
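   One place where watermark timing shows up directly is `TestSparkRunner.awaitWatermarksOrTimeout`, whose loop condition `(timeoutMillis -= batchDurationMillis) > 0 && globalWatermark.isBefore(stopPipelineWatermark)` is visible in the `TestSparkRunner.java` hunk in this thread. A sketch of such a loop, assuming its body sleeps for one batch and then re-reads the global watermark; watermarks are plain epoch millis and `Sleeper` is a hypothetical hook for testing:
   
   ```java
   import java.util.function.LongSupplier;

   /** Sketch of a poll-until-watermark-or-timeout loop; the loop body is an assumption. */
   public class AwaitWatermarkSketch {
     /** Hypothetical hook so the sketch can be exercised without real sleeps. */
     interface Sleeper {
       void sleep(long millis) throws InterruptedException;
     }

     static boolean awaitWatermarksOrTimeout(
         LongSupplier globalWatermark, long stopWatermark, long batchDurationMillis,
         long timeoutMillis, Sleeper sleeper) throws InterruptedException {
       long watermark;
       do {
         sleeper.sleep(batchDurationMillis);       // wait roughly one micro-batch
         watermark = globalWatermark.getAsLong(); // read the latest published watermark
       } while ((timeoutMillis -= batchDurationMillis) > 0 && watermark < stopWatermark);
       return watermark >= stopWatermark;         // false means the loop gave up on timeout
     }

     public static void main(String[] args) throws InterruptedException {
       long[] published = {0};
       // Each read sees the watermark 100 ms further along, standing in for completed batches.
       LongSupplier advancing = () -> published[0] += 100;
       System.out.println(awaitWatermarksOrTimeout(advancing, 300, 50, 1_000, Thread::sleep));
     }
   }
   ```
   
   If the watermark is published later than the timeout allows, the loop gives up and returns `false`, which is one way late watermark propagation can surface as intermittent test failures.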




[GitHub] [beam] mosche commented on a diff in pull request #22620: Support VR test including TestStream for Spark runner in streaming mode

Posted by GitBox <gi...@apache.org>.
mosche commented on code in PR #22620:
URL: https://github.com/apache/beam/pull/22620#discussion_r962732328


##########
sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java:
##########
@@ -105,6 +105,9 @@ public void testLateDataAccumulating() {
             .advanceWatermarkTo(instant.plus(Duration.standardMinutes(6)))
             // These elements are late but within the allowed lateness
             .addElements(TimestampedValue.of(4L, instant), TimestampedValue.of(5L, instant))
+            .advanceWatermarkTo(instant.plus(Duration.standardMinutes(10)))

Review Comment:
   @kennknowles Finally back to this; if you could have a look, that would be great :)





[GitHub] [beam] mosche commented on pull request #22620: Support VR test including TestStream for Spark runner in streaming mode

Posted by GitBox <gi...@apache.org>.
mosche commented on PR #22620:
URL: https://github.com/apache/beam/pull/22620#issuecomment-1209116853

   Run Spark ValidatesRunner




[GitHub] [beam] kennknowles merged pull request #22620: Support VR test including TestStream for Spark runner in streaming mode

Posted by GitBox <gi...@apache.org>.
kennknowles merged PR #22620:
URL: https://github.com/apache/beam/pull/22620




[GitHub] [beam] mosche commented on pull request #22620: Support VR test including TestStream for Spark runner in streaming mode

Posted by GitBox <gi...@apache.org>.
mosche commented on PR #22620:
URL: https://github.com/apache/beam/pull/22620#issuecomment-1209096944

   Run Spark ValidatesRunner




[GitHub] [beam] kennknowles commented on pull request #22620: Support VR test including TestStream for Spark runner in streaming mode

Posted by GitBox <gi...@apache.org>.
kennknowles commented on PR #22620:
URL: https://github.com/apache/beam/pull/22620#issuecomment-1243916292

   Run Spark ValidatesRunner




[GitHub] [beam] mosche commented on a diff in pull request #22620: Support VR test including TestStream for Spark runner in streaming mode

Posted by GitBox <gi...@apache.org>.
mosche commented on code in PR #22620:
URL: https://github.com/apache/beam/pull/22620#discussion_r969328513


##########
runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java:
##########
@@ -85,7 +85,14 @@ public JavaStreamingContext call() throws Exception {
     EvaluationContext ctxt = new EvaluationContext(jsc, pipeline, options, jssc);
     // update cache candidates
     SparkRunner.updateCacheCandidates(pipeline, translator, ctxt);
-    pipeline.traverseTopologically(new SparkRunner.Evaluator(translator, ctxt));
+    try {
+      pipeline.traverseTopologically(new SparkRunner.Evaluator(translator, ctxt));
+    } catch (RuntimeException e) {
+      jssc.stop(false, false);

Review Comment:
   I created a separate issue/PR for this: https://github.com/apache/beam/pull/23204
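   The hunk above stops the streaming context when translating the pipeline throws. A sketch of that pattern with a hypothetical `StreamingContextLike` interface standing in for Spark's `JavaStreamingContext` (whose `stop(boolean stopSparkContext, boolean stopGracefully)` the hunk calls); rethrowing the exception afterwards is an assumption, since the rest of the `catch` block is not shown:
   
   ```java
   /** Sketch of stopping the streaming context if pipeline translation fails. */
   public class StopOnFailureSketch {
     /** Hypothetical stand-in for JavaStreamingContext. */
     interface StreamingContextLike {
       void stop(boolean stopSparkContext, boolean stopGracefully);
     }

     static void translateOrStop(Runnable translate, StreamingContextLike jssc) {
       try {
         translate.run();
       } catch (RuntimeException e) {
         // Same flags as the hunk: keep the SparkContext alive, don't wait for running batches.
         jssc.stop(false, false);
         throw e; // assumption: the caller should still see the original failure
       }
     }

     public static void main(String[] args) {
       StreamingContextLike ctx =
           (stopSparkContext, stopGracefully) -> System.out.println("stopped streaming context");
       try {
         translateOrStop(() -> { throw new IllegalStateException("translation failed"); }, ctx);
       } catch (IllegalStateException e) {
         System.out.println("rethrown: " + e.getMessage());
       }
     }
   }
   ```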





[GitHub] [beam] github-actions[bot] commented on pull request #22620: Support VR test including TestStream for Spark runner in streaming mode

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #22620:
URL: https://github.com/apache/beam/pull/22620#issuecomment-1220603872

   Reminder, please take a look at this pr: @lukecwik 




[GitHub] [beam] mosche commented on a diff in pull request #22620: Support VR test including TestStream for Spark runner in streaming mode

Posted by GitBox <gi...@apache.org>.
mosche commented on code in PR #22620:
URL: https://github.com/apache/beam/pull/22620#discussion_r969582799


##########
runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/TestDStream.java:
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.translation.streaming;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.spark.coders.CoderHelpers;
+import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
+import org.apache.beam.runners.spark.util.GlobalWatermarkHolder.SparkWatermarks;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.testing.TestStream;
+import org.apache.beam.sdk.testing.TestStream.ElementEvent;
+import org.apache.beam.sdk.testing.TestStream.ProcessingTimeEvent;
+import org.apache.beam.sdk.testing.TestStream.WatermarkEvent;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.util.Preconditions;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.Uninterruptibles;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.JavaSparkContext$;
+import org.apache.spark.rdd.RDD;
+import org.apache.spark.streaming.StreamingContext;
+import org.apache.spark.streaming.Time;
+import org.apache.spark.streaming.dstream.InputDStream;
+import org.joda.time.Instant;
+import scala.Option;
+import scala.reflect.ClassTag;
+
+public class TestDStream<T> extends InputDStream<WindowedValue<T>> {
+  private final Coder<WindowedValue<T>> coder;
+
+  @SuppressFBWarnings("SE_TRANSIENT_FIELD_NOT_RESTORED") // not intended for use after serialization
+  private final transient List<TestStream.Event<T>> events;
+
+  private int event = 0;
+
+  private boolean insertEmptyBatch = false;
+
+  private long lastMillis = 0;
+
+  private Instant lastWatermark = Instant.EPOCH;
+
+  public TestDStream(TestStream<T> test, StreamingContext ssc) {
+    super(ssc, classTag());
+    this.coder = WindowedValue.getFullCoder(test.getValueCoder(), GlobalWindow.Coder.INSTANCE);
+    this.events = test.getEvents();
+  }
+
+  @Override
+  public Option<RDD<WindowedValue<T>>> compute(Time validTime) {
+    Preconditions.checkStateNotNull(events);
+
+    TestStream.Event<T> event = insertEmptyBatch ? null : nextEvent();

Review Comment:
   Done ✔️ 
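   The `compute` method shown above hands out at most one `TestStream` event per Spark micro-batch, or an empty batch when `insertEmptyBatch` is set. When that flag is set is not visible in this excerpt, so the toy model below simply assumes one empty batch after every watermark event; processing-time events are left out and all types are hypothetical:
   
   ```java
   import java.util.Iterator;
   import java.util.List;
   import java.util.Optional;

   /** Toy model of a one-event-per-batch input stream; not the actual TestDStream. */
   public class TestDStreamSketch {
     interface Event {}

     record Elements(List<String> values) implements Event {}

     record Watermark(long millis) implements Event {}

     private final Iterator<Event> events;
     private boolean insertEmptyBatch = false;
     private long lastWatermark = Long.MIN_VALUE;

     TestDStreamSketch(List<Event> events) {
       this.events = events.iterator();
     }

     /** Returns the next micro-batch, or Optional.empty() once all events are consumed. */
     Optional<List<String>> compute() {
       if (insertEmptyBatch) {
         insertEmptyBatch = false;
         return Optional.of(List.of());
       }
       if (!events.hasNext()) {
         return Optional.empty();
       }
       Event event = events.next();
       if (event instanceof Elements elements) {
         return Optional.of(elements.values());
       }
       // Watermark event: remember it and (hypothetically) follow it with an empty batch.
       lastWatermark = ((Watermark) event).millis();
       insertEmptyBatch = true;
       return Optional.of(List.of());
     }

     long lastWatermark() {
       return lastWatermark;
     }

     public static void main(String[] args) {
       TestDStreamSketch stream = new TestDStreamSketch(
           List.of(new Elements(List.of("a", "b")), new Watermark(10), new Elements(List.of("c"))));
       // Prints [a, b], [], [], [c]
       for (Optional<List<String>> batch = stream.compute(); batch.isPresent(); batch = stream.compute()) {
         System.out.println(batch.get());
       }
     }
   }
   ```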





[GitHub] [beam] mosche commented on a diff in pull request #22620: Support VR test including TestStream for Spark runner in streaming mode

Posted by GitBox <gi...@apache.org>.
mosche commented on code in PR #22620:
URL: https://github.com/apache/beam/pull/22620#discussion_r969580706


##########
runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/TestDStream.java:
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.translation.streaming;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.spark.coders.CoderHelpers;
+import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
+import org.apache.beam.runners.spark.util.GlobalWatermarkHolder.SparkWatermarks;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.testing.TestStream;
+import org.apache.beam.sdk.testing.TestStream.ElementEvent;
+import org.apache.beam.sdk.testing.TestStream.ProcessingTimeEvent;
+import org.apache.beam.sdk.testing.TestStream.WatermarkEvent;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.util.Preconditions;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.Uninterruptibles;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.JavaSparkContext$;
+import org.apache.spark.rdd.RDD;
+import org.apache.spark.streaming.StreamingContext;
+import org.apache.spark.streaming.Time;
+import org.apache.spark.streaming.dstream.InputDStream;
+import org.joda.time.Instant;
+import scala.Option;
+import scala.reflect.ClassTag;
+
+public class TestDStream<T> extends InputDStream<WindowedValue<T>> {
+  private final Coder<WindowedValue<T>> coder;
+
+  @SuppressFBWarnings("SE_TRANSIENT_FIELD_NOT_RESTORED") // not intended for use after serialization
+  private final transient List<TestStream.Event<T>> events;

Review Comment:
   The checker is running but apparently doesn't catch this. I added `@Nullable`.
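   Why `events` needs `@Nullable`: Java serialization does not write `transient` fields, so after deserialization such a field is `null` even when it is `final` and was set in the constructor. A small self-contained demonstration (the `Holder` class is illustrative):
   
   ```java
   import java.io.ByteArrayInputStream;
   import java.io.ByteArrayOutputStream;
   import java.io.IOException;
   import java.io.ObjectInputStream;
   import java.io.ObjectOutputStream;
   import java.io.Serializable;
   import java.util.List;

   /** Shows that a final transient field comes back as null after a serialization round trip. */
   public class TransientFieldSketch {
     static class Holder implements Serializable {
       private static final long serialVersionUID = 1L;
       final String name;
       final transient List<String> events; // skipped by Java serialization

       Holder(String name, List<String> events) {
         this.name = name;
         this.events = events;
       }
     }

     static Holder roundTrip(Holder holder) throws IOException, ClassNotFoundException {
       ByteArrayOutputStream bytes = new ByteArrayOutputStream();
       try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
         out.writeObject(holder);
       }
       try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
         return (Holder) in.readObject();
       }
     }

     public static void main(String[] args) throws Exception {
       Holder copy = roundTrip(new Holder("stream", List.of("e1", "e2")));
       System.out.println(copy.name);           // stream
       System.out.println(copy.events == null); // true
     }
   }
   ```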





[GitHub] [beam] mosche commented on a diff in pull request #22620: Support VR test including TestStream for Spark runner in streaming mode

Posted by GitBox <gi...@apache.org>.
mosche commented on code in PR #22620:
URL: https://github.com/apache/beam/pull/22620#discussion_r969229061


##########
sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java:
##########
@@ -415,7 +418,12 @@ public void testEarlyPanesOfWindow() {
   }
 
   @Test
-  @Category({ValidatesRunner.class, UsesTestStream.class, UsesTestStreamWithMultipleStages.class})
+  @Category({
+    ValidatesRunner.class,
+    UsesTestStream.class,
+    UsesTestStreamWithMultipleStages.class,
+    UsesStatefulParDo.class

Review Comment:
   Done ✔️ 
   https://github.com/apache/beam/pull/23202 





[GitHub] [beam] mosche commented on pull request #22620: Support VR test including TestStream for Spark runner in streaming mode

Posted by GitBox <gi...@apache.org>.
mosche commented on PR #22620:
URL: https://github.com/apache/beam/pull/22620#issuecomment-1208303417

   Unfortunately, I'm stuck with some flaky tests. It looks like watermarks are not advanced deterministically.
   Below are some logs of `org.apache.beam.sdk.schemas.AvroSchemaTest.testAvroPipelineGroupBy` (edited for readability).
   
   Successful run (the watermark is advanced early enough for the timer to fire):
   
   ```
   14:42:52,938 [3] TRACE SparkGroupAlsoByWindowViaWindowSet  - Group.ByFields/ToKvs/GroupByKey: input elements: [ValueInGlobalWindow{value=Row1, pane=NO_FIRING}]
   14:42:52,940 [3] TRACE SparkGroupAlsoByWindowViaWindowSet  - Group.ByFields/ToKvs/GroupByKey: non expired input elements: [ValueInGlobalWindow{value=Row1, pane=NO_FIRING}]
   14:42:52,949 [3] TRACE WindowTracing  - ReduceFnRunner.scheduleGarbageCollectionTimer: Scheduling at GLOBALW_MAX for key:Row2; window:GlobalWindow where inputWatermark:BOUNDEDW_MIN; outputWatermark:null
   14:42:52,957 [3] TRACE WindowTracing  - WatermarkHold.addHolds: element hold at GLOBALW_MAX is on time for key:Row2; window:GlobalWindow; inputWatermark:BOUNDEDW_MIN; outputWatermark:null
   14:42:52,960 [3] DEBUG SparkGroupAlsoByWindowViaWindowSet  - Group.ByFields/ToKvs/GroupByKey: timerInternals before advance are SparkTimerInternals{highWatermark=BOUNDEDW_MIN, synchronizedProcessingTime=EPOCH, timers=[TimerData{timerId=0, timerFamilyId=, namespace=Window(GlobalWindow), timestamp=GLOBALW_MAX, outputTimestamp=GLOBALW_MAX, domain=EVENT_TIME, deleted=false}], inputWatermark=BOUNDEDW_MIN}
   14:42:52,961 [3] DEBUG SparkGroupAlsoByWindowViaWindowSet  - Group.ByFields/ToKvs/GroupByKey: timers eligible for processing are [] [inputWatermark: BOUNDEDW_MIN]
   14:42:52,962 [3] TRACE SparkGroupAlsoByWindowViaWindowSet  - Group.ByFields/ToKvs/GroupByKey: output elements are  0
   ```
   ```
   14:42:53,137 [spark-listener-group-appStatus] INFO  GlobalWatermarkHolder  - Put new watermark block: {0=SparkWatermarks{lowWatermark=BOUNDEDW_MIN, highWatermark=BOUNDEDW_MAX, synchronizedProcessingTime=NOW}}
   ```
   ```
   14:42:53,146 [15] DEBUG SparkGroupAlsoByWindowViaWindowSet  - Group.ByFields/ToKvs/GroupByKey: timerInternals before advance are SparkTimerInternals{highWatermark=BOUNDEDW_MAX, synchronizedProcessingTime=NOW, timers=[TimerData{timerId=0, timerFamilyId=, namespace=Window(GlobalWindow), timestamp=GLOBALW_MAX, outputTimestamp=GLOBALW_MAX, domain=EVENT_TIME, deleted=false}], inputWatermark=BOUNDEDW_MIN}
   14:42:53,146 [15] DEBUG SparkGroupAlsoByWindowViaWindowSet  - Group.ByFields/ToKvs/GroupByKey: timers eligible for processing are [TimerData{timerId=0, timerFamilyId=, namespace=Window(GlobalWindow), timestamp=GLOBALW_MAX, outputTimestamp=GLOBALW_MAX, domain=EVENT_TIME, deleted=false}] [inputWatermark: BOUNDEDW_MAX]
   ```
   ```
   14:42:53,146 [15] DEBUG WindowTracing  - ReduceFnRunner: Received timer key:Row2; window:GlobalWindow; data:TimerData{timerId=0, timerFamilyId=, namespace=Window(GlobalWindow), timestamp=GLOBALW_MAX, outputTimestamp=GLOBALW_MAX, domain=EVENT_TIME, deleted=false} with inputWatermark:BOUNDEDW_MAX; outputWatermark:null
   14:42:53,148 [15] DEBUG WindowTracing  - ReduceFnRunner: Cleaning up for key:Row2; window:GlobalWindow with inputWatermark:BOUNDEDW_MAX; outputWatermark:null
   14:42:53,148 [15] DEBUG WindowTracing  - WatermarkHold.extractAndRelease: for key:Row2; window:GlobalWindow; inputWatermark:BOUNDEDW_MAX; outputWatermark:null
   14:42:53,149 [15] DEBUG WindowTracing  - WatermarkHold.extractAndRelease.read: clearing for key:Row2; window:GlobalWindow
   14:42:53,150 [15] DEBUG WindowTracing  - describePane: ON_TIME pane (prev was null) for key:Row2; windowMaxTimestamp:GLOBALW_MAX; inputWatermark:BOUNDEDW_MAX; outputWatermark:null; isLateForOutput:false
   14:42:53,152 [15] TRACE WindowTracing  - ReduceFnRunner.onTrigger: outputWindowedValue key:Row2 value:[Row1] at GLOBALW_MAX
   14:42:53,152 [15] DEBUG WindowTracing  - WatermarkHold.clearHolds: For key:Row2; window:GlobalWindow; inputWatermark:BOUNDEDW_MAX; outputWatermark:null
   14:42:53,153 [15] TRACE SparkGroupAlsoByWindowViaWindowSet  - Group.ByFields/ToKvs/GroupByKey: output elements are TimestampedValueInGlobalWindow{value=KV{Row2, [Row1]}, timestamp=GLOBALW_MAX, pane=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}} 1
   ```
   
   Failed run (watermark is advanced too late, element is lost):
   ```
   14:41:51,453 [3] TRACE SparkGroupAlsoByWindowViaWindowSet  - Group.ByFields/ToKvs/GroupByKey: input elements: [ValueInGlobalWindow{value=Row1, pane=NO_FIRING}]
   14:41:51,455 [3] TRACE SparkGroupAlsoByWindowViaWindowSet  - Group.ByFields/ToKvs/GroupByKey: non expired input elements: [ValueInGlobalWindow{value=Row1, pane=NO_FIRING}]
   14:41:51,463 [3] TRACE WindowTracing  - ReduceFnRunner.scheduleGarbageCollectionTimer: Scheduling at GLOBALW_MAX for key:Row2; window:GlobalWindow where inputWatermark:BOUNDEDW_MIN; outputWatermark:null
   14:41:51,471 [3] TRACE WindowTracing  - WatermarkHold.addHolds: element hold at GLOBALW_MAX is on time for key:Row2; window:GlobalWindow; inputWatermark:BOUNDEDW_MIN; outputWatermark:null
   14:41:51,474 [3] DEBUG SparkGroupAlsoByWindowViaWindowSet  - Group.ByFields/ToKvs/GroupByKey: timerInternals before advance are SparkTimerInternals{highWatermark=BOUNDEDW_MIN, synchronizedProcessingTime=EPOCH, timers=[TimerData{timerId=0, timerFamilyId=, namespace=Window(GlobalWindow), timestamp=GLOBALW_MAX, outputTimestamp=GLOBALW_MAX, domain=EVENT_TIME, deleted=false}], inputWatermark=BOUNDEDW_MIN}
   14:41:51,474 [3] DEBUG SparkGroupAlsoByWindowViaWindowSet  - Group.ByFields/ToKvs/GroupByKey: timers eligible for processing are [] [inputWatermark: BOUNDEDW_MIN]
   14:41:51,476 [3] TRACE SparkGroupAlsoByWindowViaWindowSet  - Group.ByFields/ToKvs/GroupByKey: output elements are  0
   ```
   ```
   14:41:51,658 [15] DEBUG SparkGroupAlsoByWindowViaWindowSet  - Group.ByFields/ToKvs/GroupByKey: timerInternals before advance are SparkTimerInternals{highWatermark=BOUNDEDW_MIN, synchronizedProcessingTime=EPOCH, timers=[TimerData{timerId=0, timerFamilyId=, namespace=Window(GlobalWindow), timestamp=GLOBALW_MAX, outputTimestamp=GLOBALW_MAX, domain=EVENT_TIME, deleted=false}], inputWatermark=BOUNDEDW_MIN}
   14:41:51,658 [15] DEBUG SparkGroupAlsoByWindowViaWindowSet  - Group.ByFields/ToKvs/GroupByKey: timers eligible for processing are [] [inputWatermark: BOUNDEDW_MIN]
   14:41:51,658 [15] TRACE SparkGroupAlsoByWindowViaWindowSet  - Group.ByFields/ToKvs/GroupByKey: output elements are  0
   ```
   ```
   14:41:51,662 [spark-listener-group-appStatus] INFO  GlobalWatermarkHolder  - Put new watermark block: {0=SparkWatermarks{lowWatermark=BOUNDEDW_MIN, highWatermark=BOUNDEDW_MAX, synchronizedProcessingTime=NOW}}
   ```
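The two runs differ only in ordering. In the successful run the new watermark block (`highWatermark=BOUNDEDW_MAX`) is put before the next micro-batch evaluates its timers. In the failed run it is put a few milliseconds after (14:41:51,662 vs. 14:41:51,658), so the end-of-window timer is never eligible and nothing is emitted. Below is a minimal, hypothetical model of that race in plain Java. The class and step names are illustrative and are not Spark runner code. It makes two assumptions: that a timer fires only once its timestamp is before the input watermark (as the `timers eligible` lines suggest), and that no further micro-batch runs after the late watermark update.

```java
/** Hypothetical model of the ordering race visible in the logs above; not Spark runner code. */
class WatermarkRaceSketch {
  static final long MIN = Long.MIN_VALUE; // stands in for BOUNDEDW_MIN
  static final long MAX = Long.MAX_VALUE; // stands in for BOUNDEDW_MAX
  static final long TIMER_TS = MAX - 1; // stands in for GLOBALW_MAX, which precedes BOUNDEDW_MAX

  /**
   * Replays steps: 'W' publishes the final watermark (like a new watermark block being put),
   * 'B' runs a micro-batch that advances its input watermark to the latest published value
   * and fires eligible timers. Returns the number of elements emitted.
   */
  static int run(String steps) {
    long published = MIN;
    long inputWatermark = MIN;
    boolean timerPending = true;
    int emitted = 0;
    for (char step : steps.toCharArray()) {
      if (step == 'W') {
        published = MAX;
      } else if (step == 'B') {
        inputWatermark = published;
        // An event-time timer is eligible once its timestamp is before the input watermark.
        if (timerPending && TIMER_TS < inputWatermark) {
          timerPending = false;
          emitted++; // the grouped KV{Row2, [Row1]} would be output here
        }
      }
    }
    return emitted;
  }

  public static void main(String[] args) {
    System.out.println("successful ordering (B, W, B): " + run("BWB")); // 1
    System.out.println("failed ordering (B, B, W):     " + run("BBW")); // 0
  }
}
```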


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] github-actions[bot] commented on pull request #22620: Support VR test including TestStream for Spark runner in streaming mode

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #22620:
URL: https://github.com/apache/beam/pull/22620#issuecomment-1231584609

   Reminder, please take a look at this pr: @kennknowles 




[GitHub] [beam] mosche commented on a diff in pull request #22620: Support VR test including TestStream for Spark runner in streaming mode

Posted by GitBox <gi...@apache.org>.
mosche commented on code in PR #22620:
URL: https://github.com/apache/beam/pull/22620#discussion_r942514727


##########
sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java:
##########
@@ -105,6 +105,9 @@ public void testLateDataAccumulating() {
             .advanceWatermarkTo(instant.plus(Duration.standardMinutes(6)))
             // These elements are late but within the allowed lateness
             .addElements(TimestampedValue.of(4L, instant), TimestampedValue.of(5L, instant))
+            .advanceWatermarkTo(instant.plus(Duration.standardMinutes(10)))

Review Comment:
   @kennknowles Maybe you could answer this? I'm wondering whether this is an issue with the Spark streaming runner (and how other runners handle it) or a gap in my own understanding. 





[GitHub] [beam] mosche commented on a diff in pull request #22620: Support VR test including TestStream for Spark runner in streaming mode

Posted by GitBox <gi...@apache.org>.
mosche commented on code in PR #22620:
URL: https://github.com/apache/beam/pull/22620#discussion_r942514727


##########
sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java:
##########
@@ -105,6 +105,9 @@ public void testLateDataAccumulating() {
             .advanceWatermarkTo(instant.plus(Duration.standardMinutes(6)))
             // These elements are late but within the allowed lateness
             .addElements(TimestampedValue.of(4L, instant), TimestampedValue.of(5L, instant))
+            .advanceWatermarkTo(instant.plus(Duration.standardMinutes(10)))

Review Comment:
   @kennknowles Maybe you could answer this? I'm wondering whether this is an issue with the Spark streaming runner (and how other runners handle it) or a gap in my own understanding. 
   > Without advancing the watermark once more the (lower) input watermark remains at 6 mins, but data in [0,5 min) won't be considered late until it passes 10 mins.
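The quoted reasoning can be checked with a little arithmetic. Data in a window becomes droppable once the input watermark passes the window's max timestamp plus the allowed lateness. The sketch below assumes 5-minute fixed windows and 5 minutes of allowed lateness; that matches the 10-minute figure above but is not shown in this hunk. It uses `java.time` instead of Joda-Time so it runs on its own, and it is a simplification, not Beam's own implementation.

```java
import java.time.Duration;
import java.time.Instant;

/** Simplified model of when data in a fixed window becomes droppably late. */
class LatenessSketch {
  // Assumptions: 5-minute fixed windows and 5 minutes of allowed lateness.
  static final Duration WINDOW_SIZE = Duration.ofMinutes(5);
  static final Duration ALLOWED_LATENESS = Duration.ofMinutes(5);

  /** The max timestamp of window [start, start + size) is one millisecond before its end. */
  static Instant maxTimestamp(Instant windowStart) {
    return windowStart.plus(WINDOW_SIZE).minusMillis(1);
  }

  /** Data is droppable once the garbage-collection time is before the input watermark. */
  static boolean isDroppable(Instant windowStart, Instant inputWatermark) {
    Instant gcTime = maxTimestamp(windowStart).plus(ALLOWED_LATENESS);
    return gcTime.isBefore(inputWatermark);
  }

  public static void main(String[] args) {
    Instant start = Instant.EPOCH; // window [0, 5 min)
    Instant at6 = start.plus(Duration.ofMinutes(6));
    Instant at10 = start.plus(Duration.ofMinutes(10));
    System.out.println("watermark at 6 min:  droppable=" + isDroppable(start, at6)); // false
    System.out.println("watermark at 10 min: droppable=" + isDroppable(start, at10)); // true
  }
}
```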





[GitHub] [beam] mosche commented on pull request #22620: Support VR test including TestStream for Spark runner in streaming mode

Posted by GitBox <gi...@apache.org>.
mosche commented on PR #22620:
URL: https://github.com/apache/beam/pull/22620#issuecomment-1211954623

   Run Java PreCommit




[GitHub] [beam] mosche commented on pull request #22620: Support VR test including TestStream for Spark runner in streaming mode

Posted by GitBox <gi...@apache.org>.
mosche commented on PR #22620:
URL: https://github.com/apache/beam/pull/22620#issuecomment-1246776582

   I changed direction here after validating my initial approach (replacing bounded sources with `UnboundedReadFromBoundedSource`) by running the VR tests with Flink:
   
   - Tests that failed, likely due to watermark issues in the Spark runner (#23129, see [test results](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark_PR/494/)), ran fine with Flink, which suggests there really is a major problem with the Spark runner in streaming mode.
   
   - However, it also showed that the approach is flawed. Some bounded test cases simply cannot be forced into streaming execution, e.g. any GroupByKey in the GlobalWindow fails if no trigger is set.
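The GroupByKey point can be illustrated with a simplified model. Beam rejects a GroupByKey on a non-bounded input in the GlobalWindow when only the default trigger is set, because such a pane would never fire. The sketch below roughly mirrors that rule with hypothetical names; it is not Beam's validation code.

```java
/** Simplified, hypothetical model of the GroupByKey validation rule referred to above. */
class GroupByKeyCheckSketch {
  enum Boundedness { BOUNDED, UNBOUNDED }

  /** Throws if an unbounded input sits in the global window with only the default trigger. */
  static void validate(Boundedness input, boolean globalWindow, boolean defaultTrigger) {
    if (input == Boundedness.UNBOUNDED && globalWindow && defaultTrigger) {
      throw new IllegalStateException(
          "GroupByKey on an unbounded input in the GlobalWindow requires a trigger");
    }
  }

  static boolean isValid(Boundedness input, boolean globalWindow, boolean defaultTrigger) {
    try {
      validate(input, globalWindow, defaultTrigger);
      return true;
    } catch (IllegalStateException e) {
      return false;
    }
  }

  public static void main(String[] args) {
    // A bounded test case is fine as written...
    System.out.println(isValid(Boundedness.BOUNDED, true, true)); // true
    // ...but the same pipeline is rejected once its read is forced to be unbounded.
    System.out.println(isValid(Boundedness.UNBOUNDED, true, true)); // false
  }
}
```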
   
   The original motivation for this approach was to keep the Spark runner from failing when VR tests forced streaming via pipeline options for bounded test cases: Spark refuses to start if no streaming workload is scheduled. 
   Instead, `TestSparkRunner` now just detects the translation mode and acts accordingly.
   
   Unfortunately, this hides the watermark issues uncovered above, since the VR tests now succeed.
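One plausible reading of "detects the translation mode" is sketched below: streaming is chosen only if the pipeline actually contains an unbounded PCollection, so Spark is never asked to start a streaming context without a streaming workload. The enums and method are hypothetical and are not the PR's code.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch: pick the translation mode from the boundedness of the PCollections. */
class TranslationModeSketch {
  enum Boundedness { BOUNDED, UNBOUNDED }

  enum Mode { BATCH, STREAMING }

  /**
   * Streaming only if at least one PCollection is unbounded; a purely bounded pipeline
   * runs in batch mode even in a streaming VR test run.
   */
  static Mode detect(List<Boundedness> pcollections) {
    return pcollections.contains(Boundedness.UNBOUNDED) ? Mode.STREAMING : Mode.BATCH;
  }

  public static void main(String[] args) {
    System.out.println(detect(Arrays.asList(Boundedness.BOUNDED, Boundedness.BOUNDED))); // BATCH
    System.out.println(detect(Arrays.asList(Boundedness.BOUNDED, Boundedness.UNBOUNDED))); // STREAMING
  }
}
```

The trade-off described above follows directly: bounded VR tests now take the batch path, so any streaming-only watermark bugs are no longer exercised by them.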
   
   




[GitHub] [beam] github-actions[bot] commented on pull request #22620: Support VR test including TestStream for Spark runner in streaming mode

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #22620:
URL: https://github.com/apache/beam/pull/22620#issuecomment-1262191559

   Assigning new set of reviewers because PR has gone too long without review. If you would like to opt out of this review, comment `assign to next reviewer`:
   
   R: @kennknowles for label java.
   
   Available commands:
   - `stop reviewer notifications` - opt out of the automated review tooling
   - `remind me after tests pass` - tag the comment author after tests pass
   - `waiting on author` - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)




[GitHub] [beam] github-actions[bot] commented on pull request #22620: Support VR test including TestStream for Spark runner in streaming mode

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #22620:
URL: https://github.com/apache/beam/pull/22620#issuecomment-1257941248

   Reminder, please take a look at this pr: @lukecwik 




[GitHub] [beam] mosche commented on a diff in pull request #22620: Support VR test including TestStream for Spark runner in streaming mode

Posted by GitBox <gi...@apache.org>.
mosche commented on code in PR #22620:
URL: https://github.com/apache/beam/pull/22620#discussion_r944132001


##########
sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java:
##########
@@ -105,6 +105,9 @@ public void testLateDataAccumulating() {
             .advanceWatermarkTo(instant.plus(Duration.standardMinutes(6)))
             // These elements are late but within the allowed lateness
             .addElements(TimestampedValue.of(4L, instant), TimestampedValue.of(5L, instant))
+            .advanceWatermarkTo(instant.plus(Duration.standardMinutes(10)))

Review Comment:
   Thanks a lot! I'm off as well for a bit, so no rush on this. 





[GitHub] [beam] mosche commented on pull request #22620: Support VR test including TestStream for Spark runner in streaming mode

Posted by GitBox <gi...@apache.org>.
mosche commented on PR #22620:
URL: https://github.com/apache/beam/pull/22620#issuecomment-1211903214

   Run Spark ValidatesRunner




[GitHub] [beam] github-actions[bot] commented on pull request #22620: Support VR test including TestStream for Spark runner in streaming mode

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #22620:
URL: https://github.com/apache/beam/pull/22620#issuecomment-1208318011

   Assigning reviewers. If you would like to opt out of this review, comment `assign to next reviewer`:
   
   R: @lukecwik for label java.
   
   Available commands:
   - `stop reviewer notifications` - opt out of the automated review tooling
   - `remind me after tests pass` - tag the comment author after tests pass
   - `waiting on author` - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)
   
   The PR bot will only process comments in the main thread (not review comments).




[GitHub] [beam] kennknowles commented on a diff in pull request #22620: Support VR test including TestStream for Spark runner in streaming mode

Posted by GitBox <gi...@apache.org>.
kennknowles commented on code in PR #22620:
URL: https://github.com/apache/beam/pull/22620#discussion_r968565602


##########
sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java:
##########
@@ -415,7 +418,12 @@ public void testEarlyPanesOfWindow() {
   }
 
   @Test
-  @Category({ValidatesRunner.class, UsesTestStream.class, UsesTestStreamWithMultipleStages.class})
+  @Category({
+    ValidatesRunner.class,
+    UsesTestStream.class,
+    UsesTestStreamWithMultipleStages.class,
+    UsesStatefulParDo.class

Review Comment:
   This small change should be an independent commit that comes first. Then if there is any trouble we will only revert the bigger commit but not this fixup.





[GitHub] [beam] github-actions[bot] commented on pull request #22620: Support VR test including TestStream for Spark runner in streaming mode

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #22620:
URL: https://github.com/apache/beam/pull/22620#issuecomment-1243651048

   Reminder, please take a look at this pr: @lukecwik 




[GitHub] [beam] aromanenko-dev commented on pull request #22620: Support VR test including TestStream for Spark runner in streaming mode

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on PR #22620:
URL: https://github.com/apache/beam/pull/22620#issuecomment-1262435134

   @kennknowles kind ping, are you ok to merge it?




[GitHub] [beam] mosche commented on pull request #22620: Support VR test including TestStream for Spark runner in streaming mode

Posted by GitBox <gi...@apache.org>.
mosche commented on PR #22620:
URL: https://github.com/apache/beam/pull/22620#issuecomment-1208289878

   Run Spark ValidatesRunner




[GitHub] [beam] mosche commented on pull request #22620: Support VR test including TestStream for Spark runner in streaming mode

Posted by GitBox <gi...@apache.org>.
mosche commented on PR #22620:
URL: https://github.com/apache/beam/pull/22620#issuecomment-1209086431

   Run Spark ValidatesRunner




[GitHub] [beam] github-actions[bot] commented on pull request #22620: Support VR test including TestStream for Spark runner in streaming mode

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #22620:
URL: https://github.com/apache/beam/pull/22620#issuecomment-1223984921

   Assigning new set of reviewers because PR has gone too long without review. If you would like to opt out of this review, comment `assign to next reviewer`:
   
   R: @kennknowles for label java.
   
   Available commands:
   - `stop reviewer notifications` - opt out of the automated review tooling
   - `remind me after tests pass` - tag the comment author after tests pass
   - `waiting on author` - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)




[GitHub] [beam] mosche commented on a diff in pull request #22620: Support VR test including TestStream for Spark runner in streaming mode

Posted by GitBox <gi...@apache.org>.
mosche commented on code in PR #22620:
URL: https://github.com/apache/beam/pull/22620#discussion_r968584728


##########
runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/TestDStream.java:
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.translation.streaming;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.spark.coders.CoderHelpers;
+import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
+import org.apache.beam.runners.spark.util.GlobalWatermarkHolder.SparkWatermarks;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.testing.TestStream;
+import org.apache.beam.sdk.testing.TestStream.ElementEvent;
+import org.apache.beam.sdk.testing.TestStream.ProcessingTimeEvent;
+import org.apache.beam.sdk.testing.TestStream.WatermarkEvent;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.util.Preconditions;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.Uninterruptibles;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.JavaSparkContext$;
+import org.apache.spark.rdd.RDD;
+import org.apache.spark.streaming.StreamingContext;
+import org.apache.spark.streaming.Time;
+import org.apache.spark.streaming.dstream.InputDStream;
+import org.joda.time.Instant;
+import scala.Option;
+import scala.reflect.ClassTag;
+
+public class TestDStream<T> extends InputDStream<WindowedValue<T>> {
+  private final Coder<WindowedValue<T>> coder;
+
+  @SuppressFBWarnings("SE_TRANSIENT_FIELD_NOT_RESTORED") // not intended for use after serialization
+  private final transient List<TestStream.Event<T>> events;

Review Comment:
   Hmm, the Checker Framework is enabled; I'm not sure whether that's a miss or due to the FindBugs suppression. I'll have a look.





[GitHub] [beam] mosche commented on a diff in pull request #22620: Support VR test including TestStream for Spark runner in streaming mode

Posted by GitBox <gi...@apache.org>.
mosche commented on code in PR #22620:
URL: https://github.com/apache/beam/pull/22620#discussion_r969534303


##########
runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java:
##########
@@ -153,4 +192,84 @@ private static void awaitWatermarksOrTimeout(
     } while ((timeoutMillis -= batchDurationMillis) > 0
         && globalWatermark.isBefore(stopPipelineWatermark));
   }
+
+  /**
+   * Override factory to replace {@link Read.Bounded} with {@link UnboundedReadFromBoundedSource}
+   * to force streaming mode.
+   */
+  private static class UnboundedReadFromBoundedSourceOverrideFactory<T>

Review Comment:
   Happy to do that, though I'm not entirely sure the factory is of much value by itself. I also had to fix up the outputs in a non-trivial way using a visitor after the replacement, see https://github.com/apache/beam/pull/22620/files#diff-d81f49eb0330230bd03ce6cd33b5f70f59c443aac57741e877ececbada32b16bR246-R274
   
   I couldn't find a way to achieve this in `mapOutputs` of the override factory itself. 
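Conceptually, `UnboundedReadFromBoundedSource` lets a bounded source be consumed as if it were unbounded, which is what forces a bounded test into streaming mode. Below is a much-simplified, hypothetical plain-Java illustration of that idea. It ignores checkpointing and splitting, which the real transform has to handle. Elements are handed out one at a time, and the watermark jumps to the maximum only once the input is exhausted.

```java
import java.time.Instant;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/** Hypothetical illustration: presenting a finite (bounded) input as an unbounded stream. */
class BoundedAsUnboundedSketch<T> {
  private final Iterator<T> remaining;

  BoundedAsUnboundedSketch(List<T> boundedInput) {
    this.remaining = boundedInput.iterator();
  }

  /** Returns the next element, or null once the bounded input is exhausted. */
  T poll() {
    return remaining.hasNext() ? remaining.next() : null;
  }

  /** The watermark stays at the minimum while input remains, then jumps to the maximum. */
  Instant watermark() {
    return remaining.hasNext() ? Instant.MIN : Instant.MAX;
  }

  public static void main(String[] args) {
    BoundedAsUnboundedSketch<String> reader = new BoundedAsUnboundedSketch<>(Arrays.asList("a", "b"));
    System.out.println(reader.watermark()); // Instant.MIN while elements remain
    System.out.println(reader.poll() + reader.poll()); // ab
    System.out.println(reader.watermark()); // Instant.MAX once exhausted
  }
}
```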





[GitHub] [beam] kennknowles commented on a diff in pull request #22620: Support VR test including TestStream for Spark runner in streaming mode

Posted by GitBox <gi...@apache.org>.
kennknowles commented on code in PR #22620:
URL: https://github.com/apache/beam/pull/22620#discussion_r968558666


##########
runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java:
##########
@@ -153,4 +192,84 @@ private static void awaitWatermarksOrTimeout(
     } while ((timeoutMillis -= batchDurationMillis) > 0
         && globalWatermark.isBefore(stopPipelineWatermark));
   }
+
+  /**
+   * Override factory to replace {@link Read.Bounded} with {@link UnboundedReadFromBoundedSource}
+   * to force streaming mode.
+   */
+  private static class UnboundedReadFromBoundedSourceOverrideFactory<T>

Review Comment:
   This seems useful as a general thing that could be in `runners-core-construction` FWIW.



##########
runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/TestDStream.java:
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.translation.streaming;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.spark.coders.CoderHelpers;
+import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
+import org.apache.beam.runners.spark.util.GlobalWatermarkHolder.SparkWatermarks;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.testing.TestStream;
+import org.apache.beam.sdk.testing.TestStream.ElementEvent;
+import org.apache.beam.sdk.testing.TestStream.ProcessingTimeEvent;
+import org.apache.beam.sdk.testing.TestStream.WatermarkEvent;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.util.Preconditions;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.Uninterruptibles;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.JavaSparkContext$;
+import org.apache.spark.rdd.RDD;
+import org.apache.spark.streaming.StreamingContext;
+import org.apache.spark.streaming.Time;
+import org.apache.spark.streaming.dstream.InputDStream;
+import org.joda.time.Instant;
+import scala.Option;
+import scala.reflect.ClassTag;
+
+public class TestDStream<T> extends InputDStream<WindowedValue<T>> {
+  private final Coder<WindowedValue<T>> coder;
+
+  @SuppressFBWarnings("SE_TRANSIENT_FIELD_NOT_RESTORED") // not intended for use after serialization
+  private final transient List<TestStream.Event<T>> events;

Review Comment:
   I think all transient fields should be `@Nullable`. Is checkerframework disabled at the whole package level?
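A plain Java serialization round trip shows why transient fields should be nullable. After deserialization a `transient` field is `null`, even a `final` one, so code using it must check first, as `compute` does with `Preconditions.checkStateNotNull(events)`. In the sketch below, `@Nullable` is a local stand-in declared only so the example compiles without the Checker Framework, and the class itself is hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.lang.annotation.ElementType;
import java.lang.annotation.Target;
import java.util.Arrays;
import java.util.List;

/** Local stand-in for the Checker Framework's @Nullable (assumption, for compilation only). */
@Target(ElementType.FIELD)
@interface Nullable {}

class TransientFieldSketch implements Serializable {
  private static final long serialVersionUID = 1L;

  // Not restored by Java serialization, hence declared nullable.
  @Nullable private final transient List<String> events;

  TransientFieldSketch(List<String> events) {
    this.events = events;
  }

  boolean hasEvents() {
    return events != null;
  }

  int eventCount() {
    if (events == null) {
      throw new IllegalStateException("events are not available after deserialization");
    }
    return events.size();
  }

  static TransientFieldSketch roundTrip(TransientFieldSketch original)
      throws IOException, ClassNotFoundException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(original);
    }
    try (ObjectInputStream in =
        new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
      return (TransientFieldSketch) in.readObject();
    }
  }

  public static void main(String[] args) throws Exception {
    TransientFieldSketch original = new TransientFieldSketch(Arrays.asList("e1", "e2"));
    TransientFieldSketch copy = roundTrip(original);
    System.out.println(original.hasEvents() + " " + copy.hasEvents()); // true false
  }
}
```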



##########
runners/spark/spark_runner.gradle:
##########
@@ -260,21 +265,60 @@ def validatesRunnerStreaming = tasks.register("validatesRunnerStreaming", Test)
   group = "Verification"
   // Disable gradle cache
   outputs.upToDateWhen { false }
-  def pipelineOptions = JsonOutput.toJson([
-    "--runner=TestSparkRunner",
-    "--forceStreaming=true",
-    "--enableSparkMetricSinks=true",
-  ])
-  systemProperty "beamTestPipelineOptions", pipelineOptions
+  systemProperties sparkTestProperties(["--enableSparkMetricSinks": "true", "--forceStreaming": "true"])
 
   classpath = configurations.validatesRunner
-  testClassesDirs += files(project.sourceSets.test.output.classesDirs)
+  testClassesDirs += files(
+    project(":sdks:java:core").sourceSets.test.output.classesDirs,

Review Comment:
   This change makes sense. I don't understand how it worked before. Was it not actually running the VR tests?



##########
runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java:
##########
@@ -85,7 +85,14 @@ public JavaStreamingContext call() throws Exception {
     EvaluationContext ctxt = new EvaluationContext(jsc, pipeline, options, jssc);
     // update cache candidates
     SparkRunner.updateCacheCandidates(pipeline, translator, ctxt);
-    pipeline.traverseTopologically(new SparkRunner.Evaluator(translator, ctxt));
+    try {
+      pipeline.traverseTopologically(new SparkRunner.Evaluator(translator, ctxt));
+    } catch (RuntimeException e) {
+      jssc.stop(false, false);

Review Comment:
   Identifiers like `jsc` and `jssc` are very hard to read for someone like me who doesn't work with Spark every day.
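In the hunk above, `jsc` and `jssc` presumably stand for the `JavaSparkContext` and the `JavaStreamingContext`. The sketch below shows the same stop-on-failure pattern with descriptive names. `StoppableStreamingContext` is a hypothetical stand-in for `JavaStreamingContext`, whose `stop(stopSparkContext, stopGracefully)` the diff calls as `jssc.stop(false, false)`.

```java
/** Hypothetical sketch of the cleanup-on-failure pattern with descriptive names. */
class StopOnFailureSketch {
  /** Minimal stand-in for a streaming context exposing stop(stopSparkContext, stopGracefully). */
  interface StoppableStreamingContext {
    void stop(boolean stopSparkContext, boolean stopGracefully);
  }

  static void translateOrStop(Runnable translatePipeline, StoppableStreamingContext streamingContext) {
    try {
      translatePipeline.run();
    } catch (RuntimeException e) {
      // Stop the streaming context immediately but keep the underlying Spark context,
      // then surface the original failure to the caller.
      streamingContext.stop(false, false);
      throw e;
    }
  }

  public static void main(String[] args) {
    boolean[] stopped = {false};
    StoppableStreamingContext streamingContext = (stopSparkContext, stopGracefully) -> stopped[0] = true;
    try {
      translateOrStop(() -> { throw new IllegalStateException("translation failed"); }, streamingContext);
    } catch (IllegalStateException e) {
      System.out.println("rethrown: " + e.getMessage() + ", stopped: " + stopped[0]);
    }
  }
}
```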



##########
runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/TestDStream.java:
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.translation.streaming;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.spark.coders.CoderHelpers;
+import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
+import org.apache.beam.runners.spark.util.GlobalWatermarkHolder.SparkWatermarks;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.testing.TestStream;
+import org.apache.beam.sdk.testing.TestStream.ElementEvent;
+import org.apache.beam.sdk.testing.TestStream.ProcessingTimeEvent;
+import org.apache.beam.sdk.testing.TestStream.WatermarkEvent;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.util.Preconditions;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.Uninterruptibles;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.JavaSparkContext$;
+import org.apache.spark.rdd.RDD;
+import org.apache.spark.streaming.StreamingContext;
+import org.apache.spark.streaming.Time;
+import org.apache.spark.streaming.dstream.InputDStream;
+import org.joda.time.Instant;
+import scala.Option;
+import scala.reflect.ClassTag;
+
+public class TestDStream<T> extends InputDStream<WindowedValue<T>> {
+  private final Coder<WindowedValue<T>> coder;
+
+  @SuppressFBWarnings("SE_TRANSIENT_FIELD_NOT_RESTORED") // not intended for use after serialization
+  private final transient List<TestStream.Event<T>> events;
+
+  private int event = 0;
+
+  private boolean insertEmptyBatch = false;
+
+  private long lastMillis = 0;
+
+  private Instant lastWatermark = Instant.EPOCH;
+
+  public TestDStream(TestStream<T> test, StreamingContext ssc) {
+    super(ssc, classTag());
+    this.coder = WindowedValue.getFullCoder(test.getValueCoder(), GlobalWindow.Coder.INSTANCE);
+    this.events = test.getEvents();
+  }
+
+  @Override
+  public Option<RDD<WindowedValue<T>>> compute(Time validTime) {
+    Preconditions.checkStateNotNull(events);
+
+    TestStream.Event<T> event = insertEmptyBatch ? null : nextEvent();

Review Comment:
   The local variable `event` is easily confused with the field `this.event`, which it shadows. I would rename the field to `this.currentEventIndex` or something similar.
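   The rename suggested above is illustrated below in a minimal, hypothetical sketch. It is not code from the PR. `EventCursor`, `setInsertEmptyBatch` and `currentEventIndex()` are invented for illustration. Spark's `InputDStream`/`RDD` and Beam's `TestStream.Event` are replaced by a plain generic `List`. The sketch also assumes that `nextEvent()` returns `null` once all events are consumed.

```java
import java.util.List;

/**
 * Hypothetical, simplified stand-in for the event bookkeeping in TestDStream.
 * Spark and Beam types are omitted; only the naming issue is illustrated.
 */
class EventCursor<E> {
  private final List<E> events;

  // Renamed from `event`: this is an index into `events`, not an event,
  // so it can no longer be mistaken for the local `event` in compute().
  private int currentEventIndex = 0;

  // Mirrors the flag in TestDStream; when set, compute() yields no event.
  private boolean insertEmptyBatch = false;

  EventCursor(List<E> events) {
    this.events = events;
  }

  /** Returns the next event, or null once all events have been consumed (assumed behavior). */
  E nextEvent() {
    return currentEventIndex < events.size() ? events.get(currentEventIndex++) : null;
  }

  // Hypothetical setter; how the real class toggles the flag is not shown in the diff.
  void setInsertEmptyBatch(boolean insertEmptyBatch) {
    this.insertEmptyBatch = insertEmptyBatch;
  }

  /** The local name `event` is now unambiguous: it always holds an event, never an index. */
  E compute() {
    E event = insertEmptyBatch ? null : nextEvent();
    return event;
  }

  int currentEventIndex() {
    return currentEventIndex;
  }
}
```

   With the field named for what it holds, `event` in `compute()` no longer shadows anything and the `this.` prefix is no longer needed to tell the two apart.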



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] mosche commented on pull request #22620: Support VR test including TestStream for Spark runner in streaming mode

Posted by GitBox <gi...@apache.org>.
mosche commented on PR #22620:
URL: https://github.com/apache/beam/pull/22620#issuecomment-1246679353

   Run Spark ValidatesRunner

