Posted to commits@beam.apache.org by sc...@apache.org on 2018/12/14 21:47:58 UTC

[beam] branch master updated: [BEAM-6225] Setup Jenkins Job to Run VR with ExecutableStage (#7271)

This is an automated email from the ASF dual-hosted git repository.

scott pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new b7035c1  [BEAM-6225] Setup Jenkins Job to Run VR with ExecutableStage (#7271)
b7035c1 is described below

commit b7035c1a098c526356c6fb33480b989ed037da0a
Author: Boyuan Zhang <36...@users.noreply.github.com>
AuthorDate: Fri Dec 14 13:47:48 2018 -0800

    [BEAM-6225] Setup Jenkins Job to Run VR with ExecutableStage (#7271)
---
 ...unner_DataflowPortabilityExecutableStage.groovy | 54 ++++++++++++++++++++++
 runners/google-cloud-dataflow-java/build.gradle    | 48 +++++++++++++++++--
 ...aflowPortabilityExecutableStageUnsupported.java | 25 ++++++++++
 .../apache/beam/sdk/testing/UsesSideInputs.java    | 24 ++++++++++
 .../org/apache/beam/sdk/testing/PAssertTest.java   | 12 ++---
 .../apache/beam/sdk/transforms/CombineTest.java    | 30 ++++++------
 .../apache/beam/sdk/transforms/FlattenTest.java    |  3 +-
 .../apache/beam/sdk/transforms/GroupByKeyTest.java | 11 +++--
 .../org/apache/beam/sdk/transforms/ParDoTest.java  | 17 +++----
 .../apache/beam/sdk/transforms/ReshuffleTest.java  | 11 +++--
 .../beam/sdk/transforms/SplittableDoFnTest.java    | 10 +++-
 .../beam/sdk/transforms/join/CoGroupByKeyTest.java |  8 ++--
 .../beam/sdk/transforms/windowing/WindowTest.java  |  5 +-
 .../sdk/transforms/windowing/WindowingTest.java    |  9 ++--
 14 files changed, 212 insertions(+), 55 deletions(-)

diff --git a/.test-infra/jenkins/job_PostCommit_Java_ValidatesRunner_DataflowPortabilityExecutableStage.groovy b/.test-infra/jenkins/job_PostCommit_Java_ValidatesRunner_DataflowPortabilityExecutableStage.groovy
new file mode 100644
index 0000000..62e7361
--- /dev/null
+++ b/.test-infra/jenkins/job_PostCommit_Java_ValidatesRunner_DataflowPortabilityExecutableStage.groovy
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import CommonJobProperties as commonJobProperties
+import PostcommitJobBuilder
+
+
+// This job runs the suite of ValidatesRunner tests against the Dataflow
+// PortabilityApi runner with the ExecutableStage code path enabled.
+PostcommitJobBuilder.postCommitJob('beam_PostCommit_Java_ValidatesRunner_DataflowPortabilityExecutableStage',
+  'Run Dataflow Portability ExecutableStage ValidatesRunner', 'Google Cloud Dataflow Runner PortabilityApi ExecutableStage ValidatesRunner Tests', this) {
+
+  description('Runs the ValidatesRunner suite on the Dataflow PortabilityApi runner with the ExecutableStage code path enabled.')
+
+  // Set common parameters, including a 400 minute timeout.
+  commonJobProperties.setTopLevelMainJobProperties(delegate, 'master', 400)
+
+  // Publish all test results to Jenkins
+  publishers {
+    archiveJunit('**/build/test-results/**/*.xml')
+  }
+
+  // Gradle goals for this job.
+  steps {
+    gradle {
+      rootBuildScriptDir(commonJobProperties.checkoutDir)
+      tasks(':beam-runners-google-cloud-dataflow-java:validatesRunnerFnApiWorkerExecutableStageTest')
+      // Increase parallel worker threads above the processor limit since most time is
+      // spent waiting on Dataflow jobs. ValidatesRunner tests on Dataflow are slow
+      // because each one launches a Dataflow job with roughly 3 minutes of overhead.
+      // 3 x num_cores strikes a good balance between maximizing parallelism and not
+      // overloading the machines.
+      commonJobProperties.setGradleSwitches(delegate, 3 * Runtime.runtime.availableProcessors())
+    }
+  }
+
+  // [BEAM-6236] "use_executable_stage_bundle_execution" hasn't been rolled out.
+  disabled()
+}
diff --git a/runners/google-cloud-dataflow-java/build.gradle b/runners/google-cloud-dataflow-java/build.gradle
index c0b831c..9c6aaf4 100644
--- a/runners/google-cloud-dataflow-java/build.gradle
+++ b/runners/google-cloud-dataflow-java/build.gradle
@@ -241,17 +241,55 @@ task validatesRunnerFnApiWorkerTest(type: Test) {
     }
 }
 
+task validatesRunnerFnApiWorkerExecutableStageTest(type: Test) {
+    group = "Verification"
+    description "Validates Dataflow PortabilityApi runner"
+    dependsOn ":beam-runners-google-cloud-dataflow-java-fn-api-worker:shadowJar"
+    dependsOn buildAndPushDockerContainer
+
+    systemProperty "beamTestPipelineOptions", JsonOutput.toJson([
+            "--runner=TestDataflowRunner",
+            "--project=${dataflowProject}",
+            "--tempRoot=${dataflowPostCommitTempRoot}",
+            "--dataflowWorkerJar=${dataflowFnApiWorkerJar}",
+            "--workerHarnessContainerImage=${dockerImageContainer}:${dockerTag}",
+            "--experiments=beam_fn_api,use_executable_stage_bundle_execution"]
+    )
+
+    // Increase test parallelism up to the number of Gradle workers. By default this is equal
+    // to the number of CPU cores, but can be increased by setting --max-workers=N.
+    maxParallelForks Integer.MAX_VALUE
+    classpath = configurations.validatesRunner
+    testClassesDirs = files(project(":beam-sdks-java-core").sourceSets.test.output.classesDirs)
+    // TODO(BEAM-6232): ViewTest exercises side inputs, which are not supported by the current bundle execution.
+    exclude '**/ViewTest.class'
+    useJUnit {
+        includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner'
+        commonExcludeCategories.each {
+            excludeCategories it
+        }
+        fnApiWorkerExcludeCategories.each {
+            excludeCategories it
+        }
+        // TODO(BEAM-6232): Support side inputs.
+        excludeCategories 'org.apache.beam.sdk.testing.UsesSideInputs'
+        // TODO(BEAM-6233): Support timers and state.
+        excludeCategories 'org.apache.beam.sdk.testing.UsesStatefulParDo'
+        // TODO(BEAM-6231): Triage failures.
+        excludeCategories 'org.apache.beam.sdk.testing.DataflowPortabilityExecutableStageUnsupported'
+    }
+}
+
 task validatesRunner {
   group = "Verification"
   description "Validates Dataflow runner"
   dependsOn validatesRunnerLegacyWorkerTest
 }
 
-task validatesRunnerPortabilityApi {
-  group = "Verification"
-  description "Validates Dataflow PortabilityApi runner"
-  dependsOn validatesRunnerFnApiWorkerTest
-  dependsOn buildAndPushDockerContainer
+task validatesRunnerPortabilityApiExecutableStage {
+    group = "Verification"
+    description "Validates Dataflow PortabilityApi runner"
+    dependsOn validatesRunnerFnApiWorkerExecutableStageTest
 }
 
 task googleCloudPlatformLegacyWorkerIntegrationTest(type: Test) {
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/DataflowPortabilityExecutableStageUnsupported.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/DataflowPortabilityExecutableStageUnsupported.java
new file mode 100644
index 0000000..a14885f
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/DataflowPortabilityExecutableStageUnsupported.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.testing;
+
+/**
+ * Category tag for validation tests which are not yet supported by the Dataflow portable worker
+ * with use_executable_stage_bundle_execution; the failures still need investigation.
+ */
+// TODO(BEAM-6231): Triage test failures introduced by using ExecutableStage.
+public interface DataflowPortabilityExecutableStageUnsupported {}
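
For reference, the new category is applied like any other JUnit category annotation. The following sketch is purely illustrative (the class and test names are hypothetical, not part of this patch); it shows how a test tagged this way is skipped by the ExecutableStage suite above, which lists the category under excludeCategories, while every other ValidatesRunner suite still runs it.

    package org.apache.beam.sdk.transforms;

    import org.apache.beam.sdk.testing.DataflowPortabilityExecutableStageUnsupported;
    import org.apache.beam.sdk.testing.ValidatesRunner;
    import org.junit.Test;
    import org.junit.experimental.categories.Category;

    public class ExampleExecutableStageTaggingTest {

      @Test
      @Category({ValidatesRunner.class, DataflowPortabilityExecutableStageUnsupported.class})
      public void testNotYetSupportedWithExecutableStage() {
        // The pipeline under test would go here. All ValidatesRunner suites still execute
        // this test except validatesRunnerFnApiWorkerExecutableStageTest, which excludes
        // the DataflowPortabilityExecutableStageUnsupported category.
      }
    }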
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesSideInputs.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesSideInputs.java
new file mode 100644
index 0000000..907c13e
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesSideInputs.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.testing;
+
+/**
+ * Category tag for validation tests which use side inputs. Tests tagged with {@link UsesSideInputs}
+ * should only be run on runners that support side inputs.
+ */
+public interface UsesSideInputs {}
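
As a concrete illustration of what this tag covers, here is a minimal hypothetical side-input test of the kind now carrying UsesSideInputs (the class, test, and transform names are illustrative and not taken from this patch):

    package org.apache.beam.sdk.transforms;

    import java.io.Serializable;
    import org.apache.beam.sdk.testing.PAssert;
    import org.apache.beam.sdk.testing.TestPipeline;
    import org.apache.beam.sdk.testing.UsesSideInputs;
    import org.apache.beam.sdk.testing.ValidatesRunner;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PCollectionView;
    import org.junit.Rule;
    import org.junit.Test;
    import org.junit.experimental.categories.Category;

    public class ExampleSideInputTest implements Serializable {

      @Rule public final transient TestPipeline p = TestPipeline.create();

      @Test
      @Category({ValidatesRunner.class, UsesSideInputs.class})
      public void testAddSingletonSideInput() {
        // Materialize a singleton side input.
        final PCollectionView<Integer> offset =
            p.apply("CreateSide", Create.of(10)).apply(View.asSingleton());

        PCollection<Integer> output =
            p.apply("CreateMain", Create.of(1, 2, 3))
                .apply(
                    ParDo.of(
                            new DoFn<Integer, Integer>() {
                              @ProcessElement
                              public void processElement(ProcessContext c) {
                                // Read the singleton side input and add it to each element.
                                c.output(c.element() + c.sideInput(offset));
                              }
                            })
                        .withSideInputs(offset));

        PAssert.that(output).containsInAnyOrder(11, 12, 13);
        p.run();
      }
    }

Runners that do not support side inputs can exclude the category the same way the new Gradle task above does, via excludeCategories 'org.apache.beam.sdk.testing.UsesSideInputs'.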
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
index bf7ee69..160f709 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
@@ -297,7 +297,7 @@ public class PAssertTest implements Serializable {
 
   /** Basic test for {@code isEqualTo}. */
   @Test
-  @Category(ValidatesRunner.class)
+  @Category({ValidatesRunner.class, UsesSideInputs.class})
   public void testIsEqualTo() throws Exception {
     PCollection<Integer> pcollection = pipeline.apply(Create.of(43));
     PAssert.thatSingleton(pcollection).isEqualTo(43);
@@ -306,7 +306,7 @@ public class PAssertTest implements Serializable {
 
   /** Basic test for {@code isEqualTo}. */
   @Test
-  @Category(ValidatesRunner.class)
+  @Category({ValidatesRunner.class, DataflowPortabilityExecutableStageUnsupported.class})
   public void testWindowedIsEqualTo() throws Exception {
     PCollection<Integer> pcollection =
         pipeline
@@ -326,7 +326,7 @@ public class PAssertTest implements Serializable {
 
   /** Basic test for {@code notEqualTo}. */
   @Test
-  @Category(ValidatesRunner.class)
+  @Category({ValidatesRunner.class, UsesSideInputs.class})
   public void testNotEqualTo() throws Exception {
     PCollection<Integer> pcollection = pipeline.apply(Create.of(43));
     PAssert.thatSingleton(pcollection).notEqualTo(42);
@@ -335,7 +335,7 @@ public class PAssertTest implements Serializable {
 
   /** Test that we throw an error for false assertion on singleton. */
   @Test
-  @Category({ValidatesRunner.class, UsesFailureMessage.class})
+  @Category({ValidatesRunner.class, UsesFailureMessage.class, UsesSideInputs.class})
   public void testPAssertEqualsSingletonFalse() throws Exception {
     PCollection<Integer> pcollection = pipeline.apply(Create.of(42));
     PAssert.thatSingleton("The value was not equal to 44", pcollection).isEqualTo(44);
@@ -351,7 +351,7 @@ public class PAssertTest implements Serializable {
 
   /** Test that we throw an error for false assertion on singleton. */
   @Test
-  @Category({ValidatesRunner.class, UsesFailureMessage.class})
+  @Category({ValidatesRunner.class, UsesFailureMessage.class, UsesSideInputs.class})
   public void testPAssertEqualsSingletonFalseDefaultReasonString() throws Exception {
     PCollection<Integer> pcollection = pipeline.apply(Create.of(42));
     PAssert.thatSingleton(pcollection).isEqualTo(44);
@@ -385,7 +385,7 @@ public class PAssertTest implements Serializable {
 
   /** Tests that windowed {@code containsInAnyOrder} is actually order-independent. */
   @Test
-  @Category(ValidatesRunner.class)
+  @Category({ValidatesRunner.class, DataflowPortabilityExecutableStageUnsupported.class})
   public void testWindowedContainsInAnyOrder() throws Exception {
     PCollection<Integer> pcollection =
         pipeline
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
index 2d825dc..21f7cda 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
@@ -58,9 +58,11 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.testing.DataflowPortabilityApiUnsupported;
+import org.apache.beam.sdk.testing.DataflowPortabilityExecutableStageUnsupported;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.UsesSideInputs;
 import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.CombineTest.SharedTestBase.TestCombineFn.Accumulator;
@@ -662,7 +664,7 @@ public class CombineTest implements Serializable {
     }
 
     @Test
-    @Category(ValidatesRunner.class)
+    @Category({ValidatesRunner.class, UsesSideInputs.class})
     public void testBasicCombine() {
       runTestBasicCombine(
           Arrays.asList(KV.of("a", 1), KV.of("a", 1), KV.of("a", 4), KV.of("b", 1), KV.of("b", 13)),
@@ -673,7 +675,7 @@ public class CombineTest implements Serializable {
     }
 
     @Test
-    @Category(ValidatesRunner.class)
+    @Category({ValidatesRunner.class, UsesSideInputs.class})
     public void testBasicCombineEmpty() {
       runTestBasicCombine(EMPTY_TABLE, ImmutableSet.of(), Collections.emptyList());
     }
@@ -950,7 +952,7 @@ public class CombineTest implements Serializable {
   @RunWith(JUnit4.class)
   public static class CombineWithContextTests extends SharedTestBase {
     @Test
-    @Category(ValidatesRunner.class)
+    @Category({ValidatesRunner.class, UsesSideInputs.class})
     @SuppressWarnings({"rawtypes", "unchecked"})
     public void testSimpleCombineWithContext() {
       runTestSimpleCombineWithContext(
@@ -961,7 +963,7 @@ public class CombineTest implements Serializable {
     }
 
     @Test
-    @Category(ValidatesRunner.class)
+    @Category({ValidatesRunner.class, UsesSideInputs.class})
     public void testSimpleCombineWithContextEmpty() {
       runTestSimpleCombineWithContext(EMPTY_TABLE, 0, Collections.emptyList(), new String[] {});
     }
@@ -1022,7 +1024,7 @@ public class CombineTest implements Serializable {
     }
 
     @Test
-    @Category(ValidatesRunner.class)
+    @Category({ValidatesRunner.class, DataflowPortabilityExecutableStageUnsupported.class})
     public void testFixedWindowsCombineWithContext() {
       PCollection<KV<String, Integer>> perKeyInput =
           pipeline
@@ -1064,7 +1066,7 @@ public class CombineTest implements Serializable {
     }
 
     @Test
-    @Category(ValidatesRunner.class)
+    @Category({ValidatesRunner.class, UsesSideInputs.class})
     public void testSlidingWindowsCombine() {
       PCollection<String> input =
           pipeline
@@ -1123,7 +1125,7 @@ public class CombineTest implements Serializable {
     }
 
     @Test
-    @Category(ValidatesRunner.class)
+    @Category({ValidatesRunner.class, DataflowPortabilityExecutableStageUnsupported.class})
     public void testSlidingWindowsCombineWithContext() {
       // [a: 1, 1], [a: 4; b: 1], [b: 13]
       PCollection<KV<String, Integer>> perKeyInput =
@@ -1174,7 +1176,7 @@ public class CombineTest implements Serializable {
     }
 
     @Test
-    @Category(ValidatesRunner.class)
+    @Category({ValidatesRunner.class, UsesSideInputs.class})
     public void testGlobalCombineWithDefaultsAndTriggers() {
       PCollection<Integer> input = pipeline.apply(Create.of(1, 1));
 
@@ -1201,7 +1203,7 @@ public class CombineTest implements Serializable {
     }
 
     @Test
-    @Category(ValidatesRunner.class)
+    @Category({ValidatesRunner.class, DataflowPortabilityExecutableStageUnsupported.class})
     public void testSessionsCombine() {
       PCollection<KV<String, Integer>> input =
           pipeline
@@ -1227,7 +1229,7 @@ public class CombineTest implements Serializable {
     }
 
     @Test
-    @Category(ValidatesRunner.class)
+    @Category({ValidatesRunner.class, DataflowPortabilityExecutableStageUnsupported.class})
     public void testSessionsCombineWithContext() {
       PCollection<KV<String, Integer>> perKeyInput =
           pipeline.apply(
@@ -1292,7 +1294,7 @@ public class CombineTest implements Serializable {
     }
 
     @Test
-    @Category(ValidatesRunner.class)
+    @Category({ValidatesRunner.class, UsesSideInputs.class})
     public void testCombineGloballyAsSingletonView() {
       final PCollectionView<Integer> view =
           pipeline
@@ -1318,7 +1320,7 @@ public class CombineTest implements Serializable {
     }
 
     @Test
-    @Category(ValidatesRunner.class)
+    @Category({ValidatesRunner.class, DataflowPortabilityExecutableStageUnsupported.class})
     public void testWindowedCombineGloballyAsSingletonView() {
       FixedWindows windowFn = FixedWindows.of(Duration.standardMinutes(1));
       final PCollectionView<Integer> view =
@@ -1362,7 +1364,7 @@ public class CombineTest implements Serializable {
 
     /** Tests creation of a global {@link Combine} via Java 8 lambda. */
     @Test
-    @Category(ValidatesRunner.class)
+    @Category({ValidatesRunner.class, UsesSideInputs.class})
     public void testCombineGloballyLambda() {
 
       PCollection<Integer> output =
@@ -1384,7 +1386,7 @@ public class CombineTest implements Serializable {
 
     /** Tests creation of a global {@link Combine} via a Java 8 method reference. */
     @Test
-    @Category(ValidatesRunner.class)
+    @Category({ValidatesRunner.class, UsesSideInputs.class})
     public void testCombineGloballyInstanceMethodReference() {
 
       PCollection<Integer> output =
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java
index aa25136..693598b 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java
@@ -50,6 +50,7 @@ import org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.UsesSideInputs;
 import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.Sessions;
@@ -184,7 +185,7 @@ public class FlattenTest implements Serializable {
   }
 
   @Test
-  @Category(ValidatesRunner.class)
+  @Category({ValidatesRunner.class, UsesSideInputs.class})
   public void testEmptyFlattenAsSideInput() {
     final PCollectionView<Iterable<String>> view =
         PCollectionList.<String>empty(p)
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
index 2049e10..c6bd909 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
@@ -49,6 +49,7 @@ import org.apache.beam.sdk.coders.MapCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.testing.DataflowPortabilityApiUnsupported;
+import org.apache.beam.sdk.testing.DataflowPortabilityExecutableStageUnsupported;
 import org.apache.beam.sdk.testing.LargeKeys;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
@@ -254,7 +255,7 @@ public class GroupByKeyTest implements Serializable {
      * two values.
      */
     @Test
-    @Category(ValidatesRunner.class)
+    @Category({ValidatesRunner.class, DataflowPortabilityExecutableStageUnsupported.class})
     public void testTimestampCombinerEarliest() {
 
       p.apply(
@@ -275,7 +276,7 @@ public class GroupByKeyTest implements Serializable {
      * the windowing function customized to use the latest value.
      */
     @Test
-    @Category(ValidatesRunner.class)
+    @Category({ValidatesRunner.class, DataflowPortabilityExecutableStageUnsupported.class})
     public void testTimestampCombinerLatest() {
       p.apply(
               Create.timestamped(
@@ -382,7 +383,7 @@ public class GroupByKeyTest implements Serializable {
   @RunWith(JUnit4.class)
   public static class WindowTests extends SharedTestBase {
     @Test
-    @Category(ValidatesRunner.class)
+    @Category({ValidatesRunner.class, DataflowPortabilityExecutableStageUnsupported.class})
     public void testGroupByKeyAndWindows() {
       List<KV<String, Integer>> ungroupedPairs =
           Arrays.asList(
@@ -423,7 +424,7 @@ public class GroupByKeyTest implements Serializable {
     }
 
     @Test
-    @Category(ValidatesRunner.class)
+    @Category({ValidatesRunner.class, DataflowPortabilityExecutableStageUnsupported.class})
     public void testGroupByKeyMultipleWindows() {
       PCollection<KV<String, Integer>> windowedInput =
           p.apply(
@@ -453,7 +454,7 @@ public class GroupByKeyTest implements Serializable {
     }
 
     @Test
-    @Category(ValidatesRunner.class)
+    @Category({ValidatesRunner.class, DataflowPortabilityExecutableStageUnsupported.class})
     public void testGroupByKeyMergingWindows() {
       PCollection<KV<String, Integer>> windowedInput =
           p.apply(
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index dab9f13..3264bde 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -82,12 +82,14 @@ import org.apache.beam.sdk.state.TimerSpec;
 import org.apache.beam.sdk.state.TimerSpecs;
 import org.apache.beam.sdk.state.ValueState;
 import org.apache.beam.sdk.testing.DataflowPortabilityApiUnsupported;
+import org.apache.beam.sdk.testing.DataflowPortabilityExecutableStageUnsupported;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.testing.TestStream;
 import org.apache.beam.sdk.testing.UsesMapState;
 import org.apache.beam.sdk.testing.UsesSetState;
+import org.apache.beam.sdk.testing.UsesSideInputs;
 import org.apache.beam.sdk.testing.UsesStatefulParDo;
 import org.apache.beam.sdk.testing.UsesTestStream;
 import org.apache.beam.sdk.testing.UsesTimersInParDo;
@@ -342,7 +344,6 @@ public class ParDoTest implements Serializable {
           pipeline.apply(Create.of(inputs)).apply(ParDo.of(new TestDoFn()));
 
       PAssert.that(output).satisfies(ParDoTest.HasExpectedOutput.forInput(inputs));
-
       pipeline.run();
     }
 
@@ -706,7 +707,7 @@ public class ParDoTest implements Serializable {
     }
 
     @Test
-    @Category(ValidatesRunner.class)
+    @Category({ValidatesRunner.class, UsesSideInputs.class})
     public void testParDoWithSideInputs() {
 
       List<Integer> inputs = Arrays.asList(3, -42, 666);
@@ -738,7 +739,7 @@ public class ParDoTest implements Serializable {
     }
 
     @Test
-    @Category(ValidatesRunner.class)
+    @Category({ValidatesRunner.class, UsesSideInputs.class})
     public void testParDoWithSideInputsIsCumulative() {
 
       List<Integer> inputs = Arrays.asList(3, -42, 666);
@@ -772,7 +773,7 @@ public class ParDoTest implements Serializable {
     }
 
     @Test
-    @Category(ValidatesRunner.class)
+    @Category({ValidatesRunner.class, UsesSideInputs.class})
     public void testMultiOutputParDoWithSideInputs() {
 
       List<Integer> inputs = Arrays.asList(3, -42, 666);
@@ -810,7 +811,7 @@ public class ParDoTest implements Serializable {
     }
 
     @Test
-    @Category(ValidatesRunner.class)
+    @Category({ValidatesRunner.class, UsesSideInputs.class})
     public void testMultiOutputParDoWithSideInputsIsCumulative() {
 
       List<Integer> inputs = Arrays.asList(3, -42, 666);
@@ -866,7 +867,7 @@ public class ParDoTest implements Serializable {
     }
 
     @Test
-    @Category(ValidatesRunner.class)
+    @Category({ValidatesRunner.class, UsesSideInputs.class})
     public void testSideInputsWithMultipleWindows() {
       // Tests that the runner can safely run a DoFn that uses side inputs
       // on an input where the element is in multiple windows. The complication is
@@ -1186,7 +1187,7 @@ public class ParDoTest implements Serializable {
     }
 
     @Test
-    @Category(ValidatesRunner.class)
+    @Category({ValidatesRunner.class, DataflowPortabilityExecutableStageUnsupported.class})
     public void testWindowingInStartAndFinishBundle() {
 
       final FixedWindows windowFn = FixedWindows.of(Duration.millis(1));
@@ -1961,7 +1962,7 @@ public class ParDoTest implements Serializable {
     }
 
     @Test
-    @Category({ValidatesRunner.class, UsesStatefulParDo.class})
+    @Category({ValidatesRunner.class, UsesStatefulParDo.class, UsesSideInputs.class})
     public void testBagStateSideInput() {
 
       final PCollectionView<List<Integer>> listView =
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ReshuffleTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ReshuffleTest.java
index 6c2c94b..5d9c7f2 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ReshuffleTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ReshuffleTest.java
@@ -30,6 +30,7 @@ import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.testing.DataflowPortabilityExecutableStageUnsupported;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.testing.TestStream;
@@ -149,7 +150,7 @@ public class ReshuffleTest implements Serializable {
   }
 
   @Test
-  @Category(ValidatesRunner.class)
+  @Category({ValidatesRunner.class, DataflowPortabilityExecutableStageUnsupported.class})
   public void testReshuffleAfterSessionsAndGroupByKey() {
 
     PCollection<KV<String, Iterable<Integer>>> input =
@@ -170,7 +171,7 @@ public class ReshuffleTest implements Serializable {
   }
 
   @Test
-  @Category(ValidatesRunner.class)
+  @Category({ValidatesRunner.class, DataflowPortabilityExecutableStageUnsupported.class})
   public void testReshuffleAfterFixedWindowsAndGroupByKey() {
 
     PCollection<KV<String, Iterable<Integer>>> input =
@@ -191,7 +192,7 @@ public class ReshuffleTest implements Serializable {
   }
 
   @Test
-  @Category(ValidatesRunner.class)
+  @Category({ValidatesRunner.class, DataflowPortabilityExecutableStageUnsupported.class})
   public void testReshuffleAfterSlidingWindowsAndGroupByKey() {
 
     PCollection<KV<String, Iterable<Integer>>> input =
@@ -212,7 +213,7 @@ public class ReshuffleTest implements Serializable {
   }
 
   @Test
-  @Category(ValidatesRunner.class)
+  @Category({ValidatesRunner.class, DataflowPortabilityExecutableStageUnsupported.class})
   public void testReshuffleAfterFixedWindows() {
 
     PCollection<KV<String, Integer>> input =
@@ -232,7 +233,7 @@ public class ReshuffleTest implements Serializable {
   }
 
   @Test
-  @Category(ValidatesRunner.class)
+  @Category({ValidatesRunner.class, DataflowPortabilityExecutableStageUnsupported.class})
   public void testReshuffleAfterSlidingWindows() {
 
     PCollection<KV<String, Integer>> input =
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
index 08a26cb..01e7335 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
@@ -34,12 +34,14 @@ import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.range.OffsetRange;
 import org.apache.beam.sdk.testing.DataflowPortabilityApiUnsupported;
+import org.apache.beam.sdk.testing.DataflowPortabilityExecutableStageUnsupported;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.testing.TestStream;
 import org.apache.beam.sdk.testing.UsesBoundedSplittableParDo;
 import org.apache.beam.sdk.testing.UsesParDoLifecycle;
+import org.apache.beam.sdk.testing.UsesSideInputs;
 import org.apache.beam.sdk.testing.UsesSplittableParDoWithWindowedSideInputs;
 import org.apache.beam.sdk.testing.UsesTestStream;
 import org.apache.beam.sdk.testing.UsesUnboundedSplittableParDo;
@@ -152,7 +154,11 @@ public class SplittableDoFnTest implements Serializable {
   }
 
   @Test
-  @Category({ValidatesRunner.class, UsesBoundedSplittableParDo.class})
+  @Category({
+    ValidatesRunner.class,
+    UsesBoundedSplittableParDo.class,
+    DataflowPortabilityExecutableStageUnsupported.class
+  })
   public void testPairWithIndexWindowedTimestampedBounded() {
     testPairWithIndexWindowedTimestamped(IsBounded.BOUNDED);
   }
@@ -351,7 +357,7 @@ public class SplittableDoFnTest implements Serializable {
   }
 
   @Test
-  @Category({ValidatesRunner.class, UsesBoundedSplittableParDo.class})
+  @Category({ValidatesRunner.class, UsesBoundedSplittableParDo.class, UsesSideInputs.class})
   public void testSideInputBounded() {
     testSideInput(IsBounded.BOUNDED);
   }
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java
index ff476e3..002bc4b 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java
@@ -32,9 +32,11 @@ import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.testing.DataflowPortabilityExecutableStageUnsupported;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.UsesSideInputs;
 import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -118,7 +120,7 @@ public class CoGroupByKeyTest implements Serializable {
   @Rule public final transient TestPipeline p = TestPipeline.create();
 
   @Test
-  @Category(ValidatesRunner.class)
+  @Category({ValidatesRunner.class, UsesSideInputs.class})
   public void testCoGroupByKeyGetOnly() {
     final TupleTag<String> tag1 = new TupleTag<>();
     final TupleTag<String> tag2 = new TupleTag<>();
@@ -248,7 +250,7 @@ public class CoGroupByKeyTest implements Serializable {
   }
 
   @Test
-  @Category(ValidatesRunner.class)
+  @Category({ValidatesRunner.class, UsesSideInputs.class})
   public void testCoGroupByKey() {
     final TupleTag<String> namesTag = new TupleTag<>();
     final TupleTag<String> addressesTag = new TupleTag<>();
@@ -468,7 +470,7 @@ public class CoGroupByKeyTest implements Serializable {
   /** Tests the pipeline end-to-end with FixedWindows. */
   @SuppressWarnings("unchecked")
   @Test
-  @Category(ValidatesRunner.class)
+  @Category({ValidatesRunner.class, DataflowPortabilityExecutableStageUnsupported.class})
   public void testCoGroupByKeyWithWindowing() {
     TupleTag<String> clicksTag = new TupleTag<>();
     TupleTag<String> purchasesTag = new TupleTag<>();
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
index 6b1be69..5345213 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
@@ -52,6 +52,7 @@ import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.io.GenerateSequence;
 import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.testing.DataflowPortabilityApiUnsupported;
+import org.apache.beam.sdk.testing.DataflowPortabilityExecutableStageUnsupported;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.testing.UsesCustomWindowMerging;
@@ -439,7 +440,7 @@ public class WindowTest implements Serializable {
    * the windowing function default, the end of the window.
    */
   @Test
-  @Category(ValidatesRunner.class)
+  @Category({ValidatesRunner.class, DataflowPortabilityExecutableStageUnsupported.class})
   public void testTimestampCombinerDefault() {
     pipeline.enableAbandonedNodeEnforcement(true);
 
@@ -473,7 +474,7 @@ public class WindowTest implements Serializable {
    * the windowing function customized to use the end of the window.
    */
   @Test
-  @Category(ValidatesRunner.class)
+  @Category({ValidatesRunner.class, DataflowPortabilityExecutableStageUnsupported.class})
   public void testTimestampCombinerEndOfWindow() {
     pipeline.enableAbandonedNodeEnforcement(true);
 
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
index 88bf613..8b8faf8 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
@@ -25,6 +25,7 @@ import java.io.Serializable;
 import java.util.List;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.testing.DataflowPortabilityExecutableStageUnsupported;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
@@ -103,7 +104,7 @@ public class WindowingTest implements Serializable {
   }
 
   @Test
-  @Category(ValidatesRunner.class)
+  @Category({ValidatesRunner.class, DataflowPortabilityExecutableStageUnsupported.class})
   public void testPartitioningWindowing() {
     PCollection<String> input =
         p.apply(
@@ -127,7 +128,7 @@ public class WindowingTest implements Serializable {
   }
 
   @Test
-  @Category(ValidatesRunner.class)
+  @Category({ValidatesRunner.class, DataflowPortabilityExecutableStageUnsupported.class})
   public void testNonPartitioningWindowing() {
     PCollection<String> input =
         p.apply(
@@ -151,7 +152,7 @@ public class WindowingTest implements Serializable {
   }
 
   @Test
-  @Category(ValidatesRunner.class)
+  @Category({ValidatesRunner.class, DataflowPortabilityExecutableStageUnsupported.class})
   public void testMergingWindowing() {
     PCollection<String> input =
         p.apply(
@@ -169,7 +170,7 @@ public class WindowingTest implements Serializable {
   }
 
   @Test
-  @Category(ValidatesRunner.class)
+  @Category({ValidatesRunner.class, DataflowPortabilityExecutableStageUnsupported.class})
   public void testWindowPreservation() {
     PCollection<String> input1 =
         p.apply(