You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2018/10/30 17:35:31 UTC
[beam] branch master updated: [BEAM-5781] Enable ValidatesRunner
tests for portable java (#6881)
This is an automated email from the ASF dual-hosted git repository.
lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 71c6de3 [BEAM-5781] Enable ValidatesRunner tests for portable java (#6881)
71c6de3 is described below
commit 71c6de321fb8648146f7bd877f074a0a7617c6e0
Author: Boyuan Zhang <36...@users.noreply.github.com>
AuthorDate: Tue Oct 30 10:35:24 2018 -0700
[BEAM-5781] Enable ValidatesRunner tests for portable java (#6881)
---
..._ValidatesRunner_PortabilityApi_Dataflow.groovy | 51 ++++++++++++
runners/google-cloud-dataflow-java/build.gradle | 54 ++++++++++++
.../org/apache/beam/sdk/io/CountingSourceTest.java | 5 +-
.../org/apache/beam/sdk/metrics/MetricsTest.java | 8 +-
.../org/apache/beam/sdk/testing/PAssertTest.java | 6 +-
.../apache/beam/sdk/transforms/CombineFnsTest.java | 7 +-
.../apache/beam/sdk/transforms/CombineTest.java | 15 ++--
.../apache/beam/sdk/transforms/GroupByKeyTest.java | 3 +-
.../org/apache/beam/sdk/transforms/ParDoTest.java | 97 ++++++++++++++++++----
.../beam/sdk/transforms/SplittableDoFnTest.java | 7 +-
.../org/apache/beam/sdk/transforms/ViewTest.java | 13 +--
.../beam/sdk/transforms/windowing/WindowTest.java | 3 +-
12 files changed, 228 insertions(+), 41 deletions(-)
diff --git a/.test-infra/jenkins/job_PostCommit_Java_ValidatesRunner_PortabilityApi_Dataflow.groovy b/.test-infra/jenkins/job_PostCommit_Java_ValidatesRunner_PortabilityApi_Dataflow.groovy
new file mode 100644
index 0000000..6f7264e
--- /dev/null
+++ b/.test-infra/jenkins/job_PostCommit_Java_ValidatesRunner_PortabilityApi_Dataflow.groovy
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import CommonJobProperties as commonJobProperties
+import PostcommitJobBuilder
+
+
+// This job runs the suite of ValidatesRunner tests against the Dataflow
+// runner using the PortabilityApi (Fn API worker) configuration.
+PostcommitJobBuilder.postCommitJob('beam_PostCommit_Java_ValidatesRunner_PortabilityApi_Dataflow_Gradle',
+ 'Run Dataflow PortabilityApi ValidatesRunner', 'Google Cloud Dataflow Runner PortabilityApi ValidatesRunner Tests', this) {
+
+ description('Runs the ValidatesRunner suite on the Dataflow PortabilityApi runner.')
+
+ // Set common parameters. Timeout argument is 400 (presumably minutes, i.e. more than 3 hours; confirm).
+ commonJobProperties.setTopLevelMainJobProperties(delegate, 'master', 400)
+
+ // Publish all test results to Jenkins
+ publishers {
+ archiveJunit('**/build/test-results/**/*.xml')
+ }
+
+ // Gradle goals for this job.
+ steps {
+ gradle {
+ rootBuildScriptDir(commonJobProperties.checkoutDir)
+ tasks(':beam-runners-google-cloud-dataflow-java:validatesRunnerPortabilityApi')
+ // Increase parallel worker threads above processor limit since most time is
+ // spent waiting on Dataflow jobs. ValidatesRunner tests on Dataflow are slow
+ // because each one launches a Dataflow job with about 3 mins of overhead.
+ // 3 x num_cores strikes a good balance between maxing out parallelism without
+ // overloading the machines.
+ commonJobProperties.setGradleSwitches(delegate, 3 * Runtime.runtime.availableProcessors())
+ }
+ }
+}
diff --git a/runners/google-cloud-dataflow-java/build.gradle b/runners/google-cloud-dataflow-java/build.gradle
index 00edec7..ab18b53 100644
--- a/runners/google-cloud-dataflow-java/build.gradle
+++ b/runners/google-cloud-dataflow-java/build.gradle
@@ -131,6 +131,14 @@ def commonExcludeCategories = [
'org.apache.beam.sdk.testing.UsesMetricsPusher',
]
+def fnApiWorkerExcludeCategories = [
+ 'org.apache.beam.sdk.testing.UsesCustomWindowMerging',
+ 'org.apache.beam.sdk.testing.DataflowPortabilityApiUnsupported',
+ 'org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders',
+ 'org.apache.beam.sdk.testing.UsesUnboundedSplittableParDo',
+ 'org.apache.beam.sdk.testing.UsesSchema'
+]
+
// For the following test tasks using legacy worker, set workerHarnessContainerImage to empty to
// make Dataflow pick up the non-versioned container image, which handles a staged worker jar.
task validatesRunnerLegacyWorkerTest(type: Test) {
@@ -172,12 +180,58 @@ task buildAndPushDockerContainer() {
}
}
+task validatesRunnerFnApiWorkerTest(type: Test) {
+ group = "Verification"
+ dependsOn ":beam-runners-google-cloud-dataflow-java-fn-api-worker:shadowJar"
+ dependsOn buildAndPushDockerContainer
+
+ systemProperty "beamTestPipelineOptions", JsonOutput.toJson([
+ "--runner=TestDataflowRunner",
+ "--project=${dataflowProject}",
+ "--tempRoot=${dataflowPostCommitTempRoot}",
+ "--dataflowWorkerJar=${dataflowFnApiWorkerJar}",
+ "--workerHarnessContainerImage=${dockerImageContainer}:${dockerTag}",
+ "--experiments=beam_fn_api",
+
+ ])
+
+ // Increase test parallelism up to the number of Gradle workers. By default this is equal
+ // to the number of CPU cores, but can be increased by setting --max-workers=N.
+ maxParallelForks Integer.MAX_VALUE
+ classpath = configurations.validatesRunner
+ testClassesDirs = files(project(":beam-sdks-java-core").sourceSets.test.output.classesDirs)
+ useJUnit {
+ includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner'
+ commonExcludeCategories.each {
+ excludeCategories it
+ }
+ fnApiWorkerExcludeCategories.each {
+ excludeCategories it
+ }
+ }
+}
+
task validatesRunner {
group = "Verification"
description "Validates Dataflow runner"
dependsOn validatesRunnerLegacyWorkerTest
}
+task validatesRunnerPortabilityApi {
+ group = "Verification"
+ description "Validates Dataflow PortabilityApi runner"
+ dependsOn validatesRunnerFnApiWorkerTest
+ // Clean up docker image
+ doLast {
+ exec {
+ commandLine "docker", "rmi", "${dockerImageName}"
+ }
+ exec {
+ commandLine "gcloud", "--quiet", "container", "images", "delete", "${dockerImageName}"
+ }
+ }
+}
+
task googleCloudPlatformLegacyWorkerIntegrationTest(type: Test) {
group = "Verification"
dependsOn ":beam-runners-google-cloud-dataflow-java-legacy-worker:shadowJar"
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java
index e0224fb..8cacd67 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java
@@ -30,6 +30,7 @@ import org.apache.beam.sdk.io.CountingSource.CounterMark;
import org.apache.beam.sdk.io.CountingSource.UnboundedCountingSource;
import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.DataflowPortabilityApiUnsupported;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
@@ -91,7 +92,7 @@ public class CountingSourceTest {
}
@Test
- @Category(ValidatesRunner.class)
+ @Category({ValidatesRunner.class, DataflowPortabilityApiUnsupported.class})
public void testBoundedSourceSplits() throws Exception {
long numElements = 1000;
long numSplits = 10;
@@ -213,7 +214,7 @@ public class CountingSourceTest {
}
@Test
- @Category(ValidatesRunner.class)
+ @Category({ValidatesRunner.class, DataflowPortabilityApiUnsupported.class})
public void testUnboundedSourceSplits() throws Exception {
long numElements = 1000;
int numSplits = 10;
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java
index 284cba7..6f7863d 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java
@@ -29,6 +29,7 @@ import static org.mockito.Mockito.when;
import java.io.Serializable;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.GenerateSequence;
+import org.apache.beam.sdk.testing.DataflowPortabilityApiUnsupported;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.UsesAttemptedMetrics;
@@ -242,7 +243,12 @@ public class MetricsTest implements Serializable {
assertAllMetrics(metrics, true);
}
- @Category({ValidatesRunner.class, UsesCommittedMetrics.class, UsesCounterMetrics.class})
+ @Category({
+ ValidatesRunner.class,
+ UsesCommittedMetrics.class,
+ UsesCounterMetrics.class,
+ DataflowPortabilityApiUnsupported.class
+ })
@Test
public void testCommittedCounterMetrics() {
PipelineResult result = runPipelineWithMetrics();
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
index bedbb5d..bf7ee69 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
@@ -165,7 +165,7 @@ public class PAssertTest implements Serializable {
* the {@link PCollection} to be serializable.
*/
@Test
- @Category(ValidatesRunner.class)
+ @Category({ValidatesRunner.class, DataflowPortabilityApiUnsupported.class})
public void testContainsInAnyOrderNotSerializable() throws Exception {
PCollection<NotSerializableObject> pcollection =
pipeline.apply(
@@ -183,7 +183,7 @@ public class PAssertTest implements Serializable {
* arbitrary {@link SerializableFunction}, though.
*/
@Test
- @Category(ValidatesRunner.class)
+ @Category({ValidatesRunner.class, DataflowPortabilityApiUnsupported.class})
public void testSerializablePredicate() throws Exception {
PCollection<NotSerializableObject> pcollection =
pipeline.apply(
@@ -204,7 +204,7 @@ public class PAssertTest implements Serializable {
* arbitrary {@link SerializableFunction}, though.
*/
@Test
- @Category(ValidatesRunner.class)
+ @Category({ValidatesRunner.class, DataflowPortabilityApiUnsupported.class})
public void testWindowedSerializablePredicate() throws Exception {
PCollection<NotSerializableObject> pcollection =
pipeline
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java
index 94bb159..c2c7a84 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java
@@ -37,6 +37,7 @@ import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.testing.DataflowPortabilityApiUnsupported;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.ValidatesRunner;
@@ -96,7 +97,7 @@ public class CombineFnsTest {
}
@Test
- @Category(ValidatesRunner.class)
+ @Category({ValidatesRunner.class, DataflowPortabilityApiUnsupported.class})
public void testComposedCombine() {
p.getCoderRegistry().registerCoderForClass(UserString.class, UserStringCoder.of());
@@ -146,7 +147,7 @@ public class CombineFnsTest {
}
@Test
- @Category(ValidatesRunner.class)
+ @Category({ValidatesRunner.class, DataflowPortabilityApiUnsupported.class})
public void testComposedCombineWithContext() {
p.getCoderRegistry().registerCoderForClass(UserString.class, UserStringCoder.of());
@@ -207,7 +208,7 @@ public class CombineFnsTest {
}
@Test
- @Category(ValidatesRunner.class)
+ @Category({ValidatesRunner.class, DataflowPortabilityApiUnsupported.class})
public void testComposedCombineNullValues() {
p.getCoderRegistry()
.registerCoderForClass(UserString.class, NullableCoder.of(UserStringCoder.of()));
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
index bb7414e..2d825dc 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
@@ -57,6 +57,7 @@ import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.testing.DataflowPortabilityApiUnsupported;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
@@ -645,7 +646,7 @@ public class CombineTest implements Serializable {
@RunWith(JUnit4.class)
public static class BasicTests extends SharedTestBase {
@Test
- @Category(ValidatesRunner.class)
+ @Category({ValidatesRunner.class, DataflowPortabilityApiUnsupported.class})
@SuppressWarnings({"rawtypes", "unchecked"})
public void testSimpleCombine() {
runTestSimpleCombine(
@@ -655,7 +656,7 @@ public class CombineTest implements Serializable {
}
@Test
- @Category(ValidatesRunner.class)
+ @Category({ValidatesRunner.class, DataflowPortabilityApiUnsupported.class})
public void testSimpleCombineEmpty() {
runTestSimpleCombine(EMPTY_TABLE, 0, Collections.emptyList());
}
@@ -692,7 +693,7 @@ public class CombineTest implements Serializable {
}
@Test
- @Category(ValidatesRunner.class)
+ @Category({ValidatesRunner.class, DataflowPortabilityApiUnsupported.class})
public void testHotKeyCombining() {
PCollection<KV<String, Integer>> input =
copy(
@@ -994,7 +995,7 @@ public class CombineTest implements Serializable {
@RunWith(JUnit4.class)
public static class WindowingTests extends SharedTestBase implements Serializable {
@Test
- @Category(ValidatesRunner.class)
+ @Category({ValidatesRunner.class, DataflowPortabilityApiUnsupported.class})
public void testFixedWindowsCombine() {
PCollection<KV<String, Integer>> input =
pipeline
@@ -1277,7 +1278,7 @@ public class CombineTest implements Serializable {
}
@Test
- @Category(ValidatesRunner.class)
+ @Category({ValidatesRunner.class, DataflowPortabilityApiUnsupported.class})
public void testWindowedCombineEmpty() {
PCollection<Double> mean =
pipeline
@@ -1398,7 +1399,7 @@ public class CombineTest implements Serializable {
@RunWith(JUnit4.class)
public static class AccumulationTests extends SharedTestBase {
@Test
- @Category(ValidatesRunner.class)
+ @Category({ValidatesRunner.class, DataflowPortabilityApiUnsupported.class})
public void testAccumulatingCombine() {
runTestAccumulatingCombine(
Arrays.asList(KV.of("a", 1), KV.of("a", 1), KV.of("a", 4), KV.of("b", 1), KV.of("b", 13)),
@@ -1407,7 +1408,7 @@ public class CombineTest implements Serializable {
}
@Test
- @Category(ValidatesRunner.class)
+ @Category({ValidatesRunner.class, DataflowPortabilityApiUnsupported.class})
public void testAccumulatingCombineEmpty() {
runTestAccumulatingCombine(EMPTY_TABLE, 0.0, Collections.emptyList());
}
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
index e81d2b9..2049e10 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
@@ -48,6 +48,7 @@ import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.MapCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.testing.DataflowPortabilityApiUnsupported;
import org.apache.beam.sdk.testing.LargeKeys;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
@@ -308,7 +309,7 @@ public class GroupByKeyTest implements Serializable {
/** Verify that runners correctly hash/group on the encoded value and not the value itself. */
@Test
- @Category(ValidatesRunner.class)
+ @Category({ValidatesRunner.class, DataflowPortabilityApiUnsupported.class})
public void testGroupByKeyWithBadEqualsHashCode() throws Exception {
final int numValues = 10;
final int numKeys = 5;
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index f7e9b79..5cd4382 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -79,6 +79,7 @@ import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.state.ValueState;
+import org.apache.beam.sdk.testing.DataflowPortabilityApiUnsupported;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
@@ -1521,7 +1522,11 @@ public class ParDoTest implements Serializable {
}
@Test
- @Category({ValidatesRunner.class, UsesStatefulParDo.class})
+ @Category({
+ ValidatesRunner.class,
+ UsesStatefulParDo.class,
+ DataflowPortabilityApiUnsupported.class
+ })
public void testCoderInferenceOfList() {
final String stateId = "foo";
MyIntegerCoder myIntegerCoder = MyIntegerCoder.of();
@@ -1561,7 +1566,11 @@ public class ParDoTest implements Serializable {
}
@Test
- @Category({ValidatesRunner.class, UsesStatefulParDo.class})
+ @Category({
+ ValidatesRunner.class,
+ UsesStatefulParDo.class,
+ DataflowPortabilityApiUnsupported.class
+ })
public void testValueStateFixedWindows() {
final String stateId = "foo";
@@ -1660,7 +1669,11 @@ public class ParDoTest implements Serializable {
}
@Test
- @Category({ValidatesRunner.class, UsesStatefulParDo.class})
+ @Category({
+ ValidatesRunner.class,
+ UsesStatefulParDo.class,
+ DataflowPortabilityApiUnsupported.class
+ })
public void testValueStateTaggedOutput() {
final String stateId = "foo";
@@ -1711,7 +1724,11 @@ public class ParDoTest implements Serializable {
}
@Test
- @Category({ValidatesRunner.class, UsesStatefulParDo.class})
+ @Category({
+ ValidatesRunner.class,
+ UsesStatefulParDo.class,
+ DataflowPortabilityApiUnsupported.class
+ })
public void testBagState() {
final String stateId = "foo";
@@ -1997,7 +2014,11 @@ public class ParDoTest implements Serializable {
@RunWith(JUnit4.class)
public static class StateCoderInferenceTests extends SharedTestBase implements Serializable {
@Test
- @Category({ValidatesRunner.class, UsesStatefulParDo.class})
+ @Category({
+ ValidatesRunner.class,
+ UsesStatefulParDo.class,
+ DataflowPortabilityApiUnsupported.class
+ })
public void testBagStateCoderInference() {
final String stateId = "foo";
Coder<MyInteger> myIntegerCoder = MyIntegerCoder.of();
@@ -2479,7 +2500,11 @@ public class ParDoTest implements Serializable {
* hooks and is only supported by the direct runner.
*/
@Test
- @Category({ValidatesRunner.class, UsesTimersInParDo.class})
+ @Category({
+ ValidatesRunner.class,
+ UsesTimersInParDo.class,
+ DataflowPortabilityApiUnsupported.class
+ })
public void testEventTimeTimerBounded() throws Exception {
final String timerId = "foo";
@@ -2514,7 +2539,11 @@ public class ParDoTest implements Serializable {
* case where both GBK and the user code share a timer delivery bundle.
*/
@Test
- @Category({ValidatesRunner.class, UsesTimersInParDo.class})
+ @Category({
+ ValidatesRunner.class,
+ UsesTimersInParDo.class,
+ DataflowPortabilityApiUnsupported.class
+ })
public void testGbkFollowedByUserTimers() throws Exception {
DoFn<KV<String, Iterable<Integer>>, Integer> fn =
@@ -2549,7 +2578,11 @@ public class ParDoTest implements Serializable {
}
@Test
- @Category({ValidatesRunner.class, UsesTimersInParDo.class})
+ @Category({
+ ValidatesRunner.class,
+ UsesTimersInParDo.class,
+ DataflowPortabilityApiUnsupported.class
+ })
public void testEventTimeTimerAlignBounded() throws Exception {
final String timerId = "foo";
@@ -2585,7 +2618,11 @@ public class ParDoTest implements Serializable {
}
@Test
- @Category({ValidatesRunner.class, UsesTimersInParDo.class})
+ @Category({
+ ValidatesRunner.class,
+ UsesTimersInParDo.class,
+ DataflowPortabilityApiUnsupported.class
+ })
public void testTimerReceivedInOriginalWindow() throws Exception {
final String timerId = "foo";
@@ -2635,7 +2672,11 @@ public class ParDoTest implements Serializable {
* #testEventTimeTimerBounded()}.
*/
@Test
- @Category({ValidatesRunner.class, UsesTimersInParDo.class})
+ @Category({
+ ValidatesRunner.class,
+ UsesTimersInParDo.class,
+ DataflowPortabilityApiUnsupported.class
+ })
public void testEventTimeTimerAbsolute() throws Exception {
final String timerId = "foo";
@@ -2716,7 +2757,11 @@ public class ParDoTest implements Serializable {
* implementations that may GC in ways not simply governed by the watermark.
*/
@Test
- @Category({ValidatesRunner.class, UsesTimersInParDo.class})
+ @Category({
+ ValidatesRunner.class,
+ UsesTimersInParDo.class,
+ DataflowPortabilityApiUnsupported.class
+ })
public void testEventTimeTimerMultipleKeys() throws Exception {
final String timerId = "foo";
final String stateId = "sizzle";
@@ -2778,7 +2823,11 @@ public class ParDoTest implements Serializable {
}
@Test
- @Category({ValidatesRunner.class, UsesTimersInParDo.class})
+ @Category({
+ ValidatesRunner.class,
+ UsesTimersInParDo.class,
+ DataflowPortabilityApiUnsupported.class
+ })
public void testAbsoluteProcessingTimeTimerRejected() throws Exception {
final String timerId = "foo";
@@ -2807,7 +2856,11 @@ public class ParDoTest implements Serializable {
}
@Test
- @Category({ValidatesRunner.class, UsesTimersInParDo.class})
+ @Category({
+ ValidatesRunner.class,
+ UsesTimersInParDo.class,
+ DataflowPortabilityApiUnsupported.class
+ })
public void testOutOfBoundsEventTimeTimer() throws Exception {
final String timerId = "foo";
@@ -3073,7 +3126,11 @@ public class ParDoTest implements Serializable {
}
@Test
- @Category({ValidatesRunner.class, UsesTimersInParDo.class})
+ @Category({
+ ValidatesRunner.class,
+ UsesTimersInParDo.class,
+ DataflowPortabilityApiUnsupported.class
+ })
public void testPipelineOptionsParameterOnTimer() {
final String timerId = "thisTimer";
@@ -3124,7 +3181,11 @@ public class ParDoTest implements Serializable {
@RunWith(JUnit4.class)
public static class TimerCoderInferenceTests extends SharedTestBase implements Serializable {
@Test
- @Category({ValidatesRunner.class, UsesStatefulParDo.class})
+ @Category({
+ ValidatesRunner.class,
+ UsesStatefulParDo.class,
+ DataflowPortabilityApiUnsupported.class
+ })
public void testValueStateCoderInference() {
final String stateId = "foo";
MyIntegerCoder myIntegerCoder = MyIntegerCoder.of();
@@ -3190,7 +3251,11 @@ public class ParDoTest implements Serializable {
}
@Test
- @Category({ValidatesRunner.class, UsesStatefulParDo.class})
+ @Category({
+ ValidatesRunner.class,
+ UsesStatefulParDo.class,
+ DataflowPortabilityApiUnsupported.class
+ })
public void testValueStateCoderInferenceFromInputCoder() {
final String stateId = "foo";
MyIntegerCoder myIntegerCoder = MyIntegerCoder.of();
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
index d960254..9abaf50 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
@@ -33,6 +33,7 @@ import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.testing.DataflowPortabilityApiUnsupported;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
@@ -280,7 +281,11 @@ public class SplittableDoFnTest implements Serializable {
}
@Test
- @Category({ValidatesRunner.class, UsesBoundedSplittableParDo.class})
+ @Category({
+ ValidatesRunner.class,
+ UsesBoundedSplittableParDo.class,
+ DataflowPortabilityApiUnsupported.class
+ })
public void testOutputAfterCheckpointBounded() {
testOutputAfterCheckpoint(IsBounded.BOUNDED);
}
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
index dc3bec8..f70ca9a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
@@ -46,6 +46,7 @@ import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.testing.DataflowPortabilityApiUnsupported;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
@@ -609,7 +610,7 @@ public class ViewTest implements Serializable {
}
@Test
- @Category(ValidatesRunner.class)
+ @Category({ValidatesRunner.class, DataflowPortabilityApiUnsupported.class})
public void testMultimapSideInputWithNonDeterministicKeyCoder() {
final PCollectionView<Map<String, Iterable<Integer>>> view =
@@ -745,7 +746,7 @@ public class ViewTest implements Serializable {
}
@Test
- @Category(ValidatesRunner.class)
+ @Category({ValidatesRunner.class, DataflowPortabilityApiUnsupported.class})
public void testWindowedMultimapSideInputWithNonDeterministicKeyCoder() {
final PCollectionView<Map<String, Iterable<Integer>>> view =
@@ -828,7 +829,7 @@ public class ViewTest implements Serializable {
}
@Test
- @Category(ValidatesRunner.class)
+ @Category({ValidatesRunner.class, DataflowPortabilityApiUnsupported.class})
public void testEmptyMultimapSideInputWithNonDeterministicKeyCoder() throws Exception {
final PCollectionView<Map<String, Iterable<Integer>>> view =
@@ -975,7 +976,7 @@ public class ViewTest implements Serializable {
}
@Test
- @Category(ValidatesRunner.class)
+ @Category({ValidatesRunner.class, DataflowPortabilityApiUnsupported.class})
public void testMapSideInputWithNonDeterministicKeyCoder() {
final PCollectionView<Map<String, Integer>> view =
@@ -1097,7 +1098,7 @@ public class ViewTest implements Serializable {
}
@Test
- @Category(ValidatesRunner.class)
+ @Category({ValidatesRunner.class, DataflowPortabilityApiUnsupported.class})
public void testWindowedMapSideInputWithNonDeterministicKeyCoder() {
final PCollectionView<Map<String, Integer>> view =
@@ -1175,7 +1176,7 @@ public class ViewTest implements Serializable {
}
@Test
- @Category(ValidatesRunner.class)
+ @Category({ValidatesRunner.class, DataflowPortabilityApiUnsupported.class})
public void testEmptyMapSideInputWithNonDeterministicKeyCoder() throws Exception {
final PCollectionView<Map<String, Integer>> view =
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
index 8985051..6b1be69 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
@@ -51,6 +51,7 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.runners.TransformHierarchy;
+import org.apache.beam.sdk.testing.DataflowPortabilityApiUnsupported;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.UsesCustomWindowMerging;
@@ -386,7 +387,7 @@ public class WindowTest implements Serializable {
}
@Test
- @Category(ValidatesRunner.class)
+ @Category({ValidatesRunner.class, DataflowPortabilityApiUnsupported.class})
public void testNoWindowFnDoesNotReassignWindows() {
pipeline.enableAbandonedNodeEnforcement(true);