You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2018/10/30 17:35:31 UTC

[beam] branch master updated: [BEAM-5781] Enable ValidatesRunner tests for portable java (#6881)

This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 71c6de3  [BEAM-5781] Enable ValidatesRunner tests for portable java (#6881)
71c6de3 is described below

commit 71c6de321fb8648146f7bd877f074a0a7617c6e0
Author: Boyuan Zhang <36...@users.noreply.github.com>
AuthorDate: Tue Oct 30 10:35:24 2018 -0700

    [BEAM-5781] Enable ValidatesRunner tests for portable java (#6881)
---
 ..._ValidatesRunner_PortabilityApi_Dataflow.groovy | 51 ++++++++++++
 runners/google-cloud-dataflow-java/build.gradle    | 54 ++++++++++++
 .../org/apache/beam/sdk/io/CountingSourceTest.java |  5 +-
 .../org/apache/beam/sdk/metrics/MetricsTest.java   |  8 +-
 .../org/apache/beam/sdk/testing/PAssertTest.java   |  6 +-
 .../apache/beam/sdk/transforms/CombineFnsTest.java |  7 +-
 .../apache/beam/sdk/transforms/CombineTest.java    | 15 ++--
 .../apache/beam/sdk/transforms/GroupByKeyTest.java |  3 +-
 .../org/apache/beam/sdk/transforms/ParDoTest.java  | 97 ++++++++++++++++++----
 .../beam/sdk/transforms/SplittableDoFnTest.java    |  7 +-
 .../org/apache/beam/sdk/transforms/ViewTest.java   | 13 +--
 .../beam/sdk/transforms/windowing/WindowTest.java  |  3 +-
 12 files changed, 228 insertions(+), 41 deletions(-)

diff --git a/.test-infra/jenkins/job_PostCommit_Java_ValidatesRunner_PortabilityApi_Dataflow.groovy b/.test-infra/jenkins/job_PostCommit_Java_ValidatesRunner_PortabilityApi_Dataflow.groovy
new file mode 100644
index 0000000..6f7264e
--- /dev/null
+++ b/.test-infra/jenkins/job_PostCommit_Java_ValidatesRunner_PortabilityApi_Dataflow.groovy
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import CommonJobProperties as commonJobProperties
+import PostcommitJobBuilder
+
+
+// This job runs the suite of ValidatesRunner tests against the Dataflow
+// runner using the PortabilityApi.
+PostcommitJobBuilder.postCommitJob('beam_PostCommit_Java_ValidatesRunner_PortabilityApi_Dataflow_Gradle',
+  'Run Dataflow PortabilityApi ValidatesRunner', 'Google Cloud Dataflow Runner PortabilityApi ValidatesRunner Tests', this) {
+
+  description('Runs the ValidatesRunner suite on the Dataflow PortabilityApi runner.')
+
+  // Set common parameters, including a timeout of 400 (presumably minutes).
+  commonJobProperties.setTopLevelMainJobProperties(delegate, 'master', 400)
+
+  // Publish all test results to Jenkins
+  publishers {
+    archiveJunit('**/build/test-results/**/*.xml')
+  }
+
+  // Gradle goals for this job.
+  steps {
+    gradle {
+      rootBuildScriptDir(commonJobProperties.checkoutDir)
+      tasks(':beam-runners-google-cloud-dataflow-java:validatesRunnerPortabilityApi')
+      // Increase parallel worker threads above processor limit since most time is
+      // spent waiting on Dataflow jobs. ValidatesRunner tests on Dataflow are slow
+      // because each one launches a Dataflow job with about 3 mins of overhead.
+      // 3 x num_cores strikes a good balance between maxing out parallelism without
+      // overloading the machines.
+      commonJobProperties.setGradleSwitches(delegate, 3 * Runtime.runtime.availableProcessors())
+    }
+  }
+}
diff --git a/runners/google-cloud-dataflow-java/build.gradle b/runners/google-cloud-dataflow-java/build.gradle
index 00edec7..ab18b53 100644
--- a/runners/google-cloud-dataflow-java/build.gradle
+++ b/runners/google-cloud-dataflow-java/build.gradle
@@ -131,6 +131,14 @@ def commonExcludeCategories = [
   'org.apache.beam.sdk.testing.UsesMetricsPusher',
 ]
 
+def fnApiWorkerExcludeCategories = [
+  'org.apache.beam.sdk.testing.UsesCustomWindowMerging',
+  'org.apache.beam.sdk.testing.DataflowPortabilityApiUnsupported',
+  'org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders',
+  'org.apache.beam.sdk.testing.UsesUnboundedSplittableParDo',
+  'org.apache.beam.sdk.testing.UsesSchema'
+]
+
 // For the following test tasks using legacy worker, set workerHarnessContainerImage to empty to
 // make Dataflow pick up the non-versioned container image, which handles a staged worker jar.
 task validatesRunnerLegacyWorkerTest(type: Test) {
@@ -172,12 +180,58 @@ task buildAndPushDockerContainer() {
   }
 }
 
+task validatesRunnerFnApiWorkerTest(type: Test) {
+    group = "Verification"
+    dependsOn ":beam-runners-google-cloud-dataflow-java-fn-api-worker:shadowJar"
+    dependsOn buildAndPushDockerContainer
+
+    systemProperty "beamTestPipelineOptions", JsonOutput.toJson([
+            "--runner=TestDataflowRunner",
+            "--project=${dataflowProject}",
+            "--tempRoot=${dataflowPostCommitTempRoot}",
+            "--dataflowWorkerJar=${dataflowFnApiWorkerJar}",
+            "--workerHarnessContainerImage=${dockerImageContainer}:${dockerTag}",
+            "--experiments=beam_fn_api",
+
+    ])
+
+    // Increase test parallelism up to the number of Gradle workers. By default this is equal
+    // to the number of CPU cores, but can be increased by setting --max-workers=N.
+    maxParallelForks Integer.MAX_VALUE
+    classpath = configurations.validatesRunner
+    testClassesDirs = files(project(":beam-sdks-java-core").sourceSets.test.output.classesDirs)
+    useJUnit {
+      includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner'
+      commonExcludeCategories.each {
+        excludeCategories it
+      }
+      fnApiWorkerExcludeCategories.each {
+        excludeCategories it
+      }
+    }
+}
+
 task validatesRunner {
   group = "Verification"
   description "Validates Dataflow runner"
   dependsOn validatesRunnerLegacyWorkerTest
 }
 
+task validatesRunnerPortabilityApi {
+  group = "Verification"
+  description "Validates Dataflow PortabilityApi runner"
+  dependsOn validatesRunnerFnApiWorkerTest
+  // Clean up docker image
+  doLast {
+    exec {
+      commandLine "docker", "rmi", "${dockerImageName}"
+    }
+    exec {
+      commandLine "gcloud", "--quiet", "container", "images", "delete", "${dockerImageName}"
+    }
+  }
+}
+
 task googleCloudPlatformLegacyWorkerIntegrationTest(type: Test) {
   group = "Verification"
   dependsOn ":beam-runners-google-cloud-dataflow-java-legacy-worker:shadowJar"
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java
index e0224fb..8cacd67 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java
@@ -30,6 +30,7 @@ import org.apache.beam.sdk.io.CountingSource.CounterMark;
 import org.apache.beam.sdk.io.CountingSource.UnboundedCountingSource;
 import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.DataflowPortabilityApiUnsupported;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
@@ -91,7 +92,7 @@ public class CountingSourceTest {
   }
 
   @Test
-  @Category(ValidatesRunner.class)
+  @Category({ValidatesRunner.class, DataflowPortabilityApiUnsupported.class})
   public void testBoundedSourceSplits() throws Exception {
     long numElements = 1000;
     long numSplits = 10;
@@ -213,7 +214,7 @@ public class CountingSourceTest {
   }
 
   @Test
-  @Category(ValidatesRunner.class)
+  @Category({ValidatesRunner.class, DataflowPortabilityApiUnsupported.class})
   public void testUnboundedSourceSplits() throws Exception {
     long numElements = 1000;
     int numSplits = 10;
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java
index 284cba7..6f7863d 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java
@@ -29,6 +29,7 @@ import static org.mockito.Mockito.when;
 import java.io.Serializable;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.io.GenerateSequence;
+import org.apache.beam.sdk.testing.DataflowPortabilityApiUnsupported;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.testing.UsesAttemptedMetrics;
@@ -242,7 +243,12 @@ public class MetricsTest implements Serializable {
       assertAllMetrics(metrics, true);
     }
 
-    @Category({ValidatesRunner.class, UsesCommittedMetrics.class, UsesCounterMetrics.class})
+    @Category({
+      ValidatesRunner.class,
+      UsesCommittedMetrics.class,
+      UsesCounterMetrics.class,
+      DataflowPortabilityApiUnsupported.class
+    })
     @Test
     public void testCommittedCounterMetrics() {
       PipelineResult result = runPipelineWithMetrics();
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
index bedbb5d..bf7ee69 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
@@ -165,7 +165,7 @@ public class PAssertTest implements Serializable {
    * the {@link PCollection} to be serializable.
    */
   @Test
-  @Category(ValidatesRunner.class)
+  @Category({ValidatesRunner.class, DataflowPortabilityApiUnsupported.class})
   public void testContainsInAnyOrderNotSerializable() throws Exception {
     PCollection<NotSerializableObject> pcollection =
         pipeline.apply(
@@ -183,7 +183,7 @@ public class PAssertTest implements Serializable {
    * arbitrary {@link SerializableFunction}, though.
    */
   @Test
-  @Category(ValidatesRunner.class)
+  @Category({ValidatesRunner.class, DataflowPortabilityApiUnsupported.class})
   public void testSerializablePredicate() throws Exception {
     PCollection<NotSerializableObject> pcollection =
         pipeline.apply(
@@ -204,7 +204,7 @@ public class PAssertTest implements Serializable {
    * arbitrary {@link SerializableFunction}, though.
    */
   @Test
-  @Category(ValidatesRunner.class)
+  @Category({ValidatesRunner.class, DataflowPortabilityApiUnsupported.class})
   public void testWindowedSerializablePredicate() throws Exception {
     PCollection<NotSerializableObject> pcollection =
         pipeline
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java
index 94bb159..c2c7a84 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java
@@ -37,6 +37,7 @@ import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.NullableCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.testing.DataflowPortabilityApiUnsupported;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.testing.ValidatesRunner;
@@ -96,7 +97,7 @@ public class CombineFnsTest {
   }
 
   @Test
-  @Category(ValidatesRunner.class)
+  @Category({ValidatesRunner.class, DataflowPortabilityApiUnsupported.class})
   public void testComposedCombine() {
     p.getCoderRegistry().registerCoderForClass(UserString.class, UserStringCoder.of());
 
@@ -146,7 +147,7 @@ public class CombineFnsTest {
   }
 
   @Test
-  @Category(ValidatesRunner.class)
+  @Category({ValidatesRunner.class, DataflowPortabilityApiUnsupported.class})
   public void testComposedCombineWithContext() {
     p.getCoderRegistry().registerCoderForClass(UserString.class, UserStringCoder.of());
 
@@ -207,7 +208,7 @@ public class CombineFnsTest {
   }
 
   @Test
-  @Category(ValidatesRunner.class)
+  @Category({ValidatesRunner.class, DataflowPortabilityApiUnsupported.class})
   public void testComposedCombineNullValues() {
     p.getCoderRegistry()
         .registerCoderForClass(UserString.class, NullableCoder.of(UserStringCoder.of()));
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
index bb7414e..2d825dc 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
@@ -57,6 +57,7 @@ import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.testing.DataflowPortabilityApiUnsupported;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
@@ -645,7 +646,7 @@ public class CombineTest implements Serializable {
   @RunWith(JUnit4.class)
   public static class BasicTests extends SharedTestBase {
     @Test
-    @Category(ValidatesRunner.class)
+    @Category({ValidatesRunner.class, DataflowPortabilityApiUnsupported.class})
     @SuppressWarnings({"rawtypes", "unchecked"})
     public void testSimpleCombine() {
       runTestSimpleCombine(
@@ -655,7 +656,7 @@ public class CombineTest implements Serializable {
     }
 
     @Test
-    @Category(ValidatesRunner.class)
+    @Category({ValidatesRunner.class, DataflowPortabilityApiUnsupported.class})
     public void testSimpleCombineEmpty() {
       runTestSimpleCombine(EMPTY_TABLE, 0, Collections.emptyList());
     }
@@ -692,7 +693,7 @@ public class CombineTest implements Serializable {
     }
 
     @Test
-    @Category(ValidatesRunner.class)
+    @Category({ValidatesRunner.class, DataflowPortabilityApiUnsupported.class})
     public void testHotKeyCombining() {
       PCollection<KV<String, Integer>> input =
           copy(
@@ -994,7 +995,7 @@ public class CombineTest implements Serializable {
   @RunWith(JUnit4.class)
   public static class WindowingTests extends SharedTestBase implements Serializable {
     @Test
-    @Category(ValidatesRunner.class)
+    @Category({ValidatesRunner.class, DataflowPortabilityApiUnsupported.class})
     public void testFixedWindowsCombine() {
       PCollection<KV<String, Integer>> input =
           pipeline
@@ -1277,7 +1278,7 @@ public class CombineTest implements Serializable {
     }
 
     @Test
-    @Category(ValidatesRunner.class)
+    @Category({ValidatesRunner.class, DataflowPortabilityApiUnsupported.class})
     public void testWindowedCombineEmpty() {
       PCollection<Double> mean =
           pipeline
@@ -1398,7 +1399,7 @@ public class CombineTest implements Serializable {
   @RunWith(JUnit4.class)
   public static class AccumulationTests extends SharedTestBase {
     @Test
-    @Category(ValidatesRunner.class)
+    @Category({ValidatesRunner.class, DataflowPortabilityApiUnsupported.class})
     public void testAccumulatingCombine() {
       runTestAccumulatingCombine(
           Arrays.asList(KV.of("a", 1), KV.of("a", 1), KV.of("a", 4), KV.of("b", 1), KV.of("b", 13)),
@@ -1407,7 +1408,7 @@ public class CombineTest implements Serializable {
     }
 
     @Test
-    @Category(ValidatesRunner.class)
+    @Category({ValidatesRunner.class, DataflowPortabilityApiUnsupported.class})
     public void testAccumulatingCombineEmpty() {
       runTestAccumulatingCombine(EMPTY_TABLE, 0.0, Collections.emptyList());
     }
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
index e81d2b9..2049e10 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
@@ -48,6 +48,7 @@ import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.MapCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.testing.DataflowPortabilityApiUnsupported;
 import org.apache.beam.sdk.testing.LargeKeys;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
@@ -308,7 +309,7 @@ public class GroupByKeyTest implements Serializable {
 
     /** Verify that runners correctly hash/group on the encoded value and not the value itself. */
     @Test
-    @Category(ValidatesRunner.class)
+    @Category({ValidatesRunner.class, DataflowPortabilityApiUnsupported.class})
     public void testGroupByKeyWithBadEqualsHashCode() throws Exception {
       final int numValues = 10;
       final int numKeys = 5;
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index f7e9b79..5cd4382 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -79,6 +79,7 @@ import org.apache.beam.sdk.state.Timer;
 import org.apache.beam.sdk.state.TimerSpec;
 import org.apache.beam.sdk.state.TimerSpecs;
 import org.apache.beam.sdk.state.ValueState;
+import org.apache.beam.sdk.testing.DataflowPortabilityApiUnsupported;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
@@ -1521,7 +1522,11 @@ public class ParDoTest implements Serializable {
     }
 
     @Test
-    @Category({ValidatesRunner.class, UsesStatefulParDo.class})
+    @Category({
+      ValidatesRunner.class,
+      UsesStatefulParDo.class,
+      DataflowPortabilityApiUnsupported.class
+    })
     public void testCoderInferenceOfList() {
       final String stateId = "foo";
       MyIntegerCoder myIntegerCoder = MyIntegerCoder.of();
@@ -1561,7 +1566,11 @@ public class ParDoTest implements Serializable {
     }
 
     @Test
-    @Category({ValidatesRunner.class, UsesStatefulParDo.class})
+    @Category({
+      ValidatesRunner.class,
+      UsesStatefulParDo.class,
+      DataflowPortabilityApiUnsupported.class
+    })
     public void testValueStateFixedWindows() {
       final String stateId = "foo";
 
@@ -1660,7 +1669,11 @@ public class ParDoTest implements Serializable {
     }
 
     @Test
-    @Category({ValidatesRunner.class, UsesStatefulParDo.class})
+    @Category({
+      ValidatesRunner.class,
+      UsesStatefulParDo.class,
+      DataflowPortabilityApiUnsupported.class
+    })
     public void testValueStateTaggedOutput() {
       final String stateId = "foo";
 
@@ -1711,7 +1724,11 @@ public class ParDoTest implements Serializable {
     }
 
     @Test
-    @Category({ValidatesRunner.class, UsesStatefulParDo.class})
+    @Category({
+      ValidatesRunner.class,
+      UsesStatefulParDo.class,
+      DataflowPortabilityApiUnsupported.class
+    })
     public void testBagState() {
       final String stateId = "foo";
 
@@ -1997,7 +2014,11 @@ public class ParDoTest implements Serializable {
   @RunWith(JUnit4.class)
   public static class StateCoderInferenceTests extends SharedTestBase implements Serializable {
     @Test
-    @Category({ValidatesRunner.class, UsesStatefulParDo.class})
+    @Category({
+      ValidatesRunner.class,
+      UsesStatefulParDo.class,
+      DataflowPortabilityApiUnsupported.class
+    })
     public void testBagStateCoderInference() {
       final String stateId = "foo";
       Coder<MyInteger> myIntegerCoder = MyIntegerCoder.of();
@@ -2479,7 +2500,11 @@ public class ParDoTest implements Serializable {
      * hooks and is only supported by the direct runner.
      */
     @Test
-    @Category({ValidatesRunner.class, UsesTimersInParDo.class})
+    @Category({
+      ValidatesRunner.class,
+      UsesTimersInParDo.class,
+      DataflowPortabilityApiUnsupported.class
+    })
     public void testEventTimeTimerBounded() throws Exception {
       final String timerId = "foo";
 
@@ -2514,7 +2539,11 @@ public class ParDoTest implements Serializable {
      * case where both GBK and the user code share a timer delivery bundle.
      */
     @Test
-    @Category({ValidatesRunner.class, UsesTimersInParDo.class})
+    @Category({
+      ValidatesRunner.class,
+      UsesTimersInParDo.class,
+      DataflowPortabilityApiUnsupported.class
+    })
     public void testGbkFollowedByUserTimers() throws Exception {
 
       DoFn<KV<String, Iterable<Integer>>, Integer> fn =
@@ -2549,7 +2578,11 @@ public class ParDoTest implements Serializable {
     }
 
     @Test
-    @Category({ValidatesRunner.class, UsesTimersInParDo.class})
+    @Category({
+      ValidatesRunner.class,
+      UsesTimersInParDo.class,
+      DataflowPortabilityApiUnsupported.class
+    })
     public void testEventTimeTimerAlignBounded() throws Exception {
       final String timerId = "foo";
 
@@ -2585,7 +2618,11 @@ public class ParDoTest implements Serializable {
     }
 
     @Test
-    @Category({ValidatesRunner.class, UsesTimersInParDo.class})
+    @Category({
+      ValidatesRunner.class,
+      UsesTimersInParDo.class,
+      DataflowPortabilityApiUnsupported.class
+    })
     public void testTimerReceivedInOriginalWindow() throws Exception {
       final String timerId = "foo";
 
@@ -2635,7 +2672,11 @@ public class ParDoTest implements Serializable {
      * #testEventTimeTimerBounded()}.
      */
     @Test
-    @Category({ValidatesRunner.class, UsesTimersInParDo.class})
+    @Category({
+      ValidatesRunner.class,
+      UsesTimersInParDo.class,
+      DataflowPortabilityApiUnsupported.class
+    })
     public void testEventTimeTimerAbsolute() throws Exception {
       final String timerId = "foo";
 
@@ -2716,7 +2757,11 @@ public class ParDoTest implements Serializable {
      * implementations that may GC in ways not simply governed by the watermark.
      */
     @Test
-    @Category({ValidatesRunner.class, UsesTimersInParDo.class})
+    @Category({
+      ValidatesRunner.class,
+      UsesTimersInParDo.class,
+      DataflowPortabilityApiUnsupported.class
+    })
     public void testEventTimeTimerMultipleKeys() throws Exception {
       final String timerId = "foo";
       final String stateId = "sizzle";
@@ -2778,7 +2823,11 @@ public class ParDoTest implements Serializable {
     }
 
     @Test
-    @Category({ValidatesRunner.class, UsesTimersInParDo.class})
+    @Category({
+      ValidatesRunner.class,
+      UsesTimersInParDo.class,
+      DataflowPortabilityApiUnsupported.class
+    })
     public void testAbsoluteProcessingTimeTimerRejected() throws Exception {
       final String timerId = "foo";
 
@@ -2807,7 +2856,11 @@ public class ParDoTest implements Serializable {
     }
 
     @Test
-    @Category({ValidatesRunner.class, UsesTimersInParDo.class})
+    @Category({
+      ValidatesRunner.class,
+      UsesTimersInParDo.class,
+      DataflowPortabilityApiUnsupported.class
+    })
     public void testOutOfBoundsEventTimeTimer() throws Exception {
       final String timerId = "foo";
 
@@ -3073,7 +3126,11 @@ public class ParDoTest implements Serializable {
     }
 
     @Test
-    @Category({ValidatesRunner.class, UsesTimersInParDo.class})
+    @Category({
+      ValidatesRunner.class,
+      UsesTimersInParDo.class,
+      DataflowPortabilityApiUnsupported.class
+    })
     public void testPipelineOptionsParameterOnTimer() {
       final String timerId = "thisTimer";
 
@@ -3124,7 +3181,11 @@ public class ParDoTest implements Serializable {
   @RunWith(JUnit4.class)
   public static class TimerCoderInferenceTests extends SharedTestBase implements Serializable {
     @Test
-    @Category({ValidatesRunner.class, UsesStatefulParDo.class})
+    @Category({
+      ValidatesRunner.class,
+      UsesStatefulParDo.class,
+      DataflowPortabilityApiUnsupported.class
+    })
     public void testValueStateCoderInference() {
       final String stateId = "foo";
       MyIntegerCoder myIntegerCoder = MyIntegerCoder.of();
@@ -3190,7 +3251,11 @@ public class ParDoTest implements Serializable {
     }
 
     @Test
-    @Category({ValidatesRunner.class, UsesStatefulParDo.class})
+    @Category({
+      ValidatesRunner.class,
+      UsesStatefulParDo.class,
+      DataflowPortabilityApiUnsupported.class
+    })
     public void testValueStateCoderInferenceFromInputCoder() {
       final String stateId = "foo";
       MyIntegerCoder myIntegerCoder = MyIntegerCoder.of();
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
index d960254..9abaf50 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
@@ -33,6 +33,7 @@ import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.testing.DataflowPortabilityApiUnsupported;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
@@ -280,7 +281,11 @@ public class SplittableDoFnTest implements Serializable {
   }
 
   @Test
-  @Category({ValidatesRunner.class, UsesBoundedSplittableParDo.class})
+  @Category({
+    ValidatesRunner.class,
+    UsesBoundedSplittableParDo.class,
+    DataflowPortabilityApiUnsupported.class
+  })
   public void testOutputAfterCheckpointBounded() {
     testOutputAfterCheckpoint(IsBounded.BOUNDED);
   }
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
index dc3bec8..f70ca9a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
@@ -46,6 +46,7 @@ import org.apache.beam.sdk.coders.NullableCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.testing.DataflowPortabilityApiUnsupported;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
@@ -609,7 +610,7 @@ public class ViewTest implements Serializable {
   }
 
   @Test
-  @Category(ValidatesRunner.class)
+  @Category({ValidatesRunner.class, DataflowPortabilityApiUnsupported.class})
   public void testMultimapSideInputWithNonDeterministicKeyCoder() {
 
     final PCollectionView<Map<String, Iterable<Integer>>> view =
@@ -745,7 +746,7 @@ public class ViewTest implements Serializable {
   }
 
   @Test
-  @Category(ValidatesRunner.class)
+  @Category({ValidatesRunner.class, DataflowPortabilityApiUnsupported.class})
   public void testWindowedMultimapSideInputWithNonDeterministicKeyCoder() {
 
     final PCollectionView<Map<String, Iterable<Integer>>> view =
@@ -828,7 +829,7 @@ public class ViewTest implements Serializable {
   }
 
   @Test
-  @Category(ValidatesRunner.class)
+  @Category({ValidatesRunner.class, DataflowPortabilityApiUnsupported.class})
   public void testEmptyMultimapSideInputWithNonDeterministicKeyCoder() throws Exception {
 
     final PCollectionView<Map<String, Iterable<Integer>>> view =
@@ -975,7 +976,7 @@ public class ViewTest implements Serializable {
   }
 
   @Test
-  @Category(ValidatesRunner.class)
+  @Category({ValidatesRunner.class, DataflowPortabilityApiUnsupported.class})
   public void testMapSideInputWithNonDeterministicKeyCoder() {
 
     final PCollectionView<Map<String, Integer>> view =
@@ -1097,7 +1098,7 @@ public class ViewTest implements Serializable {
   }
 
   @Test
-  @Category(ValidatesRunner.class)
+  @Category({ValidatesRunner.class, DataflowPortabilityApiUnsupported.class})
   public void testWindowedMapSideInputWithNonDeterministicKeyCoder() {
 
     final PCollectionView<Map<String, Integer>> view =
@@ -1175,7 +1176,7 @@ public class ViewTest implements Serializable {
   }
 
   @Test
-  @Category(ValidatesRunner.class)
+  @Category({ValidatesRunner.class, DataflowPortabilityApiUnsupported.class})
   public void testEmptyMapSideInputWithNonDeterministicKeyCoder() throws Exception {
 
     final PCollectionView<Map<String, Integer>> view =
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
index 8985051..6b1be69 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
@@ -51,6 +51,7 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.io.GenerateSequence;
 import org.apache.beam.sdk.runners.TransformHierarchy;
+import org.apache.beam.sdk.testing.DataflowPortabilityApiUnsupported;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.testing.UsesCustomWindowMerging;
@@ -386,7 +387,7 @@ public class WindowTest implements Serializable {
   }
 
   @Test
-  @Category(ValidatesRunner.class)
+  @Category({ValidatesRunner.class, DataflowPortabilityApiUnsupported.class})
   public void testNoWindowFnDoesNotReassignWindows() {
     pipeline.enableAbandonedNodeEnforcement(true);