You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by sc...@apache.org on 2018/12/14 21:47:58 UTC
[beam] branch master updated: [BEAM-6225] Setup Jenkins Job to Run
VR with ExecutableStage (#7271)
This is an automated email from the ASF dual-hosted git repository.
scott pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new b7035c1 [BEAM-6225] Setup Jenkins Job to Run VR with ExecutableStage (#7271)
b7035c1 is described below
commit b7035c1a098c526356c6fb33480b989ed037da0a
Author: Boyuan Zhang <36...@users.noreply.github.com>
AuthorDate: Fri Dec 14 13:47:48 2018 -0800
[BEAM-6225] Setup Jenkins Job to Run VR with ExecutableStage (#7271)
---
...unner_DataflowPortabilityExecutableStage.groovy | 54 ++++++++++++++++++++++
runners/google-cloud-dataflow-java/build.gradle | 48 +++++++++++++++++--
...aflowPortabilityExecutableStageUnsupported.java | 25 ++++++++++
.../apache/beam/sdk/testing/UsesSideInputs.java | 24 ++++++++++
.../org/apache/beam/sdk/testing/PAssertTest.java | 12 ++---
.../apache/beam/sdk/transforms/CombineTest.java | 30 ++++++------
.../apache/beam/sdk/transforms/FlattenTest.java | 3 +-
.../apache/beam/sdk/transforms/GroupByKeyTest.java | 11 +++--
.../org/apache/beam/sdk/transforms/ParDoTest.java | 17 +++----
.../apache/beam/sdk/transforms/ReshuffleTest.java | 11 +++--
.../beam/sdk/transforms/SplittableDoFnTest.java | 10 +++-
.../beam/sdk/transforms/join/CoGroupByKeyTest.java | 8 ++--
.../beam/sdk/transforms/windowing/WindowTest.java | 5 +-
.../sdk/transforms/windowing/WindowingTest.java | 9 ++--
14 files changed, 212 insertions(+), 55 deletions(-)
diff --git a/.test-infra/jenkins/job_PostCommit_Java_ValidatesRunner_DataflowPortabilityExecutableStage.groovy b/.test-infra/jenkins/job_PostCommit_Java_ValidatesRunner_DataflowPortabilityExecutableStage.groovy
new file mode 100644
index 0000000..62e7361
--- /dev/null
+++ b/.test-infra/jenkins/job_PostCommit_Java_ValidatesRunner_DataflowPortabilityExecutableStage.groovy
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import CommonJobProperties as commonJobProperties
+import PostcommitJobBuilder
+
+
+// This job runs the suite of ValidatesRunner tests against the Dataflow
+// runner.
+PostcommitJobBuilder.postCommitJob('beam_PostCommit_Java_ValidatesRunner_DataflowPortabilityExecutableStage',
+ 'Run Dataflow Portability ExecutableStage ValidatesRunner', 'Google Cloud Dataflow Runner PortabilityApi ExecutableStage ValidatesRunner Tests', this) {
+
+ description('Runs the ValidatesRunner suite on the Dataflow PortabilityApi runner with ExecutableStage code path enabled.')
+
+ // Set common parameters. Sets a 3 hour timeout.
+ commonJobProperties.setTopLevelMainJobProperties(delegate, 'master', 400)
+
+ // Publish all test results to Jenkins
+ publishers {
+ archiveJunit('**/build/test-results/**/*.xml')
+ }
+
+ // Gradle goals for this job.
+ steps {
+ gradle {
+ rootBuildScriptDir(commonJobProperties.checkoutDir)
+ tasks(':beam-runners-google-cloud-dataflow-java:validatesRunnerFnApiWorkerExecutableStageTest')
+ // Increase parallel worker threads above processor limit since most time is
+ // spent waiting on Dataflow jobs. ValidatesRunner tests on Dataflow are slow
+ // because each one launches a Dataflow job with about 3 mins of overhead.
+ // 3 x num_cores strikes a good balance between maxing out parallelism without
+ // overloading the machines.
+ commonJobProperties.setGradleSwitches(delegate, 3 * Runtime.runtime.availableProcessors())
+ }
+ }
+
+ // [BEAM-6236] "use_executable_stage_bundle_execution" hasn't been rolled out.
+ disabled()
+}
diff --git a/runners/google-cloud-dataflow-java/build.gradle b/runners/google-cloud-dataflow-java/build.gradle
index c0b831c..9c6aaf4 100644
--- a/runners/google-cloud-dataflow-java/build.gradle
+++ b/runners/google-cloud-dataflow-java/build.gradle
@@ -241,17 +241,55 @@ task validatesRunnerFnApiWorkerTest(type: Test) {
}
}
+task validatesRunnerFnApiWorkerExecutableStageTest(type: Test) {
+ group = "Verification"
+ description "Validates Dataflow PortabilityApi runner"
+ dependsOn ":beam-runners-google-cloud-dataflow-java-fn-api-worker:shadowJar"
+ dependsOn buildAndPushDockerContainer
+
+ systemProperty "beamTestPipelineOptions", JsonOutput.toJson([
+ "--runner=TestDataflowRunner",
+ "--project=${dataflowProject}",
+ "--tempRoot=${dataflowPostCommitTempRoot}",
+ "--dataflowWorkerJar=${dataflowFnApiWorkerJar}",
+ "--workerHarnessContainerImage=${dockerImageContainer}:${dockerTag}",
+ "--experiments=beam_fn_api,use_executable_stage_bundle_execution"]
+ )
+
+ // Increase test parallelism up to the number of Gradle workers. By default this is equal
+ // to the number of CPU cores, but can be increased by setting --max-workers=N.
+ maxParallelForks Integer.MAX_VALUE
+ classpath = configurations.validatesRunner
+ testClassesDirs = files(project(":beam-sdks-java-core").sourceSets.test.output.classesDirs)
+ // TODO(BEAM-6232): ViewTest tests sideinputs, which is not supported bu current bundle execution.
+ exclude '**/ViewTest.class'
+ useJUnit {
+ includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner'
+ commonExcludeCategories.each {
+ excludeCategories it
+ }
+ fnApiWorkerExcludeCategories.each {
+ excludeCategories it
+ }
+ // TODO(BEAM-6232): Support sideinput.
+ excludeCategories 'org.apache.beam.sdk.testing.UsesSideInputs'
+ // TODO(BEAM-6233): Support timer and state.
+ excludeCategories 'org.apache.beam.sdk.testing.UsesStatefulParDo'
+ // TODO(BEAM-6231): Triage failures.
+ excludeCategories 'org.apache.beam.sdk.testing.DataflowPortabilityExecutableStageUnsupported'
+ }
+}
+
task validatesRunner {
group = "Verification"
description "Validates Dataflow runner"
dependsOn validatesRunnerLegacyWorkerTest
}
-task validatesRunnerPortabilityApi {
- group = "Verification"
- description "Validates Dataflow PortabilityApi runner"
- dependsOn validatesRunnerFnApiWorkerTest
- dependsOn buildAndPushDockerContainer
+task validatesRunnerPortabilityApiExecutableStage {
+ group = "Verification"
+ description "Validates Dataflow PortabilityApi runner"
+ dependsOn validatesRunnerFnApiWorkerExecutableStageTest
}
task googleCloudPlatformLegacyWorkerIntegrationTest(type: Test) {
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/DataflowPortabilityExecutableStageUnsupported.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/DataflowPortabilityExecutableStageUnsupported.java
new file mode 100644
index 0000000..a14885f
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/DataflowPortabilityExecutableStageUnsupported.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.testing;
+
+/**
+ * Category tag for validation tests which are not supported by Dataflow portable worker with
+ * use_exetuable_stage_bundle_execution, which needs more investigations.
+ */
+// TODO(BEAM-6231): Triage test failures introduced by using ExecutableStage.
+public interface DataflowPortabilityExecutableStageUnsupported {}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesSideInputs.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesSideInputs.java
new file mode 100644
index 0000000..907c13e
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesSideInputs.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.testing;
+
+/**
+ * Category tag for validation tests which use sideinputs. Tests tagged with {@link UsesSideInputs}
+ * should be run for runners which support sideinputs.
+ */
+public interface UsesSideInputs {}
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
index bf7ee69..160f709 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
@@ -297,7 +297,7 @@ public class PAssertTest implements Serializable {
/** Basic test for {@code isEqualTo}. */
@Test
- @Category(ValidatesRunner.class)
+ @Category({ValidatesRunner.class, UsesSideInputs.class})
public void testIsEqualTo() throws Exception {
PCollection<Integer> pcollection = pipeline.apply(Create.of(43));
PAssert.thatSingleton(pcollection).isEqualTo(43);
@@ -306,7 +306,7 @@ public class PAssertTest implements Serializable {
/** Basic test for {@code isEqualTo}. */
@Test
- @Category(ValidatesRunner.class)
+ @Category({ValidatesRunner.class, DataflowPortabilityExecutableStageUnsupported.class})
public void testWindowedIsEqualTo() throws Exception {
PCollection<Integer> pcollection =
pipeline
@@ -326,7 +326,7 @@ public class PAssertTest implements Serializable {
/** Basic test for {@code notEqualTo}. */
@Test
- @Category(ValidatesRunner.class)
+ @Category({ValidatesRunner.class, UsesSideInputs.class})
public void testNotEqualTo() throws Exception {
PCollection<Integer> pcollection = pipeline.apply(Create.of(43));
PAssert.thatSingleton(pcollection).notEqualTo(42);
@@ -335,7 +335,7 @@ public class PAssertTest implements Serializable {
/** Test that we throw an error for false assertion on singleton. */
@Test
- @Category({ValidatesRunner.class, UsesFailureMessage.class})
+ @Category({ValidatesRunner.class, UsesFailureMessage.class, UsesSideInputs.class})
public void testPAssertEqualsSingletonFalse() throws Exception {
PCollection<Integer> pcollection = pipeline.apply(Create.of(42));
PAssert.thatSingleton("The value was not equal to 44", pcollection).isEqualTo(44);
@@ -351,7 +351,7 @@ public class PAssertTest implements Serializable {
/** Test that we throw an error for false assertion on singleton. */
@Test
- @Category({ValidatesRunner.class, UsesFailureMessage.class})
+ @Category({ValidatesRunner.class, UsesFailureMessage.class, UsesSideInputs.class})
public void testPAssertEqualsSingletonFalseDefaultReasonString() throws Exception {
PCollection<Integer> pcollection = pipeline.apply(Create.of(42));
PAssert.thatSingleton(pcollection).isEqualTo(44);
@@ -385,7 +385,7 @@ public class PAssertTest implements Serializable {
/** Tests that windowed {@code containsInAnyOrder} is actually order-independent. */
@Test
- @Category(ValidatesRunner.class)
+ @Category({ValidatesRunner.class, DataflowPortabilityExecutableStageUnsupported.class})
public void testWindowedContainsInAnyOrder() throws Exception {
PCollection<Integer> pcollection =
pipeline
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
index 2d825dc..21f7cda 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
@@ -58,9 +58,11 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.testing.DataflowPortabilityApiUnsupported;
+import org.apache.beam.sdk.testing.DataflowPortabilityExecutableStageUnsupported;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.UsesSideInputs;
import org.apache.beam.sdk.testing.ValidatesRunner;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.CombineTest.SharedTestBase.TestCombineFn.Accumulator;
@@ -662,7 +664,7 @@ public class CombineTest implements Serializable {
}
@Test
- @Category(ValidatesRunner.class)
+ @Category({ValidatesRunner.class, UsesSideInputs.class})
public void testBasicCombine() {
runTestBasicCombine(
Arrays.asList(KV.of("a", 1), KV.of("a", 1), KV.of("a", 4), KV.of("b", 1), KV.of("b", 13)),
@@ -673,7 +675,7 @@ public class CombineTest implements Serializable {
}
@Test
- @Category(ValidatesRunner.class)
+ @Category({ValidatesRunner.class, UsesSideInputs.class})
public void testBasicCombineEmpty() {
runTestBasicCombine(EMPTY_TABLE, ImmutableSet.of(), Collections.emptyList());
}
@@ -950,7 +952,7 @@ public class CombineTest implements Serializable {
@RunWith(JUnit4.class)
public static class CombineWithContextTests extends SharedTestBase {
@Test
- @Category(ValidatesRunner.class)
+ @Category({ValidatesRunner.class, UsesSideInputs.class})
@SuppressWarnings({"rawtypes", "unchecked"})
public void testSimpleCombineWithContext() {
runTestSimpleCombineWithContext(
@@ -961,7 +963,7 @@ public class CombineTest implements Serializable {
}
@Test
- @Category(ValidatesRunner.class)
+ @Category({ValidatesRunner.class, UsesSideInputs.class})
public void testSimpleCombineWithContextEmpty() {
runTestSimpleCombineWithContext(EMPTY_TABLE, 0, Collections.emptyList(), new String[] {});
}
@@ -1022,7 +1024,7 @@ public class CombineTest implements Serializable {
}
@Test
- @Category(ValidatesRunner.class)
+ @Category({ValidatesRunner.class, DataflowPortabilityExecutableStageUnsupported.class})
public void testFixedWindowsCombineWithContext() {
PCollection<KV<String, Integer>> perKeyInput =
pipeline
@@ -1064,7 +1066,7 @@ public class CombineTest implements Serializable {
}
@Test
- @Category(ValidatesRunner.class)
+ @Category({ValidatesRunner.class, UsesSideInputs.class})
public void testSlidingWindowsCombine() {
PCollection<String> input =
pipeline
@@ -1123,7 +1125,7 @@ public class CombineTest implements Serializable {
}
@Test
- @Category(ValidatesRunner.class)
+ @Category({ValidatesRunner.class, DataflowPortabilityExecutableStageUnsupported.class})
public void testSlidingWindowsCombineWithContext() {
// [a: 1, 1], [a: 4; b: 1], [b: 13]
PCollection<KV<String, Integer>> perKeyInput =
@@ -1174,7 +1176,7 @@ public class CombineTest implements Serializable {
}
@Test
- @Category(ValidatesRunner.class)
+ @Category({ValidatesRunner.class, UsesSideInputs.class})
public void testGlobalCombineWithDefaultsAndTriggers() {
PCollection<Integer> input = pipeline.apply(Create.of(1, 1));
@@ -1201,7 +1203,7 @@ public class CombineTest implements Serializable {
}
@Test
- @Category(ValidatesRunner.class)
+ @Category({ValidatesRunner.class, DataflowPortabilityExecutableStageUnsupported.class})
public void testSessionsCombine() {
PCollection<KV<String, Integer>> input =
pipeline
@@ -1227,7 +1229,7 @@ public class CombineTest implements Serializable {
}
@Test
- @Category(ValidatesRunner.class)
+ @Category({ValidatesRunner.class, DataflowPortabilityExecutableStageUnsupported.class})
public void testSessionsCombineWithContext() {
PCollection<KV<String, Integer>> perKeyInput =
pipeline.apply(
@@ -1292,7 +1294,7 @@ public class CombineTest implements Serializable {
}
@Test
- @Category(ValidatesRunner.class)
+ @Category({ValidatesRunner.class, UsesSideInputs.class})
public void testCombineGloballyAsSingletonView() {
final PCollectionView<Integer> view =
pipeline
@@ -1318,7 +1320,7 @@ public class CombineTest implements Serializable {
}
@Test
- @Category(ValidatesRunner.class)
+ @Category({ValidatesRunner.class, DataflowPortabilityExecutableStageUnsupported.class})
public void testWindowedCombineGloballyAsSingletonView() {
FixedWindows windowFn = FixedWindows.of(Duration.standardMinutes(1));
final PCollectionView<Integer> view =
@@ -1362,7 +1364,7 @@ public class CombineTest implements Serializable {
/** Tests creation of a global {@link Combine} via Java 8 lambda. */
@Test
- @Category(ValidatesRunner.class)
+ @Category({ValidatesRunner.class, UsesSideInputs.class})
public void testCombineGloballyLambda() {
PCollection<Integer> output =
@@ -1384,7 +1386,7 @@ public class CombineTest implements Serializable {
/** Tests creation of a global {@link Combine} via a Java 8 method reference. */
@Test
- @Category(ValidatesRunner.class)
+ @Category({ValidatesRunner.class, UsesSideInputs.class})
public void testCombineGloballyInstanceMethodReference() {
PCollection<Integer> output =
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java
index aa25136..693598b 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java
@@ -50,6 +50,7 @@ import org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.UsesSideInputs;
import org.apache.beam.sdk.testing.ValidatesRunner;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Sessions;
@@ -184,7 +185,7 @@ public class FlattenTest implements Serializable {
}
@Test
- @Category(ValidatesRunner.class)
+ @Category({ValidatesRunner.class, UsesSideInputs.class})
public void testEmptyFlattenAsSideInput() {
final PCollectionView<Iterable<String>> view =
PCollectionList.<String>empty(p)
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
index 2049e10..c6bd909 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
@@ -49,6 +49,7 @@ import org.apache.beam.sdk.coders.MapCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.testing.DataflowPortabilityApiUnsupported;
+import org.apache.beam.sdk.testing.DataflowPortabilityExecutableStageUnsupported;
import org.apache.beam.sdk.testing.LargeKeys;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
@@ -254,7 +255,7 @@ public class GroupByKeyTest implements Serializable {
* two values.
*/
@Test
- @Category(ValidatesRunner.class)
+ @Category({ValidatesRunner.class, DataflowPortabilityExecutableStageUnsupported.class})
public void testTimestampCombinerEarliest() {
p.apply(
@@ -275,7 +276,7 @@ public class GroupByKeyTest implements Serializable {
* the windowing function customized to use the latest value.
*/
@Test
- @Category(ValidatesRunner.class)
+ @Category({ValidatesRunner.class, DataflowPortabilityExecutableStageUnsupported.class})
public void testTimestampCombinerLatest() {
p.apply(
Create.timestamped(
@@ -382,7 +383,7 @@ public class GroupByKeyTest implements Serializable {
@RunWith(JUnit4.class)
public static class WindowTests extends SharedTestBase {
@Test
- @Category(ValidatesRunner.class)
+ @Category({ValidatesRunner.class, DataflowPortabilityExecutableStageUnsupported.class})
public void testGroupByKeyAndWindows() {
List<KV<String, Integer>> ungroupedPairs =
Arrays.asList(
@@ -423,7 +424,7 @@ public class GroupByKeyTest implements Serializable {
}
@Test
- @Category(ValidatesRunner.class)
+ @Category({ValidatesRunner.class, DataflowPortabilityExecutableStageUnsupported.class})
public void testGroupByKeyMultipleWindows() {
PCollection<KV<String, Integer>> windowedInput =
p.apply(
@@ -453,7 +454,7 @@ public class GroupByKeyTest implements Serializable {
}
@Test
- @Category(ValidatesRunner.class)
+ @Category({ValidatesRunner.class, DataflowPortabilityExecutableStageUnsupported.class})
public void testGroupByKeyMergingWindows() {
PCollection<KV<String, Integer>> windowedInput =
p.apply(
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index dab9f13..3264bde 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -82,12 +82,14 @@ import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.testing.DataflowPortabilityApiUnsupported;
+import org.apache.beam.sdk.testing.DataflowPortabilityExecutableStageUnsupported;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.testing.UsesMapState;
import org.apache.beam.sdk.testing.UsesSetState;
+import org.apache.beam.sdk.testing.UsesSideInputs;
import org.apache.beam.sdk.testing.UsesStatefulParDo;
import org.apache.beam.sdk.testing.UsesTestStream;
import org.apache.beam.sdk.testing.UsesTimersInParDo;
@@ -342,7 +344,6 @@ public class ParDoTest implements Serializable {
pipeline.apply(Create.of(inputs)).apply(ParDo.of(new TestDoFn()));
PAssert.that(output).satisfies(ParDoTest.HasExpectedOutput.forInput(inputs));
-
pipeline.run();
}
@@ -706,7 +707,7 @@ public class ParDoTest implements Serializable {
}
@Test
- @Category(ValidatesRunner.class)
+ @Category({ValidatesRunner.class, UsesSideInputs.class})
public void testParDoWithSideInputs() {
List<Integer> inputs = Arrays.asList(3, -42, 666);
@@ -738,7 +739,7 @@ public class ParDoTest implements Serializable {
}
@Test
- @Category(ValidatesRunner.class)
+ @Category({ValidatesRunner.class, UsesSideInputs.class})
public void testParDoWithSideInputsIsCumulative() {
List<Integer> inputs = Arrays.asList(3, -42, 666);
@@ -772,7 +773,7 @@ public class ParDoTest implements Serializable {
}
@Test
- @Category(ValidatesRunner.class)
+ @Category({ValidatesRunner.class, UsesSideInputs.class})
public void testMultiOutputParDoWithSideInputs() {
List<Integer> inputs = Arrays.asList(3, -42, 666);
@@ -810,7 +811,7 @@ public class ParDoTest implements Serializable {
}
@Test
- @Category(ValidatesRunner.class)
+ @Category({ValidatesRunner.class, UsesSideInputs.class})
public void testMultiOutputParDoWithSideInputsIsCumulative() {
List<Integer> inputs = Arrays.asList(3, -42, 666);
@@ -866,7 +867,7 @@ public class ParDoTest implements Serializable {
}
@Test
- @Category(ValidatesRunner.class)
+ @Category({ValidatesRunner.class, UsesSideInputs.class})
public void testSideInputsWithMultipleWindows() {
// Tests that the runner can safely run a DoFn that uses side inputs
// on an input where the element is in multiple windows. The complication is
@@ -1186,7 +1187,7 @@ public class ParDoTest implements Serializable {
}
@Test
- @Category(ValidatesRunner.class)
+ @Category({ValidatesRunner.class, DataflowPortabilityExecutableStageUnsupported.class})
public void testWindowingInStartAndFinishBundle() {
final FixedWindows windowFn = FixedWindows.of(Duration.millis(1));
@@ -1961,7 +1962,7 @@ public class ParDoTest implements Serializable {
}
@Test
- @Category({ValidatesRunner.class, UsesStatefulParDo.class})
+ @Category({ValidatesRunner.class, UsesStatefulParDo.class, UsesSideInputs.class})
public void testBagStateSideInput() {
final PCollectionView<List<Integer>> listView =
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ReshuffleTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ReshuffleTest.java
index 6c2c94b..5d9c7f2 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ReshuffleTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ReshuffleTest.java
@@ -30,6 +30,7 @@ import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.testing.DataflowPortabilityExecutableStageUnsupported;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestStream;
@@ -149,7 +150,7 @@ public class ReshuffleTest implements Serializable {
}
@Test
- @Category(ValidatesRunner.class)
+ @Category({ValidatesRunner.class, DataflowPortabilityExecutableStageUnsupported.class})
public void testReshuffleAfterSessionsAndGroupByKey() {
PCollection<KV<String, Iterable<Integer>>> input =
@@ -170,7 +171,7 @@ public class ReshuffleTest implements Serializable {
}
@Test
- @Category(ValidatesRunner.class)
+ @Category({ValidatesRunner.class, DataflowPortabilityExecutableStageUnsupported.class})
public void testReshuffleAfterFixedWindowsAndGroupByKey() {
PCollection<KV<String, Iterable<Integer>>> input =
@@ -191,7 +192,7 @@ public class ReshuffleTest implements Serializable {
}
@Test
- @Category(ValidatesRunner.class)
+ @Category({ValidatesRunner.class, DataflowPortabilityExecutableStageUnsupported.class})
public void testReshuffleAfterSlidingWindowsAndGroupByKey() {
PCollection<KV<String, Iterable<Integer>>> input =
@@ -212,7 +213,7 @@ public class ReshuffleTest implements Serializable {
}
@Test
- @Category(ValidatesRunner.class)
+ @Category({ValidatesRunner.class, DataflowPortabilityExecutableStageUnsupported.class})
public void testReshuffleAfterFixedWindows() {
PCollection<KV<String, Integer>> input =
@@ -232,7 +233,7 @@ public class ReshuffleTest implements Serializable {
}
@Test
- @Category(ValidatesRunner.class)
+ @Category({ValidatesRunner.class, DataflowPortabilityExecutableStageUnsupported.class})
public void testReshuffleAfterSlidingWindows() {
PCollection<KV<String, Integer>> input =
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
index 08a26cb..01e7335 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
@@ -34,12 +34,14 @@ import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.testing.DataflowPortabilityApiUnsupported;
+import org.apache.beam.sdk.testing.DataflowPortabilityExecutableStageUnsupported;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.testing.UsesBoundedSplittableParDo;
import org.apache.beam.sdk.testing.UsesParDoLifecycle;
+import org.apache.beam.sdk.testing.UsesSideInputs;
import org.apache.beam.sdk.testing.UsesSplittableParDoWithWindowedSideInputs;
import org.apache.beam.sdk.testing.UsesTestStream;
import org.apache.beam.sdk.testing.UsesUnboundedSplittableParDo;
@@ -152,7 +154,11 @@ public class SplittableDoFnTest implements Serializable {
}
@Test
- @Category({ValidatesRunner.class, UsesBoundedSplittableParDo.class})
+ @Category({
+ ValidatesRunner.class,
+ UsesBoundedSplittableParDo.class,
+ DataflowPortabilityExecutableStageUnsupported.class
+ })
public void testPairWithIndexWindowedTimestampedBounded() {
testPairWithIndexWindowedTimestamped(IsBounded.BOUNDED);
}
@@ -351,7 +357,7 @@ public class SplittableDoFnTest implements Serializable {
}
@Test
- @Category({ValidatesRunner.class, UsesBoundedSplittableParDo.class})
+ @Category({ValidatesRunner.class, UsesBoundedSplittableParDo.class, UsesSideInputs.class})
public void testSideInputBounded() {
testSideInput(IsBounded.BOUNDED);
}
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java
index ff476e3..002bc4b 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java
@@ -32,9 +32,11 @@ import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.testing.DataflowPortabilityExecutableStageUnsupported;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.UsesSideInputs;
import org.apache.beam.sdk.testing.ValidatesRunner;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
@@ -118,7 +120,7 @@ public class CoGroupByKeyTest implements Serializable {
@Rule public final transient TestPipeline p = TestPipeline.create();
@Test
- @Category(ValidatesRunner.class)
+ @Category({ValidatesRunner.class, UsesSideInputs.class})
public void testCoGroupByKeyGetOnly() {
final TupleTag<String> tag1 = new TupleTag<>();
final TupleTag<String> tag2 = new TupleTag<>();
@@ -248,7 +250,7 @@ public class CoGroupByKeyTest implements Serializable {
}
@Test
- @Category(ValidatesRunner.class)
+ @Category({ValidatesRunner.class, UsesSideInputs.class})
public void testCoGroupByKey() {
final TupleTag<String> namesTag = new TupleTag<>();
final TupleTag<String> addressesTag = new TupleTag<>();
@@ -468,7 +470,7 @@ public class CoGroupByKeyTest implements Serializable {
/** Tests the pipeline end-to-end with FixedWindows. */
@SuppressWarnings("unchecked")
@Test
- @Category(ValidatesRunner.class)
+ @Category({ValidatesRunner.class, DataflowPortabilityExecutableStageUnsupported.class})
public void testCoGroupByKeyWithWindowing() {
TupleTag<String> clicksTag = new TupleTag<>();
TupleTag<String> purchasesTag = new TupleTag<>();
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
index 6b1be69..5345213 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
@@ -52,6 +52,7 @@ import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.runners.TransformHierarchy;
import org.apache.beam.sdk.testing.DataflowPortabilityApiUnsupported;
+import org.apache.beam.sdk.testing.DataflowPortabilityExecutableStageUnsupported;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.UsesCustomWindowMerging;
@@ -439,7 +440,7 @@ public class WindowTest implements Serializable {
* the windowing function default, the end of the window.
*/
@Test
- @Category(ValidatesRunner.class)
+ @Category({ValidatesRunner.class, DataflowPortabilityExecutableStageUnsupported.class})
public void testTimestampCombinerDefault() {
pipeline.enableAbandonedNodeEnforcement(true);
@@ -473,7 +474,7 @@ public class WindowTest implements Serializable {
* the windowing function customized to use the end of the window.
*/
@Test
- @Category(ValidatesRunner.class)
+ @Category({ValidatesRunner.class, DataflowPortabilityExecutableStageUnsupported.class})
public void testTimestampCombinerEndOfWindow() {
pipeline.enableAbandonedNodeEnforcement(true);
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
index 88bf613..8b8faf8 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
@@ -25,6 +25,7 @@ import java.io.Serializable;
import java.util.List;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.testing.DataflowPortabilityExecutableStageUnsupported;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
@@ -103,7 +104,7 @@ public class WindowingTest implements Serializable {
}
@Test
- @Category(ValidatesRunner.class)
+ @Category({ValidatesRunner.class, DataflowPortabilityExecutableStageUnsupported.class})
public void testPartitioningWindowing() {
PCollection<String> input =
p.apply(
@@ -127,7 +128,7 @@ public class WindowingTest implements Serializable {
}
@Test
- @Category(ValidatesRunner.class)
+ @Category({ValidatesRunner.class, DataflowPortabilityExecutableStageUnsupported.class})
public void testNonPartitioningWindowing() {
PCollection<String> input =
p.apply(
@@ -151,7 +152,7 @@ public class WindowingTest implements Serializable {
}
@Test
- @Category(ValidatesRunner.class)
+ @Category({ValidatesRunner.class, DataflowPortabilityExecutableStageUnsupported.class})
public void testMergingWindowing() {
PCollection<String> input =
p.apply(
@@ -169,7 +170,7 @@ public class WindowingTest implements Serializable {
}
@Test
- @Category(ValidatesRunner.class)
+ @Category({ValidatesRunner.class, DataflowPortabilityExecutableStageUnsupported.class})
public void testWindowPreservation() {
PCollection<String> input1 =
p.apply(