You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/03/27 18:15:24 UTC

[1/4] beam git commit: Remove deprecated RunnableOnService

Repository: beam
Updated Branches:
  refs/heads/master 61fad2a66 -> 1d36d32fe


Remove deprecated RunnableOnService


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/488dc99d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/488dc99d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/488dc99d

Branch: refs/heads/master
Commit: 488dc99de199f4e2f66c167a7d70448d5f3eabf9
Parents: a58a741
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Mar 23 13:40:39 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Mar 27 09:38:55 2017 -0700

----------------------------------------------------------------------
 .../beam/sdk/testing/RunnableOnService.java     | 24 --------------------
 .../beam/sdk/testing/ValidatesRunner.java       |  2 +-
 2 files changed, 1 insertion(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/488dc99d/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/RunnableOnService.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/RunnableOnService.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/RunnableOnService.java
deleted file mode 100644
index dd8fd13..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/RunnableOnService.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.testing;
-
-/**
- * @deprecated use {@link ValidatesRunner}
- */
-@Deprecated
-public interface RunnableOnService extends NeedsRunner {}

http://git-wip-us.apache.org/repos/asf/beam/blob/488dc99d/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/ValidatesRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/ValidatesRunner.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/ValidatesRunner.java
index d33b3a6..09e62dc 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/ValidatesRunner.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/ValidatesRunner.java
@@ -20,4 +20,4 @@ package org.apache.beam.sdk.testing;
 /**
  * Category tag for tests which validate that a Beam runner is correctly implemented.
  */
-public interface ValidatesRunner extends RunnableOnService {}
+public interface ValidatesRunner extends NeedsRunner {}


[4/4] beam git commit: This closes #2157: Port Java modules from RunnableOnService to ValidatesRunner

Posted by ke...@apache.org.
This closes #2157: Port Java modules from RunnableOnService to ValidatesRunner

  Remove deprecated RunnableOnService
  Port Java modules from RunnableOnService to ValidatesRunner


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1d36d32f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1d36d32f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1d36d32f

Branch: refs/heads/master
Commit: 1d36d32fea68e067a631a15143377e58f75da7d4
Parents: 61fad2a 488dc99
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Mar 27 11:14:21 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Mar 27 11:14:21 2017 -0700

----------------------------------------------------------------------
 examples/java/pom.xml                           |  2 +-
 .../org/apache/beam/examples/WordCountTest.java |  4 +-
 .../beam/examples/complete/TfIdfTest.java       |  4 +-
 .../complete/TopWikipediaSessionsTest.java      |  4 +-
 .../examples/cookbook/DistinctExampleTest.java  |  6 +-
 .../examples/cookbook/JoinExamplesTest.java     |  4 +-
 .../examples/cookbook/TriggerExampleTest.java   |  4 +-
 examples/java8/pom.xml                          |  2 +-
 .../examples/complete/game/GameStatsTest.java   |  4 +-
 .../complete/game/HourlyTeamScoreTest.java      |  4 +-
 .../examples/complete/game/UserScoreTest.java   |  8 +--
 runners/apex/pom.xml                            |  4 +-
 runners/flink/runner/pom.xml                    |  6 +-
 .../runners/dataflow/testing/package-info.java  |  2 +-
 runners/pom.xml                                 |  2 +-
 runners/spark/pom.xml                           |  6 +-
 .../beam/sdk/testing/RunnableOnService.java     | 38 -----------
 .../apache/beam/sdk/testing/TestPipeline.java   |  6 +-
 .../beam/sdk/testing/ValidatesRunner.java       | 23 +++++++
 .../java/org/apache/beam/sdk/PipelineTest.java  | 10 +--
 .../java/org/apache/beam/sdk/io/AvroIOTest.java |  8 +--
 .../io/BoundedReadFromUnboundedSourceTest.java  |  8 +--
 .../apache/beam/sdk/io/CountingInputTest.java   | 14 ++--
 .../apache/beam/sdk/io/CountingSourceTest.java  | 14 ++--
 .../org/apache/beam/sdk/io/PubsubIOTest.java    |  6 +-
 .../java/org/apache/beam/sdk/io/ReadTest.java   |  6 +-
 .../java/org/apache/beam/sdk/io/TextIOTest.java | 10 +--
 .../apache/beam/sdk/metrics/MetricsTest.java    |  6 +-
 .../apache/beam/sdk/testing/PAssertTest.java    | 34 +++++-----
 .../beam/sdk/testing/TestPipelineTest.java      | 24 +++----
 .../sdk/transforms/ApproximateUniqueTest.java   |  4 +-
 .../beam/sdk/transforms/CombineFnsTest.java     |  8 +--
 .../apache/beam/sdk/transforms/CombineTest.java | 44 ++++++------
 .../apache/beam/sdk/transforms/CountTest.java   | 10 +--
 .../apache/beam/sdk/transforms/CreateTest.java  | 18 ++---
 .../beam/sdk/transforms/DistinctTest.java       |  8 +--
 .../apache/beam/sdk/transforms/FilterTest.java  | 16 ++---
 .../apache/beam/sdk/transforms/FlattenTest.java | 30 ++++-----
 .../beam/sdk/transforms/GroupByKeyTest.java     | 14 ++--
 .../apache/beam/sdk/transforms/KeysTest.java    |  6 +-
 .../apache/beam/sdk/transforms/KvSwapTest.java  |  6 +-
 .../beam/sdk/transforms/MapElementsTest.java    |  4 +-
 .../beam/sdk/transforms/ParDoLifecycleTest.java | 10 +--
 .../apache/beam/sdk/transforms/ParDoTest.java   | 68 +++++++++----------
 .../beam/sdk/transforms/PartitionTest.java      |  4 +-
 .../apache/beam/sdk/transforms/SampleTest.java  | 14 ++--
 .../beam/sdk/transforms/SplittableDoFnTest.java | 14 ++--
 .../beam/sdk/transforms/ToStringTest.java       | 13 ++--
 .../apache/beam/sdk/transforms/ValuesTest.java  |  6 +-
 .../apache/beam/sdk/transforms/ViewTest.java    | 70 ++++++++++----------
 .../beam/sdk/transforms/WithTimestampsTest.java |  8 +--
 .../sdk/transforms/join/CoGroupByKeyTest.java   | 10 +--
 .../sdk/transforms/windowing/WindowTest.java    | 10 +--
 .../sdk/transforms/windowing/WindowingTest.java | 10 +--
 .../beam/sdk/util/ReifyTimestampsTest.java      |  6 +-
 .../org/apache/beam/sdk/util/ReshuffleTest.java | 16 ++---
 .../beam/sdk/values/PCollectionTupleTest.java   |  4 +-
 .../org/apache/beam/sdk/values/PDoneTest.java   |  4 +-
 .../io/elasticsearch/ElasticsearchIOTest.java   |  8 +--
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java     | 18 ++---
 .../sdk/io/gcp/datastore/DatastoreV1Test.java   |  8 +--
 .../org/apache/beam/sdk/io/mqtt/MqttIOTest.java |  9 ++-
 .../beam/sdk/transforms/FilterJava8Test.java    |  8 +--
 .../beam/sdk/transforms/WithKeysJava8Test.java  |  4 +-
 .../sdk/transforms/WithTimestampsJava8Test.java |  4 +-
 65 files changed, 375 insertions(+), 392 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/1d36d32f/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java
----------------------------------------------------------------------


[3/4] beam git commit: Port Java modules from RunnableOnService to ValidatesRunner

Posted by ke...@apache.org.
Port Java modules from RunnableOnService to ValidatesRunner

Python is already using ValidatesRunner, as per dev list consensus.

A deprecated RunnableOnService is left in place as a superinterface of
ValidatesRunner, so anything remaining that still scans for it can be ported gradually.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a58a7412
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a58a7412
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a58a7412

Branch: refs/heads/master
Commit: a58a74124893c602e311aef0a9afb51f95b89828
Parents: 026aec8
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri Mar 3 15:21:51 2017 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Mar 27 09:38:55 2017 -0700

----------------------------------------------------------------------
 examples/java/pom.xml                           |  2 +-
 .../org/apache/beam/examples/WordCountTest.java |  4 +-
 .../beam/examples/complete/TfIdfTest.java       |  4 +-
 .../complete/TopWikipediaSessionsTest.java      |  4 +-
 .../examples/cookbook/DistinctExampleTest.java  |  6 +-
 .../examples/cookbook/JoinExamplesTest.java     |  4 +-
 .../examples/cookbook/TriggerExampleTest.java   |  4 +-
 examples/java8/pom.xml                          |  2 +-
 .../examples/complete/game/GameStatsTest.java   |  4 +-
 .../complete/game/HourlyTeamScoreTest.java      |  4 +-
 .../examples/complete/game/UserScoreTest.java   |  8 +--
 runners/apex/pom.xml                            |  4 +-
 runners/flink/runner/pom.xml                    |  6 +-
 .../runners/dataflow/testing/package-info.java  |  2 +-
 runners/pom.xml                                 |  2 +-
 runners/spark/pom.xml                           |  6 +-
 .../beam/sdk/testing/RunnableOnService.java     | 18 +----
 .../apache/beam/sdk/testing/TestPipeline.java   |  6 +-
 .../beam/sdk/testing/ValidatesRunner.java       | 23 +++++++
 .../java/org/apache/beam/sdk/PipelineTest.java  | 10 +--
 .../java/org/apache/beam/sdk/io/AvroIOTest.java |  8 +--
 .../io/BoundedReadFromUnboundedSourceTest.java  |  8 +--
 .../apache/beam/sdk/io/CountingInputTest.java   | 14 ++--
 .../apache/beam/sdk/io/CountingSourceTest.java  | 14 ++--
 .../org/apache/beam/sdk/io/PubsubIOTest.java    |  6 +-
 .../java/org/apache/beam/sdk/io/ReadTest.java   |  6 +-
 .../java/org/apache/beam/sdk/io/TextIOTest.java | 10 +--
 .../apache/beam/sdk/metrics/MetricsTest.java    |  6 +-
 .../apache/beam/sdk/testing/PAssertTest.java    | 34 +++++-----
 .../beam/sdk/testing/TestPipelineTest.java      | 24 +++----
 .../sdk/transforms/ApproximateUniqueTest.java   |  4 +-
 .../beam/sdk/transforms/CombineFnsTest.java     |  8 +--
 .../apache/beam/sdk/transforms/CombineTest.java | 44 ++++++------
 .../apache/beam/sdk/transforms/CountTest.java   | 10 +--
 .../apache/beam/sdk/transforms/CreateTest.java  | 18 ++---
 .../beam/sdk/transforms/DistinctTest.java       |  8 +--
 .../apache/beam/sdk/transforms/FilterTest.java  | 16 ++---
 .../apache/beam/sdk/transforms/FlattenTest.java | 30 ++++-----
 .../beam/sdk/transforms/GroupByKeyTest.java     | 14 ++--
 .../apache/beam/sdk/transforms/KeysTest.java    |  6 +-
 .../apache/beam/sdk/transforms/KvSwapTest.java  |  6 +-
 .../beam/sdk/transforms/MapElementsTest.java    |  4 +-
 .../beam/sdk/transforms/ParDoLifecycleTest.java | 10 +--
 .../apache/beam/sdk/transforms/ParDoTest.java   | 68 +++++++++----------
 .../beam/sdk/transforms/PartitionTest.java      |  4 +-
 .../apache/beam/sdk/transforms/SampleTest.java  | 14 ++--
 .../beam/sdk/transforms/SplittableDoFnTest.java | 14 ++--
 .../beam/sdk/transforms/ToStringTest.java       | 13 ++--
 .../apache/beam/sdk/transforms/ValuesTest.java  |  6 +-
 .../apache/beam/sdk/transforms/ViewTest.java    | 70 ++++++++++----------
 .../beam/sdk/transforms/WithTimestampsTest.java |  8 +--
 .../sdk/transforms/join/CoGroupByKeyTest.java   | 10 +--
 .../sdk/transforms/windowing/WindowTest.java    | 10 +--
 .../sdk/transforms/windowing/WindowingTest.java | 10 +--
 .../beam/sdk/util/ReifyTimestampsTest.java      |  6 +-
 .../org/apache/beam/sdk/util/ReshuffleTest.java | 16 ++---
 .../beam/sdk/values/PCollectionTupleTest.java   |  4 +-
 .../org/apache/beam/sdk/values/PDoneTest.java   |  4 +-
 .../io/elasticsearch/ElasticsearchIOTest.java   |  8 +--
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java     | 18 ++---
 .../sdk/io/gcp/datastore/DatastoreV1Test.java   |  8 +--
 .../org/apache/beam/sdk/io/mqtt/MqttIOTest.java |  9 ++-
 .../beam/sdk/transforms/FilterJava8Test.java    |  8 +--
 .../beam/sdk/transforms/WithKeysJava8Test.java  |  4 +-
 .../sdk/transforms/WithTimestampsJava8Test.java |  4 +-
 65 files changed, 377 insertions(+), 370 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/a58a7412/examples/java/pom.xml
----------------------------------------------------------------------
diff --git a/examples/java/pom.xml b/examples/java/pom.xml
index 1618f60..2b18130 100644
--- a/examples/java/pom.xml
+++ b/examples/java/pom.xml
@@ -539,7 +539,7 @@
 
     <!--
       For testing the example itself, use the direct runner. This is separate from
-      the use of RunnableOnService tests for testing a particular runner.
+      the use of ValidatesRunner tests for testing a particular runner.
     -->
     <dependency>
       <groupId>org.apache.beam</groupId>

http://git-wip-us.apache.org/repos/asf/beam/blob/a58a7412/examples/java/src/test/java/org/apache/beam/examples/WordCountTest.java
----------------------------------------------------------------------
diff --git a/examples/java/src/test/java/org/apache/beam/examples/WordCountTest.java b/examples/java/src/test/java/org/apache/beam/examples/WordCountTest.java
index 0f8e4dc..54ce1e3 100644
--- a/examples/java/src/test/java/org/apache/beam/examples/WordCountTest.java
+++ b/examples/java/src/test/java/org/apache/beam/examples/WordCountTest.java
@@ -24,8 +24,8 @@ import org.apache.beam.examples.WordCount.ExtractWordsFn;
 import org.apache.beam.examples.WordCount.FormatAsTextFn;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.DoFnTester;
@@ -73,7 +73,7 @@ public class WordCountTest {
 
   /** Example test that tests a PTransform by using an in-memory input and inspecting the output. */
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testCountWords() throws Exception {
     PCollection<String> input = p.apply(Create.of(WORDS).withCoder(StringUtf8Coder.of()));
 

http://git-wip-us.apache.org/repos/asf/beam/blob/a58a7412/examples/java/src/test/java/org/apache/beam/examples/complete/TfIdfTest.java
----------------------------------------------------------------------
diff --git a/examples/java/src/test/java/org/apache/beam/examples/complete/TfIdfTest.java b/examples/java/src/test/java/org/apache/beam/examples/complete/TfIdfTest.java
index 0382532..d263643 100644
--- a/examples/java/src/test/java/org/apache/beam/examples/complete/TfIdfTest.java
+++ b/examples/java/src/test/java/org/apache/beam/examples/complete/TfIdfTest.java
@@ -21,8 +21,8 @@ import java.net.URI;
 import java.util.Arrays;
 import org.apache.beam.sdk.coders.StringDelegateCoder;
 import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.Distinct;
 import org.apache.beam.sdk.transforms.Keys;
@@ -45,7 +45,7 @@ public class TfIdfTest {
 
   /** Test that the example runs. */
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testTfIdf() throws Exception {
 
     pipeline.getCoderRegistry().registerCoder(URI.class, StringDelegateCoder.of(URI.class));

http://git-wip-us.apache.org/repos/asf/beam/blob/a58a7412/examples/java/src/test/java/org/apache/beam/examples/complete/TopWikipediaSessionsTest.java
----------------------------------------------------------------------
diff --git a/examples/java/src/test/java/org/apache/beam/examples/complete/TopWikipediaSessionsTest.java b/examples/java/src/test/java/org/apache/beam/examples/complete/TopWikipediaSessionsTest.java
index 2c50cf2..5415281 100644
--- a/examples/java/src/test/java/org/apache/beam/examples/complete/TopWikipediaSessionsTest.java
+++ b/examples/java/src/test/java/org/apache/beam/examples/complete/TopWikipediaSessionsTest.java
@@ -20,8 +20,8 @@ package org.apache.beam.examples.complete;
 import com.google.api.services.bigquery.model.TableRow;
 import java.util.Arrays;
 import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.values.PCollection;
 import org.junit.Rule;
@@ -38,7 +38,7 @@ public class TopWikipediaSessionsTest {
   public TestPipeline p = TestPipeline.create();
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testComputeTopUsers() {
 
     PCollection<String> output =

http://git-wip-us.apache.org/repos/asf/beam/blob/a58a7412/examples/java/src/test/java/org/apache/beam/examples/cookbook/DistinctExampleTest.java
----------------------------------------------------------------------
diff --git a/examples/java/src/test/java/org/apache/beam/examples/cookbook/DistinctExampleTest.java b/examples/java/src/test/java/org/apache/beam/examples/cookbook/DistinctExampleTest.java
index 6fadbe5..c9dab80 100644
--- a/examples/java/src/test/java/org/apache/beam/examples/cookbook/DistinctExampleTest.java
+++ b/examples/java/src/test/java/org/apache/beam/examples/cookbook/DistinctExampleTest.java
@@ -21,8 +21,8 @@ import java.util.Arrays;
 import java.util.List;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.Distinct;
 import org.apache.beam.sdk.values.PCollection;
@@ -40,7 +40,7 @@ public class DistinctExampleTest {
   public TestPipeline p = TestPipeline.create();
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testDistinct() {
     List<String> strings = Arrays.asList(
         "k1",
@@ -64,7 +64,7 @@ public class DistinctExampleTest {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testDistinctEmpty() {
     List<String> strings = Arrays.asList();
 

http://git-wip-us.apache.org/repos/asf/beam/blob/a58a7412/examples/java/src/test/java/org/apache/beam/examples/cookbook/JoinExamplesTest.java
----------------------------------------------------------------------
diff --git a/examples/java/src/test/java/org/apache/beam/examples/cookbook/JoinExamplesTest.java b/examples/java/src/test/java/org/apache/beam/examples/cookbook/JoinExamplesTest.java
index 43e6d01..b2fcd73 100644
--- a/examples/java/src/test/java/org/apache/beam/examples/cookbook/JoinExamplesTest.java
+++ b/examples/java/src/test/java/org/apache/beam/examples/cookbook/JoinExamplesTest.java
@@ -23,8 +23,8 @@ import java.util.List;
 import org.apache.beam.examples.cookbook.JoinExamples.ExtractCountryInfoFn;
 import org.apache.beam.examples.cookbook.JoinExamples.ExtractEventDataFn;
 import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFnTester;
 import org.apache.beam.sdk.values.KV;
@@ -103,7 +103,7 @@ public class JoinExamplesTest {
 
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testJoin() throws java.lang.Exception {
     PCollection<TableRow> input1 = p.apply("CreateEvent", Create.of(EVENT_ARRAY));
     PCollection<TableRow> input2 = p.apply("CreateCC", Create.of(CC_ARRAY));

http://git-wip-us.apache.org/repos/asf/beam/blob/a58a7412/examples/java/src/test/java/org/apache/beam/examples/cookbook/TriggerExampleTest.java
----------------------------------------------------------------------
diff --git a/examples/java/src/test/java/org/apache/beam/examples/cookbook/TriggerExampleTest.java b/examples/java/src/test/java/org/apache/beam/examples/cookbook/TriggerExampleTest.java
index ec0b9d4..706cfb9 100644
--- a/examples/java/src/test/java/org/apache/beam/examples/cookbook/TriggerExampleTest.java
+++ b/examples/java/src/test/java/org/apache/beam/examples/cookbook/TriggerExampleTest.java
@@ -27,8 +27,8 @@ import java.util.Map;
 import org.apache.beam.examples.cookbook.TriggerExample.ExtractFlowInfo;
 import org.apache.beam.examples.cookbook.TriggerExample.TotalFlow;
 import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.DoFnTester;
@@ -111,7 +111,7 @@ public class TriggerExampleTest {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testTotalFlow () {
     PCollection<KV<String, Integer>> flow = pipeline
         .apply(Create.timestamped(TIME_STAMPED_INPUT))

http://git-wip-us.apache.org/repos/asf/beam/blob/a58a7412/examples/java8/pom.xml
----------------------------------------------------------------------
diff --git a/examples/java8/pom.xml b/examples/java8/pom.xml
index f5a0db7..5ea6ca5 100644
--- a/examples/java8/pom.xml
+++ b/examples/java8/pom.xml
@@ -257,7 +257,7 @@
 
     <!--
       For testing the example itself, use the direct runner. This is separate from
-      the use of RunnableOnService tests for testing a particular runner.
+      the use of ValidatesRunner tests for testing a particular runner.
     -->
     <dependency>
       <groupId>org.apache.beam</groupId>

http://git-wip-us.apache.org/repos/asf/beam/blob/a58a7412/examples/java8/src/test/java/org/apache/beam/examples/complete/game/GameStatsTest.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/test/java/org/apache/beam/examples/complete/game/GameStatsTest.java b/examples/java8/src/test/java/org/apache/beam/examples/complete/game/GameStatsTest.java
index 36cf9bc..44481c5 100644
--- a/examples/java8/src/test/java/org/apache/beam/examples/complete/game/GameStatsTest.java
+++ b/examples/java8/src/test/java/org/apache/beam/examples/complete/game/GameStatsTest.java
@@ -23,8 +23,8 @@ import java.util.List;
 import org.apache.beam.examples.complete.game.GameStats.CalculateSpammyUsers;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
@@ -63,7 +63,7 @@ public class GameStatsTest implements Serializable {
 
   /** Test the calculation of 'spammy users'. */
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testCalculateSpammyUsers() throws Exception {
     PCollection<KV<String, Integer>> input = p.apply(Create.of(USER_SCORES));
     PCollection<KV<String, Integer>> output = input.apply(new CalculateSpammyUsers());

http://git-wip-us.apache.org/repos/asf/beam/blob/a58a7412/examples/java8/src/test/java/org/apache/beam/examples/complete/game/HourlyTeamScoreTest.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/test/java/org/apache/beam/examples/complete/game/HourlyTeamScoreTest.java b/examples/java8/src/test/java/org/apache/beam/examples/complete/game/HourlyTeamScoreTest.java
index 5fc94a5..40bbfdb 100644
--- a/examples/java8/src/test/java/org/apache/beam/examples/complete/game/HourlyTeamScoreTest.java
+++ b/examples/java8/src/test/java/org/apache/beam/examples/complete/game/HourlyTeamScoreTest.java
@@ -25,8 +25,8 @@ import org.apache.beam.examples.complete.game.UserScore.ParseEventFn;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.Filter;
 import org.apache.beam.sdk.transforms.MapElements;
@@ -86,7 +86,7 @@ public class HourlyTeamScoreTest implements Serializable {
 
   /** Test the filtering. */
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testUserScoresFilter() throws Exception {
 
     final Instant startMinTimestamp = new Instant(1447965680000L);

http://git-wip-us.apache.org/repos/asf/beam/blob/a58a7412/examples/java8/src/test/java/org/apache/beam/examples/complete/game/UserScoreTest.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/test/java/org/apache/beam/examples/complete/game/UserScoreTest.java b/examples/java8/src/test/java/org/apache/beam/examples/complete/game/UserScoreTest.java
index 3b77b26..f0c28ab 100644
--- a/examples/java8/src/test/java/org/apache/beam/examples/complete/game/UserScoreTest.java
+++ b/examples/java8/src/test/java/org/apache/beam/examples/complete/game/UserScoreTest.java
@@ -25,8 +25,8 @@ import org.apache.beam.examples.complete.game.UserScore.GameActionInfo;
 import org.apache.beam.examples.complete.game.UserScore.ParseEventFn;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFnTester;
 import org.apache.beam.sdk.transforms.MapElements;
@@ -99,7 +99,7 @@ public class UserScoreTest implements Serializable {
 
   /** Tests ExtractAndSumScore("user"). */
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testUserScoreSums() throws Exception {
 
     PCollection<String> input = p.apply(Create.of(GAME_EVENTS).withCoder(StringUtf8Coder.of()));
@@ -117,7 +117,7 @@ public class UserScoreTest implements Serializable {
 
   /** Tests ExtractAndSumScore("team"). */
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testTeamScoreSums() throws Exception {
 
     PCollection<String> input = p.apply(Create.of(GAME_EVENTS).withCoder(StringUtf8Coder.of()));
@@ -135,7 +135,7 @@ public class UserScoreTest implements Serializable {
 
   /** Test that bad input data is dropped appropriately. */
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testUserScoresBadInput() throws Exception {
 
     PCollection<String> input = p.apply(Create.of(GAME_EVENTS2).withCoder(StringUtf8Coder.of()));

http://git-wip-us.apache.org/repos/asf/beam/blob/a58a7412/runners/apex/pom.xml
----------------------------------------------------------------------
diff --git a/runners/apex/pom.xml b/runners/apex/pom.xml
index 01dea22..3f01698 100644
--- a/runners/apex/pom.xml
+++ b/runners/apex/pom.xml
@@ -130,7 +130,7 @@
       <scope>test</scope>
     </dependency>
 
-    <!-- Depend on test jar to scan for RunnableOnService tests -->
+    <!-- Depend on test jar to scan for ValidatesRunner tests -->
     <dependency>
       <groupId>org.apache.beam</groupId>
       <artifactId>beam-sdks-java-core</artifactId>
@@ -206,7 +206,7 @@
               <goal>test</goal>
             </goals>
             <configuration>
-              <groups>org.apache.beam.sdk.testing.RunnableOnService</groups>
+              <groups>org.apache.beam.sdk.testing.ValidatesRunner</groups>
               <excludedGroups>
                 org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders,
                 org.apache.beam.sdk.testing.UsesStatefulParDo,

http://git-wip-us.apache.org/repos/asf/beam/blob/a58a7412/runners/flink/runner/pom.xml
----------------------------------------------------------------------
diff --git a/runners/flink/runner/pom.xml b/runners/flink/runner/pom.xml
index f2c2d01..e013adb 100644
--- a/runners/flink/runner/pom.xml
+++ b/runners/flink/runner/pom.xml
@@ -52,7 +52,7 @@
                   <goal>test</goal>
                 </goals>
                 <configuration>
-                  <groups>org.apache.beam.sdk.testing.RunnableOnService</groups>
+                  <groups>org.apache.beam.sdk.testing.ValidatesRunner</groups>
                   <excludedGroups>
                     org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders,
                     org.apache.beam.sdk.testing.UsesSplittableParDo,
@@ -83,7 +83,7 @@
                   <goal>test</goal>
                 </goals>
                 <configuration>
-                  <groups>org.apache.beam.sdk.testing.RunnableOnService</groups>
+                  <groups>org.apache.beam.sdk.testing.ValidatesRunner</groups>
                   <excludedGroups>
                     org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders,
                     org.apache.beam.sdk.testing.UsesSetState,
@@ -211,7 +211,7 @@
       <scope>test</scope>
     </dependency>
 
-    <!-- Depend on test jar to scan for RunnableOnService tests -->
+    <!-- Depend on test jar to scan for ValidatesRunner tests -->
     <dependency>
       <groupId>org.apache.beam</groupId>
       <artifactId>beam-sdks-java-core</artifactId>

http://git-wip-us.apache.org/repos/asf/beam/blob/a58a7412/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/package-info.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/package-info.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/package-info.java
index 98c963a..9683df0 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/package-info.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/package-info.java
@@ -18,7 +18,7 @@
 
 /**
  * Provides utilities for integration testing and {@link
- * org.apache.beam.sdk.testing.RunnableOnService} tests of the Google Cloud Dataflow
+ * org.apache.beam.sdk.testing.ValidatesRunner} tests of the Google Cloud Dataflow
  * runner.
  */
 package org.apache.beam.runners.dataflow.testing;

http://git-wip-us.apache.org/repos/asf/beam/blob/a58a7412/runners/pom.xml
----------------------------------------------------------------------
diff --git a/runners/pom.xml b/runners/pom.xml
index fb090ad..9bf170d 100644
--- a/runners/pom.xml
+++ b/runners/pom.xml
@@ -77,7 +77,7 @@
                     <goal>test</goal>
                   </goals>
                   <configuration>
-                    <groups>org.apache.beam.sdk.testing.RunnableOnService</groups>
+                    <groups>org.apache.beam.sdk.testing.ValidatesRunner</groups>
                     <parallel>all</parallel>
                     <threadCount>4</threadCount>
                     <dependenciesToScan>

http://git-wip-us.apache.org/repos/asf/beam/blob/a58a7412/runners/spark/pom.xml
----------------------------------------------------------------------
diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml
index 6958b28..a470fb5 100644
--- a/runners/spark/pom.xml
+++ b/runners/spark/pom.xml
@@ -55,7 +55,7 @@
     </profile>
 
     <profile>
-      <!-- This profile adds execution of RunnableOnService integration tests
+      <!-- This profile adds execution of ValidatesRunner integration tests
            against a local Spark endpoint. -->
       <id>local-runnable-on-service-tests</id>
       <activation><activeByDefault>false</activeByDefault></activation>
@@ -73,7 +73,7 @@
                 </goals>
                 <configuration>
                   <groups>
-                    org.apache.beam.sdk.testing.RunnableOnService,
+                    org.apache.beam.sdk.testing.ValidatesRunner,
                     org.apache.beam.runners.spark.UsesCheckpointRecovery
                   </groups>
                   <excludedGroups>
@@ -303,7 +303,7 @@
       <scope>test</scope>
     </dependency>
 
-    <!-- Depend on test jar to scan for RunnableOnService tests -->
+    <!-- Depend on test jar to scan for ValidatesRunner tests -->
     <dependency>
       <groupId>org.apache.beam</groupId>
       <artifactId>beam-sdks-java-core</artifactId>

http://git-wip-us.apache.org/repos/asf/beam/blob/a58a7412/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/RunnableOnService.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/RunnableOnService.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/RunnableOnService.java
index 2794d1a..dd8fd13 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/RunnableOnService.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/RunnableOnService.java
@@ -18,21 +18,7 @@
 package org.apache.beam.sdk.testing;
 
 /**
- * Category tag for validation tests which utilize {@link TestPipeline} for execution and
- * {@link PAssert} for validation. Example usage:
- * <pre><code>
- *     {@literal @}Test
- *     {@literal @}Category(RunnableOnService.class)
- *     public class ParDoTest {
- *       {@literal @}Rule
- *       public final transient TestPipeline p = TestPipeline.create();
- *
- *       public void testParDo() {
- *         p.apply(...);
- *         PAssert.that(p);
- *         p.run();
- *       }
- *     }
- * </code></pre>
+ * @deprecated use {@link ValidatesRunner}
  */
+@Deprecated
 public interface RunnableOnService extends NeedsRunner {}

http://git-wip-us.apache.org/repos/asf/beam/blob/a58a7412/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
index 6c5244d..485dd39 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
@@ -59,7 +59,7 @@ import org.junit.runners.model.Statement;
  * locally or against a remote pipeline runner.
  *
  * <p>It is recommended to tag hand-selected tests for this purpose using the {@link
- * RunnableOnService} {@link Category} annotation, as each test run against a pipeline runner will
+ * ValidatesRunner} {@link Category} annotation, as each test run against a pipeline runner will
  * utilize resources of that pipeline runner.
  *
  * <p>In order to run tests on a pipeline runner, the following conditions must be met:
@@ -274,7 +274,7 @@ public class TestPipeline extends Pipeline implements TestRule {
               "The test was annotated with a [@%s] / [@%s] while the runner "
                   + "was set to [%s]. Please re-check your configuration.",
               NeedsRunner.class.getSimpleName(),
-              RunnableOnService.class.getSimpleName(),
+              ValidatesRunner.class.getSimpleName(),
               CrashingRunner.class.getSimpleName());
 
           enableAbandonedNodeEnforcement(annotatedWithNeedsRunner || !crashingRunner);
@@ -324,7 +324,7 @@ public class TestPipeline extends Pipeline implements TestRule {
    * <li>Addition of PTransforms after the pipeline has already run.
    * </ul>
    * Abandoned node detection is automatically enabled when a real pipeline runner (i.e. not a
-   * {@link CrashingRunner}) and/or a {@link NeedsRunner} or a {@link RunnableOnService} annotation
+   * {@link CrashingRunner}) and/or a {@link NeedsRunner} or a {@link ValidatesRunner} annotation
    * are detected.
    */
   public TestPipeline enableAbandonedNodeEnforcement(final boolean enable) {

http://git-wip-us.apache.org/repos/asf/beam/blob/a58a7412/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/ValidatesRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/ValidatesRunner.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/ValidatesRunner.java
new file mode 100644
index 0000000..d33b3a6
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/ValidatesRunner.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.testing;
+
+/**
+ * Category tag for tests which validate that a Beam runner is correctly implemented.
+ */
+public interface ValidatesRunner extends RunnableOnService {}

http://git-wip-us.apache.org/repos/asf/beam/blob/a58a7412/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java
index 7e5cc35..efe8db4 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java
@@ -48,8 +48,8 @@ import org.apache.beam.sdk.testing.CrashingRunner;
 import org.apache.beam.sdk.testing.ExpectedLogs;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.Flatten;
@@ -143,7 +143,7 @@ public class PipelineTest {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testMultipleApply() {
     PTransform<PCollection<? extends String>, PCollection<String>> myTransform =
         addSuffix("+");
@@ -217,7 +217,7 @@ public class PipelineTest {
    * Tests that Pipeline supports a pass-through identity function.
    */
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testIdentityTransform() throws Exception {
 
     PCollection<Integer> output = pipeline
@@ -240,7 +240,7 @@ public class PipelineTest {
    * Tests that Pipeline supports pulling an element out of a tuple as a transform.
    */
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testTupleProjectionTransform() throws Exception {
     PCollection<Integer> input = pipeline
         .apply(Create.<Integer>of(1, 2, 3, 4));
@@ -273,7 +273,7 @@ public class PipelineTest {
    * Tests that Pipeline supports putting an element into a tuple as a transform.
    */
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testTupleInjectionTransform() throws Exception {
     PCollection<Integer> input = pipeline
         .apply(Create.<Integer>of(1, 2, 3, 4));

http://git-wip-us.apache.org/repos/asf/beam/blob/a58a7412/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
index c4b5526..19f5ffa 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
@@ -51,9 +51,9 @@ import org.apache.beam.sdk.io.AvroIO.Write.Bound;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.testing.TestPipelineOptions;
+import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
@@ -411,7 +411,7 @@ public class AvroIOTest {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testPrimitiveReadDisplayData() {
     DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
 
@@ -447,8 +447,8 @@ public class AvroIOTest {
   }
 
   @Test
-  @Category(RunnableOnService.class)
-  @Ignore("[BEAM-436] DirectRunner RunnableOnService tempLocation configuration insufficient")
+  @Category(ValidatesRunner.class)
+  @Ignore("[BEAM-436] DirectRunner ValidatesRunner tempLocation configuration insufficient")
   public void testPrimitiveWriteDisplayData() throws IOException {
     PipelineOptions options = DisplayDataEvaluator.getDefaultOptions();
     String tempRoot = options.as(TestPipelineOptions.class).getTempRoot();

http://git-wip-us.apache.org/repos/asf/beam/blob/a58a7412/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSourceTest.java
index d49873e..4f6af12 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSourceTest.java
@@ -28,8 +28,8 @@ import java.util.Collections;
 import java.util.List;
 import org.apache.beam.sdk.runners.dataflow.TestCountingSource;
 import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.values.KV;
@@ -50,19 +50,19 @@ public class BoundedReadFromUnboundedSourceTest implements Serializable{
   public transient TestPipeline p = TestPipeline.create();
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testNoDedup() throws Exception {
     test(false, false);
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testDedup() throws Exception {
     test(true, false);
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testTimeBound() throws Exception {
     test(false, true);
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/a58a7412/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java
index f23ee76..e7a6cfd 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java
@@ -24,8 +24,8 @@ import static org.junit.Assert.assertThat;
 import org.apache.beam.sdk.io.CountingInput.UnboundedCountingInput;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.Distinct;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -70,7 +70,7 @@ public class CountingInputTest {
   public TestPipeline p = TestPipeline.create();
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testBoundedInput() {
     long numElements = 1000;
     PCollection<Long> input = p.apply(CountingInput.upTo(numElements));
@@ -80,7 +80,7 @@ public class CountingInputTest {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testEmptyBoundedInput() {
     PCollection<Long> input = p.apply(CountingInput.upTo(0));
 
@@ -89,7 +89,7 @@ public class CountingInputTest {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testEmptyBoundedInputSubrange() {
     PCollection<Long> input = p.apply(CountingInput.forSubrange(42, 42));
 
@@ -99,7 +99,7 @@ public class CountingInputTest {
 
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testBoundedInputSubrange() {
     long start = 10;
     long end = 1000;
@@ -125,7 +125,7 @@ public class CountingInputTest {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testUnboundedInput() {
     long numElements = 1000;
 
@@ -164,7 +164,7 @@ public class CountingInputTest {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testUnboundedInputTimestamps() {
     long numElements = 1000;
 

http://git-wip-us.apache.org/repos/asf/beam/blob/a58a7412/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java
index dfd0949..0e3b07e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java
@@ -32,8 +32,8 @@ import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.Distinct;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -83,7 +83,7 @@ public class CountingSourceTest {
   public TestPipeline p = TestPipeline.create();
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testBoundedSource() {
     long numElements = 1000;
     PCollection<Long> input = p.apply(Read.from(CountingSource.upTo(numElements)));
@@ -93,7 +93,7 @@ public class CountingSourceTest {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testEmptyBoundedSource() {
     PCollection<Long> input = p.apply(Read.from(CountingSource.upTo(0)));
 
@@ -102,7 +102,7 @@ public class CountingSourceTest {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testBoundedSourceSplits() throws Exception {
     long numElements = 1000;
     long numSplits = 10;
@@ -155,7 +155,7 @@ public class CountingSourceTest {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testUnboundedSource() {
     long numElements = 1000;
 
@@ -174,7 +174,7 @@ public class CountingSourceTest {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testUnboundedSourceTimestamps() {
     long numElements = 1000;
 
@@ -227,7 +227,7 @@ public class CountingSourceTest {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testUnboundedSourceSplits() throws Exception {
     long numElements = 1000;
     int numSplits = 10;

http://git-wip-us.apache.org/repos/asf/beam/blob/a58a7412/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java
index 5ec08b4..1538db2 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java
@@ -27,8 +27,8 @@ import static org.junit.Assert.assertThat;
 import java.util.Set;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
-import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.UsesUnboundedPCollections;
+import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
 import org.joda.time.Duration;
@@ -150,7 +150,7 @@ public class PubsubIOTest {
   }
 
   @Test
-  @Category({RunnableOnService.class, UsesUnboundedPCollections.class})
+  @Category({ValidatesRunner.class, UsesUnboundedPCollections.class})
   public void testPrimitiveReadDisplayData() {
     DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
     Set<DisplayData> displayData;
@@ -185,7 +185,7 @@ public class PubsubIOTest {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testPrimitiveWriteDisplayData() {
     DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
     PubsubIO.Write<?> write = PubsubIO.<String>write().topic("projects/project/topics/topic");

http://git-wip-us.apache.org/repos/asf/beam/blob/a58a7412/sdks/java/core/src/test/java/org/apache/beam/sdk/io/ReadTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/ReadTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/ReadTest.java
index 362139b..416a086 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/ReadTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/ReadTest.java
@@ -32,7 +32,7 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.StreamingOptions;
-import org.apache.beam.sdk.testing.RunnableOnService;
+import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
 import org.joda.time.Duration;
@@ -106,13 +106,13 @@ public class ReadTest implements Serializable{
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testBoundedPrimitiveDisplayData() {
     testPrimitiveDisplayData(/* isStreaming: */ false);
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testStreamingPrimitiveDisplayData() {
     testPrimitiveDisplayData(/* isStreaming: */ true);
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/a58a7412/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
index 713cb71..2e36273 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
@@ -88,10 +88,10 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.SourceTestUtils;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.testing.TestPipelineOptions;
+import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
@@ -118,7 +118,7 @@ import org.mockito.stubbing.Answer;
 /**
  * Tests for TextIO Read and Write transforms.
  */
-// TODO: Change the tests to use RunnableOnService instead of NeedsRunner
+// TODO: Change the tests to use ValidatesRunner instead of NeedsRunner
 @RunWith(JUnit4.class)
 @SuppressWarnings("unchecked")
 public class TextIOTest {
@@ -281,7 +281,7 @@ public class TextIOTest {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testPrimitiveReadDisplayData() {
     DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
 
@@ -546,8 +546,8 @@ public class TextIOTest {
   }
 
   @Test
-  @Category(RunnableOnService.class)
-  @Ignore("[BEAM-436] DirectRunner RunnableOnService tempLocation configuration insufficient")
+  @Category(ValidatesRunner.class)
+  @Ignore("[BEAM-436] DirectRunner ValidatesRunner tempLocation configuration insufficient")
   public void testPrimitiveWriteDisplayData() throws IOException {
     PipelineOptions options = DisplayDataEvaluator.getDefaultOptions();
     String tempRoot = options.as(TestPipelineOptions.class).getTempRoot();

http://git-wip-us.apache.org/repos/asf/beam/blob/a58a7412/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java
index fc9e18b..f5506d7 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java
@@ -29,10 +29,10 @@ import static org.junit.Assert.assertThat;
 
 import java.io.Serializable;
 import org.apache.beam.sdk.PipelineResult;
-import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.testing.UsesAttemptedMetrics;
 import org.apache.beam.sdk.testing.UsesCommittedMetrics;
+import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -117,7 +117,7 @@ public class MetricsTest implements Serializable {
     assertThat(cell.getCumulative(), CoreMatchers.equalTo(42L));
   }
 
-  @Category({RunnableOnService.class, UsesCommittedMetrics.class})
+  @Category({ValidatesRunner.class, UsesCommittedMetrics.class})
   @Test
   public void committedMetricsReportToQuery() {
     PipelineResult result = runPipelineWithMetrics();
@@ -143,7 +143,7 @@ public class MetricsTest implements Serializable {
   }
 
 
-  @Category({RunnableOnService.class, UsesAttemptedMetrics.class})
+  @Category({ValidatesRunner.class, UsesAttemptedMetrics.class})
   @Test
   public void attemptedMetricsReportToQuery() {
     PipelineResult result = runPipelineWithMetrics();

http://git-wip-us.apache.org/repos/asf/beam/blob/a58a7412/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
index dab457a..3528797 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
@@ -123,7 +123,7 @@ public class PAssertTest implements Serializable {
    * serializable.
    */
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testContainsInAnyOrderNotSerializable() throws Exception {
     PCollection<NotSerializableObject> pcollection = pipeline
         .apply(Create.of(
@@ -144,7 +144,7 @@ public class PAssertTest implements Serializable {
    * though.
    */
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testSerializablePredicate() throws Exception {
     PCollection<NotSerializableObject> pcollection = pipeline
         .apply(Create.of(
@@ -169,7 +169,7 @@ public class PAssertTest implements Serializable {
    * though.
    */
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testWindowedSerializablePredicate() throws Exception {
     PCollection<NotSerializableObject> pcollection = pipeline
         .apply(Create.timestamped(
@@ -260,7 +260,7 @@ public class PAssertTest implements Serializable {
    * Basic test for {@code isEqualTo}.
    */
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testIsEqualTo() throws Exception {
     PCollection<Integer> pcollection = pipeline.apply(Create.of(43));
     PAssert.thatSingleton(pcollection).isEqualTo(43);
@@ -271,7 +271,7 @@ public class PAssertTest implements Serializable {
    * Basic test for {@code isEqualTo}.
    */
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testWindowedIsEqualTo() throws Exception {
     PCollection<Integer> pcollection =
         pipeline.apply(Create.timestamped(TimestampedValue.of(43, new Instant(250L)),
@@ -290,7 +290,7 @@ public class PAssertTest implements Serializable {
    * Basic test for {@code notEqualTo}.
    */
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testNotEqualTo() throws Exception {
     PCollection<Integer> pcollection = pipeline.apply(Create.of(43));
     PAssert.thatSingleton(pcollection).notEqualTo(42);
@@ -301,7 +301,7 @@ public class PAssertTest implements Serializable {
    * Test that we throw an error for false assertion on singleton.
    */
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testPAssertEqualsSingletonFalse() throws Exception {
     PCollection<Integer> pcollection = pipeline.apply(Create.of(42));
     PAssert.thatSingleton("The value was not equal to 44", pcollection).isEqualTo(44);
@@ -319,7 +319,7 @@ public class PAssertTest implements Serializable {
    * Test that we throw an error for false assertion on singleton.
    */
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testPAssertEqualsSingletonFalseDefaultReasonString() throws Exception {
     PCollection<Integer> pcollection = pipeline.apply(Create.of(42));
     PAssert.thatSingleton(pcollection).isEqualTo(44);
@@ -337,7 +337,7 @@ public class PAssertTest implements Serializable {
    * Tests that {@code containsInAnyOrder} is actually order-independent.
    */
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testContainsInAnyOrder() throws Exception {
     PCollection<Integer> pcollection = pipeline.apply(Create.of(1, 2, 3, 4));
     PAssert.that(pcollection).containsInAnyOrder(2, 1, 4, 3);
@@ -348,7 +348,7 @@ public class PAssertTest implements Serializable {
    * Tests that {@code containsInAnyOrder} is actually order-independent.
    */
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testGlobalWindowContainsInAnyOrder() throws Exception {
     PCollection<Integer> pcollection = pipeline.apply(Create.of(1, 2, 3, 4));
     PAssert.that(pcollection).inWindow(GlobalWindow.INSTANCE).containsInAnyOrder(2, 1, 4, 3);
@@ -359,7 +359,7 @@ public class PAssertTest implements Serializable {
    * Tests that windowed {@code containsInAnyOrder} is actually order-independent.
    */
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testWindowedContainsInAnyOrder() throws Exception {
     PCollection<Integer> pcollection =
         pipeline.apply(Create.timestamped(TimestampedValue.of(1, new Instant(100L)),
@@ -388,7 +388,7 @@ public class PAssertTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testEmpty() {
     PCollection<Long> vals =
         pipeline.apply(Create.empty(VarLongCoder.of()));
@@ -402,7 +402,7 @@ public class PAssertTest implements Serializable {
    * Tests that {@code containsInAnyOrder} fails when and how it should.
    */
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testContainsInAnyOrderFalse() throws Exception {
     PCollection<Integer> pcollection = pipeline
         .apply(Create.of(1, 2, 3, 4));
@@ -423,7 +423,7 @@ public class PAssertTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testEmptyFalse() throws Exception {
     PCollection<Long> vals = pipeline.apply(CountingInput.upTo(5L));
     PAssert.that("Vals should have been empty", vals).empty();
@@ -437,7 +437,7 @@ public class PAssertTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testEmptyFalseDefaultReasonString() throws Exception {
     PCollection<Long> vals = pipeline.apply(CountingInput.upTo(5L));
     PAssert.that(vals).empty();
@@ -452,7 +452,7 @@ public class PAssertTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testAssertionSiteIsCapturedWithMessage() throws Exception {
     PCollection<Long> vals = pipeline.apply(CountingInput.upTo(5L));
     assertThatCollectionIsEmptyWithMessage(vals);
@@ -471,7 +471,7 @@ public class PAssertTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testAssertionSiteIsCapturedWithoutMessage() throws Exception {
     PCollection<Long> vals = pipeline.apply(CountingInput.upTo(5L));
     assertThatCollectionIsEmptyWithoutMessage(vals);

http://git-wip-us.apache.org/repos/asf/beam/blob/a58a7412/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java
index 1a7d375..084d303 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java
@@ -271,21 +271,21 @@ public class TestPipelineTest implements Serializable {
       @Rule
       public final transient RuleChain chain = RuleChain.outerRule(exception).around(pipeline);
 
-      @Category(RunnableOnService.class)
+      @Category(ValidatesRunner.class)
       @Test
       public void testNormalFlow() throws Exception {
         addTransform(pCollection(pipeline));
         pipeline.run();
       }
 
-      @Category(RunnableOnService.class)
+      @Category(ValidatesRunner.class)
       @Test
       public void testMissingRun() throws Exception {
         exception.expect(TestPipeline.PipelineRunMissingException.class);
         addTransform(pCollection(pipeline));
       }
 
-      @Category(RunnableOnService.class)
+      @Category(ValidatesRunner.class)
       @Test
       public void testMissingRunWithDisabledEnforcement() throws Exception {
         pipeline.enableAbandonedNodeEnforcement(false);
@@ -294,7 +294,7 @@ public class TestPipelineTest implements Serializable {
         // disable abandoned node detection
       }
 
-      @Category(RunnableOnService.class)
+      @Category(ValidatesRunner.class)
       @Test
       public void testMissingRunAutoAdd() throws Exception {
         pipeline.enableAutoRunIfMissing(true);
@@ -303,9 +303,9 @@ public class TestPipelineTest implements Serializable {
         // have the pipeline.run() auto-added
       }
 
-      @Category(RunnableOnService.class)
+      @Category(ValidatesRunner.class)
       @Test
-      public void testDanglingPTransformRunnableOnService() throws Exception {
+      public void testDanglingPTransformValidatesRunner() throws Exception {
         final PCollection<String> pCollection = pCollection(pipeline);
         PAssert.that(pCollection).containsInAnyOrder(WHATEVER);
         pipeline.run().waitUntilFinish();
@@ -329,9 +329,9 @@ public class TestPipelineTest implements Serializable {
         addTransform(pCollection);
       }
 
-      @Category(RunnableOnService.class)
+      @Category(ValidatesRunner.class)
       @Test
-      public void testDanglingPAssertRunnableOnService() throws Exception {
+      public void testDanglingPAssertValidatesRunner() throws Exception {
         final PCollection<String> pCollection = pCollection(pipeline);
         PAssert.that(pCollection).containsInAnyOrder(WHATEVER);
         pipeline.run().waitUntilFinish();
@@ -344,11 +344,11 @@ public class TestPipelineTest implements Serializable {
 
       /**
        * Tests that a {@link TestPipeline} rule behaves as expected when there is no pipeline usage
-       * within a test that has a {@link RunnableOnService} annotation.
+       * within a test that has a {@link ValidatesRunner} annotation.
        */
-      @Category(RunnableOnService.class)
+      @Category(ValidatesRunner.class)
       @Test
-      public void testNoTestPipelineUsedRunnableOnService() {}
+      public void testNoTestPipelineUsedValidatesRunner() {}
 
       /**
        * Tests that a {@link TestPipeline} rule behaves as expected when there is no pipeline usage
@@ -381,7 +381,7 @@ public class TestPipelineTest implements Serializable {
         addTransform(pCollection(pipeline));
 
         // pipeline.run() is missing, BUT:
-        // 1. Neither @RunnableOnService nor @NeedsRunner are present, AND
+        // 1. Neither @ValidatesRunner nor @NeedsRunner are present, AND
         // 2. The runner class is CrashingRunner.class
         // (1) + (2) => we assume this pipeline was never meant to be run, so no exception is
         // thrown on account of the missing run / dangling nodes.

http://git-wip-us.apache.org/repos/asf/beam/blob/a58a7412/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateUniqueTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateUniqueTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateUniqueTest.java
index 72c33fb..51880e1 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateUniqueTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateUniqueTest.java
@@ -34,8 +34,8 @@ import java.util.List;
 import org.apache.beam.sdk.TestUtils;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
@@ -300,7 +300,7 @@ public class ApproximateUniqueTest implements Serializable {
     }
 
     @Test
-    @Category(RunnableOnService.class)
+    @Category(ValidatesRunner.class)
     public void testApproximateUniqueWithSmallInput() {
       final PCollection<Integer> input = p.apply(
           Create.of(Arrays.asList(1, 2, 3, 3)));

http://git-wip-us.apache.org/repos/asf/beam/blob/a58a7412/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java
index 4d35e53..b107f3d 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java
@@ -36,8 +36,8 @@ import org.apache.beam.sdk.coders.NullableCoder;
 import org.apache.beam.sdk.coders.StandardCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.Combine.BinaryCombineFn;
 import org.apache.beam.sdk.transforms.CombineFns.CoCombineResult;
 import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
@@ -119,7 +119,7 @@ public class  CombineFnsTest {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testComposedCombine() {
     p.getCoderRegistry().registerCoder(UserString.class, UserStringCoder.of());
 
@@ -173,7 +173,7 @@ public class  CombineFnsTest {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testComposedCombineWithContext() {
     p.getCoderRegistry().registerCoder(UserString.class, UserStringCoder.of());
 
@@ -234,7 +234,7 @@ public class  CombineFnsTest {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testComposedCombineNullValues() {
     p.getCoderRegistry().registerCoder(UserString.class, NullableCoder.of(UserStringCoder.of()));
     p.getCoderRegistry().registerCoder(String.class, NullableCoder.of(StringUtf8Coder.of()));

http://git-wip-us.apache.org/repos/asf/beam/blob/a58a7412/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
index 6c62d0b..e51ee16 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
@@ -56,8 +56,8 @@ import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
 import org.apache.beam.sdk.transforms.CombineWithContext.Context;
 import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
@@ -165,14 +165,14 @@ public class CombineTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   @SuppressWarnings({"rawtypes", "unchecked"})
   public void testSimpleCombine() {
     runTestSimpleCombine(TABLE, 20, Arrays.asList(KV.of("a", "114a"), KV.of("b", "113b")));
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   @SuppressWarnings({"rawtypes", "unchecked"})
   public void testSimpleCombineWithContext() {
     runTestSimpleCombineWithContext(TABLE, 20,
@@ -181,14 +181,14 @@ public class CombineTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testSimpleCombineWithContextEmpty() {
     runTestSimpleCombineWithContext(
         EMPTY_TABLE, 0, Collections.<KV<String, String>>emptyList(), new String[] {});
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testSimpleCombineEmpty() {
     runTestSimpleCombine(EMPTY_TABLE, 0, Collections.<KV<String, String>>emptyList());
   }
@@ -214,7 +214,7 @@ public class CombineTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testBasicCombine() {
     runTestBasicCombine(TABLE, ImmutableSet.of(1, 13, 4), Arrays.asList(
         KV.of("a", (Set<Integer>) ImmutableSet.of(1, 4)),
@@ -222,7 +222,7 @@ public class CombineTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testBasicCombineEmpty() {
     runTestBasicCombine(
         EMPTY_TABLE, ImmutableSet.<Integer>of(), Collections.<KV<String, Set<Integer>>>emptyList());
@@ -248,7 +248,7 @@ public class CombineTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testFixedWindowsCombine() {
     PCollection<KV<String, Integer>> input =
         pipeline.apply(Create.timestamped(TABLE, Arrays.asList(0L, 1L, 6L, 7L, 8L))
@@ -272,7 +272,7 @@ public class CombineTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testFixedWindowsCombineWithContext() {
     PCollection<KV<String, Integer>> perKeyInput =
         pipeline.apply(Create.timestamped(TABLE, Arrays.asList(0L, 1L, 6L, 7L, 8L))
@@ -307,7 +307,7 @@ public class CombineTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testSlidingWindowsCombineWithContext() {
     PCollection<KV<String, Integer>> perKeyInput =
         pipeline.apply(Create.timestamped(TABLE, Arrays.asList(2L, 3L, 8L, 9L, 10L))
@@ -354,7 +354,7 @@ public class CombineTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testGlobalCombineWithDefaultsAndTriggers() {
     PCollection<Integer> input = pipeline.apply(Create.of(1, 1));
 
@@ -380,7 +380,7 @@ public class CombineTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testSessionsCombine() {
     PCollection<KV<String, Integer>> input =
         pipeline.apply(Create.timestamped(TABLE, Arrays.asList(0L, 4L, 7L, 10L, 16L))
@@ -403,7 +403,7 @@ public class CombineTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testSessionsCombineWithContext() {
     PCollection<KV<String, Integer>> perKeyInput =
         pipeline.apply(Create.timestamped(TABLE, Arrays.asList(0L, 4L, 7L, 10L, 16L))
@@ -443,7 +443,7 @@ public class CombineTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testWindowedCombineEmpty() {
     PCollection<Double> mean = pipeline
         .apply(Create.empty(BigEndianIntegerCoder.of()))
@@ -456,13 +456,13 @@ public class CombineTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testAccumulatingCombine() {
     runTestAccumulatingCombine(TABLE, 4.0, Arrays.asList(KV.of("a", 2.0), KV.of("b", 7.0)));
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testAccumulatingCombineEmpty() {
     runTestAccumulatingCombine(EMPTY_TABLE, 0.0, Collections.<KV<String, Double>>emptyList());
   }
@@ -498,7 +498,7 @@ public class CombineTest implements Serializable {
       };
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testHotKeyCombining() {
     PCollection<KV<String, Integer>> input = copy(createInput(pipeline, TABLE), 10);
 
@@ -532,7 +532,7 @@ public class CombineTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testHotKeyCombiningWithAccumulationMode() {
     PCollection<Integer> input = pipeline.apply(Create.of(1, 2, 3, 4, 5));
 
@@ -610,7 +610,7 @@ public class CombineTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testCombineGloballyAsSingletonView() {
     final PCollectionView<Integer> view = pipeline
         .apply("CreateEmptySideInput", Create.empty(BigEndianIntegerCoder.of()))
@@ -630,7 +630,7 @@ public class CombineTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testWindowedCombineGloballyAsSingletonView() {
     FixedWindows windowFn = FixedWindows.of(Duration.standardMinutes(1));
     final PCollectionView<Integer> view =
@@ -718,7 +718,7 @@ public class CombineTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testCombinePerKeyPrimitiveDisplayData() {
     DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
 
@@ -734,7 +734,7 @@ public class CombineTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testCombinePerKeyWithHotKeyFanoutPrimitiveDisplayData() {
     int hotKeyFanout = 2;
     DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();

http://git-wip-us.apache.org/repos/asf/beam/blob/a58a7412/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CountTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CountTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CountTest.java
index dca0542..80aba67 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CountTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CountTest.java
@@ -24,8 +24,8 @@ import java.util.Arrays;
 import java.util.List;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.junit.Rule;
@@ -49,7 +49,7 @@ public class CountTest {
   public TestPipeline p = TestPipeline.create();
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   @SuppressWarnings("unchecked")
   public void testCountPerElementBasic() {
     PCollection<String> input = p.apply(Create.of(WORDS));
@@ -69,7 +69,7 @@ public class CountTest {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   @SuppressWarnings("unchecked")
   public void testCountPerElementEmpty() {
     PCollection<String> input = p.apply(Create.of(NO_LINES).withCoder(StringUtf8Coder.of()));
@@ -82,7 +82,7 @@ public class CountTest {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testCountGloballyBasic() {
     PCollection<String> input = p.apply(Create.of(WORDS));
 
@@ -95,7 +95,7 @@ public class CountTest {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testCountGloballyEmpty() {
     PCollection<String> input = p.apply(Create.of(NO_LINES).withCoder(StringUtf8Coder.of()));
 

http://git-wip-us.apache.org/repos/asf/beam/blob/a58a7412/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
index d21e502..09f1f96 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
@@ -50,9 +50,9 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.SourceTestUtils;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.Create.Values.CreateSource;
 import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.values.KV;
@@ -80,7 +80,7 @@ public class CreateTest {
 
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testCreate() {
     PCollection<String> output =
         p.apply(Create.of(LINES));
@@ -91,7 +91,7 @@ public class CreateTest {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testCreateEmpty() {
     PCollection<String> output =
         p.apply(Create.empty(StringUtf8Coder.of()));
@@ -155,7 +155,7 @@ public class CreateTest {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testCreateWithNullsAndValues() throws Exception {
     PCollection<String> output =
         p.apply(Create.of(null, "test1", null, "test2", null)
@@ -222,7 +222,7 @@ public class CreateTest {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testCreateWithUnserializableElements() throws Exception {
     List<UnserializableRecord> elements =
         ImmutableList.of(
@@ -248,7 +248,7 @@ public class CreateTest {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testCreateTimestamped() {
     List<TimestampedValue<String>> data = Arrays.asList(
         TimestampedValue.of("a", new Instant(1L)),
@@ -265,7 +265,7 @@ public class CreateTest {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testCreateTimestampedEmpty() {
     PCollection<String> output = p
         .apply(Create.timestamped(new ArrayList<TimestampedValue<String>>())
@@ -330,7 +330,7 @@ public class CreateTest {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testCreateWithVoidType() throws Exception {
     PCollection<Void> output = p.apply(Create.of((Void) null, (Void) null));
     PAssert.that(output).containsInAnyOrder((Void) null, (Void) null);
@@ -338,7 +338,7 @@ public class CreateTest {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testCreateWithKVVoidType() throws Exception {
     PCollection<KV<Void, Void>> output = p.apply(Create.of(
         KV.of((Void) null, (Void) null),

http://git-wip-us.apache.org/repos/asf/beam/blob/a58a7412/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DistinctTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DistinctTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DistinctTest.java
index b3b3925..17bbed6 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DistinctTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DistinctTest.java
@@ -26,8 +26,8 @@ import java.util.List;
 import java.util.Map;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.junit.Rule;
@@ -46,7 +46,7 @@ public class DistinctTest {
   public final TestPipeline p = TestPipeline.create();
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testDistinct() {
     List<String> strings = Arrays.asList(
         "k1",
@@ -70,7 +70,7 @@ public class DistinctTest {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testDistinctEmpty() {
     List<String> strings = Arrays.asList();
 
@@ -108,7 +108,7 @@ public class DistinctTest {
 
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testDistinctWithRepresentativeValue() {
     List<KV<String, String>> strings = Arrays.asList(
         KV.of("k1", "v1"),

http://git-wip-us.apache.org/repos/asf/beam/blob/a58a7412/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FilterTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FilterTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FilterTest.java
index 81e1d02..85ad796 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FilterTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FilterTest.java
@@ -22,8 +22,8 @@ import static org.hamcrest.MatcherAssert.assertThat;
 
 import java.io.Serializable;
 import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.values.PCollection;
 import org.junit.Rule;
@@ -62,7 +62,7 @@ public class FilterTest implements Serializable {
   public final TestPipeline p = TestPipeline.create();
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testIdentityFilterByPredicate() {
     PCollection<Integer> output = p
         .apply(Create.of(591, 11789, 1257, 24578, 24799, 307))
@@ -73,7 +73,7 @@ public class FilterTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testNoFilterByPredicate() {
     PCollection<Integer> output = p
         .apply(Create.of(1, 2, 4, 5))
@@ -84,7 +84,7 @@ public class FilterTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testFilterByPredicate() {
     PCollection<Integer> output = p
         .apply(Create.of(1, 2, 3, 4, 5, 6, 7))
@@ -95,7 +95,7 @@ public class FilterTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testFilterLessThan() {
     PCollection<Integer> output = p
         .apply(Create.of(1, 2, 3, 4, 5, 6, 7))
@@ -106,7 +106,7 @@ public class FilterTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testFilterGreaterThan() {
     PCollection<Integer> output = p
         .apply(Create.of(1, 2, 3, 4, 5, 6, 7))
@@ -117,7 +117,7 @@ public class FilterTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testFilterLessThanEq() {
     PCollection<Integer> output = p
         .apply(Create.of(1, 2, 3, 4, 5, 6, 7))
@@ -128,7 +128,7 @@ public class FilterTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testFilterGreaterThanEq() {
     PCollection<Integer> output = p
         .apply(Create.of(1, 2, 3, 4, 5, 6, 7))

http://git-wip-us.apache.org/repos/asf/beam/blob/a58a7412/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java
index 5800738..1753c49 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java
@@ -49,8 +49,8 @@ import org.apache.beam.sdk.io.CountingInput;
 import org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.Sessions;
 import org.apache.beam.sdk.transforms.windowing.Window;
@@ -83,7 +83,7 @@ public class FlattenTest implements Serializable {
 
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testFlattenPCollections() {
     List<List<String>> inputs = Arrays.asList(
       LINES, NO_LINES, LINES2, NO_LINES, LINES, NO_LINES);
@@ -97,7 +97,7 @@ public class FlattenTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testFlattenPCollectionsSingletonList() {
     PCollection<String> input = p.apply(Create.of(LINES));
     PCollection<String> output = PCollectionList.of(input).apply(Flatten.<String>pCollections());
@@ -109,7 +109,7 @@ public class FlattenTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testFlattenPCollectionsThenParDo() {
     List<List<String>> inputs = Arrays.asList(
       LINES, NO_LINES, LINES2, NO_LINES, LINES, NO_LINES);
@@ -124,7 +124,7 @@ public class FlattenTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testFlattenPCollectionsEmpty() {
     PCollection<String> output =
         PCollectionList.<String>empty(p)
@@ -135,7 +135,7 @@ public class FlattenTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testFlattenInputMultipleCopies() {
     int count = 5;
     PCollection<Long> longs = p.apply("mkLines", CountingInput.upTo(count));
@@ -167,7 +167,7 @@ public class FlattenTest implements Serializable {
   }
 
   @Test
-  @Category({RunnableOnService.class, FlattenWithHeterogeneousCoders.class})
+  @Category({ValidatesRunner.class, FlattenWithHeterogeneousCoders.class})
   public void testFlattenMultipleCoders() throws CannotProvideCoderException {
     PCollection<Long> bigEndianLongs =
         p.apply(
@@ -189,7 +189,7 @@ public class FlattenTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testEmptyFlattenAsSideInput() {
     final PCollectionView<Iterable<String>> view =
         PCollectionList.<String>empty(p)
@@ -212,7 +212,7 @@ public class FlattenTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testFlattenPCollectionsEmptyThenParDo() {
     PCollection<String> output =
         PCollectionList.<String>empty(p)
@@ -226,7 +226,7 @@ public class FlattenTest implements Serializable {
   @Test
   @Category(NeedsRunner.class)
   public void testFlattenNoListsNoCoder() {
-    // not RunnableOnService because it should fail at pipeline construction time anyhow.
+    // not ValidatesRunner because it should fail at pipeline construction time anyhow.
     thrown.expect(IllegalStateException.class);
     thrown.expectMessage("cannot provide a Coder for empty");
 
@@ -239,7 +239,7 @@ public class FlattenTest implements Serializable {
   /////////////////////////////////////////////////////////////////////////////
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testFlattenIterables() {
     PCollection<Iterable<String>> input = p
         .apply(Create.<Iterable<String>>of(LINES)
@@ -255,7 +255,7 @@ public class FlattenTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testFlattenIterablesLists() {
     PCollection<List<String>> input =
         p.apply(Create.<List<String>>of(LINES).withCoder(ListCoder.of(StringUtf8Coder.of())));
@@ -268,7 +268,7 @@ public class FlattenTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testFlattenIterablesSets() {
     Set<String> linesSet = ImmutableSet.copyOf(LINES);
 
@@ -283,7 +283,7 @@ public class FlattenTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testFlattenIterablesCollections() {
     Set<String> linesSet = ImmutableSet.copyOf(LINES);
 
@@ -299,7 +299,7 @@ public class FlattenTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testFlattenIterablesEmpty() {
     PCollection<Iterable<String>> input = p
         .apply(Create.<Iterable<String>>of(NO_LINES)


[2/4] beam git commit: Port Java modules from RunnableOnService to ValidatesRunner

Posted by ke...@apache.org.
http://git-wip-us.apache.org/repos/asf/beam/blob/a58a7412/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
index 73cedfd..3443847 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
@@ -47,8 +47,8 @@ import org.apache.beam.sdk.coders.MapCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.InvalidWindows;
@@ -86,7 +86,7 @@ public class GroupByKeyTest {
   public ExpectedException thrown = ExpectedException.none();
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testGroupByKey() {
     List<KV<String, Integer>> ungroupedPairs = Arrays.asList(
         KV.of("k1", 3),
@@ -126,7 +126,7 @@ public class GroupByKeyTest {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testGroupByKeyAndWindows() {
     List<KV<String, Integer>> ungroupedPairs = Arrays.asList(
         KV.of("k1", 3),  // window [0, 5)
@@ -168,7 +168,7 @@ public class GroupByKeyTest {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testGroupByKeyEmpty() {
     List<KV<String, Integer>> ungroupedPairs = Arrays.asList();
 
@@ -318,7 +318,7 @@ public class GroupByKeyTest {
    * the two values.
    */
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testOutputTimeFnEarliest() {
 
     p.apply(
@@ -339,7 +339,7 @@ public class GroupByKeyTest {
    * with the windowing function customized to use the latest value.
    */
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testOutputTimeFnLatest() {
     p.apply(
         Create.timestamped(
@@ -389,7 +389,7 @@ public class GroupByKeyTest {
    * and not the value itself.
    */
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testGroupByKeyWithBadEqualsHashCode() throws Exception {
     final int numValues = 10;
     final int numKeys = 5;

http://git-wip-us.apache.org/repos/asf/beam/blob/a58a7412/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/KeysTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/KeysTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/KeysTest.java
index 2a19802..dafb953 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/KeysTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/KeysTest.java
@@ -22,8 +22,8 @@ import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.junit.Rule;
@@ -54,7 +54,7 @@ public class KeysTest {
   public final TestPipeline p = TestPipeline.create();
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testKeys() {
     PCollection<KV<String, Integer>> input =
         p.apply(Create.of(Arrays.asList(TABLE)).withCoder(
@@ -68,7 +68,7 @@ public class KeysTest {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testKeysEmpty() {
     PCollection<KV<String, Integer>> input =
         p.apply(Create.of(Arrays.asList(EMPTY_TABLE)).withCoder(

http://git-wip-us.apache.org/repos/asf/beam/blob/a58a7412/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/KvSwapTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/KvSwapTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/KvSwapTest.java
index 859312f..762451f 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/KvSwapTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/KvSwapTest.java
@@ -23,8 +23,8 @@ import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.NullableCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.junit.Rule;
@@ -56,7 +56,7 @@ public class KvSwapTest {
   public final TestPipeline p = TestPipeline.create();
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testKvSwap() {
     PCollection<KV<String, Integer>> input =
         p.apply(Create.of(Arrays.asList(TABLE)).withCoder(
@@ -77,7 +77,7 @@ public class KvSwapTest {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testKvSwapEmpty() {
     PCollection<KV<String, Integer>> input =
         p.apply(Create.of(Arrays.asList(EMPTY_TABLE)).withCoder(

http://git-wip-us.apache.org/repos/asf/beam/blob/a58a7412/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java
index 47d0b87..82e856e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java
@@ -26,8 +26,8 @@ import java.io.Serializable;
 import java.util.Set;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
 import org.apache.beam.sdk.values.KV;
@@ -250,7 +250,7 @@ public class MapElementsTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testPrimitiveDisplayData() {
     SimpleFunction<Integer, ?> mapFn = new SimpleFunction<Integer, Integer>() {
       @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/a58a7412/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoLifecycleTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoLifecycleTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoLifecycleTest.java
index 2c3a735..fda8947 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoLifecycleTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoLifecycleTest.java
@@ -28,8 +28,8 @@ import static org.junit.Assert.fail;
 import java.io.Serializable;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.beam.sdk.testing.NeedsRunner;
-import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.values.PCollectionList;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
@@ -49,7 +49,7 @@ public class ParDoLifecycleTest implements Serializable {
   public final transient TestPipeline p = TestPipeline.create();
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testOldFnCallSequence() {
     PCollectionList.of(p.apply("Impolite", Create.of(1, 2, 4)))
         .and(p.apply("Polite", Create.of(3, 5, 6, 7)))
@@ -60,7 +60,7 @@ public class ParDoLifecycleTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testOldFnCallSequenceMulti() {
     PCollectionList.of(p.apply("Impolite", Create.of(1, 2, 4)))
         .and(p.apply("Polite", Create.of(3, 5, 6, 7)))
@@ -128,7 +128,7 @@ public class ParDoLifecycleTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testFnCallSequence() {
     PCollectionList.of(p.apply("Impolite", Create.of(1, 2, 4)))
         .and(p.apply("Polite", Create.of(3, 5, 6, 7)))
@@ -139,7 +139,7 @@ public class ParDoLifecycleTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testFnCallSequenceMulti() {
     PCollectionList.of(p.apply("Impolite", Create.of(1, 2, 4)))
         .and(p.apply("Polite", Create.of(3, 5, 6, 7)))

http://git-wip-us.apache.org/repos/asf/beam/blob/a58a7412/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index e0fa02c..336f4c0 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -62,7 +62,6 @@ import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.io.CountingInput;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.testing.TestStream;
 import org.apache.beam.sdk.testing.UsesMapState;
@@ -70,6 +69,7 @@ import org.apache.beam.sdk.testing.UsesSetState;
 import org.apache.beam.sdk.testing.UsesStatefulParDo;
 import org.apache.beam.sdk.testing.UsesTestStream;
 import org.apache.beam.sdk.testing.UsesTimersInParDo;
+import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.DoFn.OnTimer;
 import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
 import org.apache.beam.sdk.transforms.ParDo.Bound;
@@ -320,7 +320,7 @@ public class ParDoTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testParDo() {
 
     List<Integer> inputs = Arrays.asList(3, -42, 666);
@@ -336,7 +336,7 @@ public class ParDoTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testParDo2() {
 
     List<Integer> inputs = Arrays.asList(3, -42, 666);
@@ -352,7 +352,7 @@ public class ParDoTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testParDoEmpty() {
 
     List<Integer> inputs = Arrays.asList();
@@ -368,7 +368,7 @@ public class ParDoTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testParDoEmptyOutputs() {
 
     List<Integer> inputs = Arrays.asList();
@@ -383,7 +383,7 @@ public class ParDoTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testParDoWithSideOutputs() {
 
     List<Integer> inputs = Arrays.asList(3, -42, 666);
@@ -425,7 +425,7 @@ public class ParDoTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testParDoEmptyWithSideOutputs() {
     TupleTag<String> mainOutputTag = new TupleTag<String>("main"){};
     TupleTag<String> sideOutputTag1 = new TupleTag<String>("side1"){};
@@ -463,7 +463,7 @@ public class ParDoTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testParDoWithEmptySideOutputs() {
     TupleTag<String> mainOutputTag = new TupleTag<String>("main"){};
     TupleTag<String> sideOutputTag1 = new TupleTag<String>("side1"){};
@@ -487,7 +487,7 @@ public class ParDoTest implements Serializable {
 
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testParDoWithOnlySideOutputs() {
 
     List<Integer> inputs = Arrays.asList(3, -42, 666);
@@ -569,7 +569,7 @@ public class ParDoTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testParDoWithSideInputs() {
 
     List<Integer> inputs = Arrays.asList(3, -42, 666);
@@ -600,7 +600,7 @@ public class ParDoTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testParDoWithSideInputsIsCumulative() {
 
     List<Integer> inputs = Arrays.asList(3, -42, 666);
@@ -633,7 +633,7 @@ public class ParDoTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testMultiOutputParDoWithSideInputs() {
 
     List<Integer> inputs = Arrays.asList(3, -42, 666);
@@ -670,7 +670,7 @@ public class ParDoTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testMultiOutputParDoWithSideInputsIsCumulative() {
 
     List<Integer> inputs = Arrays.asList(3, -42, 666);
@@ -740,7 +740,7 @@ public class ParDoTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testSideInputsWithMultipleWindows() {
     // Tests that the runner can safely run a DoFn that uses side inputs
     // on an input where the element is in multiple windows. The complication is
@@ -908,7 +908,7 @@ public class ParDoTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testParDoInCustomTransform() {
 
     List<Integer> inputs = Arrays.asList(3, -42, 666);
@@ -1283,7 +1283,7 @@ public class ParDoTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testParDoShiftTimestamp() {
 
     PCollection<Integer> input =
@@ -1346,7 +1346,7 @@ public class ParDoTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testParDoShiftTimestampUnlimited() {
     PCollection<Long> outputs =
         pipeline
@@ -1432,7 +1432,7 @@ public class ParDoTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testWindowingInStartAndFinishBundle() {
 
     PCollection<String> output =
@@ -1536,7 +1536,7 @@ public class ParDoTest implements Serializable {
   }
 
   @Test
-  @Category({RunnableOnService.class, UsesStatefulParDo.class})
+  @Category({ValidatesRunner.class, UsesStatefulParDo.class})
   public void testValueStateSimple() {
     final String stateId = "foo";
 
@@ -1565,7 +1565,7 @@ public class ParDoTest implements Serializable {
   }
 
   @Test
-  @Category({RunnableOnService.class, UsesStatefulParDo.class})
+  @Category({ValidatesRunner.class, UsesStatefulParDo.class})
   public void testValueStateDedup() {
     final String stateId = "foo";
 
@@ -1613,7 +1613,7 @@ public class ParDoTest implements Serializable {
   }
 
   @Test
-  @Category({RunnableOnService.class, UsesStatefulParDo.class})
+  @Category({ValidatesRunner.class, UsesStatefulParDo.class})
   public void testValueStateFixedWindows() {
     final String stateId = "foo";
 
@@ -1661,7 +1661,7 @@ public class ParDoTest implements Serializable {
    * which may (or may not) be executed in similar contexts after runner optimizations.
    */
   @Test
-  @Category({RunnableOnService.class, UsesStatefulParDo.class})
+  @Category({ValidatesRunner.class, UsesStatefulParDo.class})
   public void testValueStateSameId() {
     final String stateId = "foo";
 
@@ -1711,7 +1711,7 @@ public class ParDoTest implements Serializable {
   }
 
   @Test
-  @Category({RunnableOnService.class, UsesStatefulParDo.class})
+  @Category({ValidatesRunner.class, UsesStatefulParDo.class})
   public void testValueStateSideOutput() {
     final String stateId = "foo";
 
@@ -1761,7 +1761,7 @@ public class ParDoTest implements Serializable {
   }
 
   @Test
-  @Category({RunnableOnService.class, UsesStatefulParDo.class})
+  @Category({ValidatesRunner.class, UsesStatefulParDo.class})
   public void testBagState() {
     final String stateId = "foo";
 
@@ -1796,7 +1796,7 @@ public class ParDoTest implements Serializable {
   }
 
   @Test
-  @Category({RunnableOnService.class, UsesStatefulParDo.class, UsesSetState.class})
+  @Category({ValidatesRunner.class, UsesStatefulParDo.class, UsesSetState.class})
   public void testSetState() {
     final String stateId = "foo";
     final String countStateId = "count";
@@ -1838,7 +1838,7 @@ public class ParDoTest implements Serializable {
   }
 
   @Test
-  @Category({RunnableOnService.class, UsesStatefulParDo.class, UsesMapState.class})
+  @Category({ValidatesRunner.class, UsesStatefulParDo.class, UsesMapState.class})
   public void testMapState() {
     final String stateId = "foo";
     final String countStateId = "count";
@@ -1883,7 +1883,7 @@ public class ParDoTest implements Serializable {
   }
 
   @Test
-  @Category({RunnableOnService.class, UsesStatefulParDo.class})
+  @Category({ValidatesRunner.class, UsesStatefulParDo.class})
   public void testCombiningState() {
     final String stateId = "foo";
 
@@ -1922,7 +1922,7 @@ public class ParDoTest implements Serializable {
   }
 
   @Test
-  @Category({RunnableOnService.class, UsesStatefulParDo.class})
+  @Category({ValidatesRunner.class, UsesStatefulParDo.class})
   public void testBagStateSideInput() {
 
     final PCollectionView<List<Integer>> listView =
@@ -1984,7 +1984,7 @@ public class ParDoTest implements Serializable {
    * and is only supported by the direct runner.
    */
   @Test
-  @Category({RunnableOnService.class, UsesTimersInParDo.class})
+  @Category({ValidatesRunner.class, UsesTimersInParDo.class})
   public void testEventTimeTimerBounded() throws Exception {
     final String timerId = "foo";
 
@@ -2012,7 +2012,7 @@ public class ParDoTest implements Serializable {
   }
 
   @Test
-  @Category({RunnableOnService.class, UsesTimersInParDo.class})
+  @Category({ValidatesRunner.class, UsesTimersInParDo.class})
   public void testTimerReceivedInOriginalWindow() throws Exception {
     final String timerId = "foo";
 
@@ -2060,7 +2060,7 @@ public class ParDoTest implements Serializable {
    * supplementary output. The test is otherwise identical to {@link #testEventTimeTimerBounded()}.
    */
   @Test
-  @Category({RunnableOnService.class, UsesTimersInParDo.class})
+  @Category({ValidatesRunner.class, UsesTimersInParDo.class})
   public void testEventTimeTimerAbsolute() throws Exception {
     final String timerId = "foo";
 
@@ -2093,7 +2093,7 @@ public class ParDoTest implements Serializable {
    * implementations that may GC in ways not simply governed by the watermark.
    */
   @Test
-  @Category({RunnableOnService.class, UsesTimersInParDo.class})
+  @Category({ValidatesRunner.class, UsesTimersInParDo.class})
   public void testEventTimeTimerMultipleKeys() throws Exception {
     final String timerId = "foo";
     final String stateId = "sizzle";
@@ -2157,7 +2157,7 @@ public class ParDoTest implements Serializable {
   }
 
   @Test
-  @Category({RunnableOnService.class, UsesTimersInParDo.class})
+  @Category({ValidatesRunner.class, UsesTimersInParDo.class})
   public void testAbsoluteProcessingTimeTimerRejected() throws Exception {
     final String timerId = "foo";
 
@@ -2186,7 +2186,7 @@ public class ParDoTest implements Serializable {
   }
 
   @Test
-  @Category({RunnableOnService.class, UsesTimersInParDo.class})
+  @Category({ValidatesRunner.class, UsesTimersInParDo.class})
   public void testOutOfBoundsEventTimeTimer() throws Exception {
     final String timerId = "foo";
 

http://git-wip-us.apache.org/repos/asf/beam/blob/a58a7412/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/PartitionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/PartitionTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/PartitionTest.java
index 87d7460..3d81b39 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/PartitionTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/PartitionTest.java
@@ -27,8 +27,8 @@ import java.util.ArrayList;
 import java.util.List;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.Partition.PartitionFn;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.values.PCollection;
@@ -65,7 +65,7 @@ public class PartitionTest implements Serializable {
 
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testEvenOddPartition() {
 
     PCollectionList<Integer> outputs = pipeline

http://git-wip-us.apache.org/repos/asf/beam/blob/a58a7412/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java
index 8e426c6..80f361f 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java
@@ -37,8 +37,8 @@ import org.apache.beam.sdk.TestUtils;
 import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.values.PCollection;
 import org.junit.Rule;
@@ -163,7 +163,7 @@ public class SampleTest {
     }
 
     @Test
-    @Category(RunnableOnService.class)
+    @Category(ValidatesRunner.class)
     public void testPickAny() {
       runPickAnyTest(lines, limit);
     }
@@ -235,7 +235,7 @@ public class SampleTest {
     }
 
     @Test
-    @Category(RunnableOnService.class)
+    @Category(ValidatesRunner.class)
     public void testSample() {
 
       PCollection<Integer> input =
@@ -249,7 +249,7 @@ public class SampleTest {
     }
 
     @Test
-    @Category(RunnableOnService.class)
+    @Category(ValidatesRunner.class)
     public void testSampleEmpty() {
 
       PCollection<Integer> input = pipeline.apply(Create.empty(BigEndianIntegerCoder.of()));
@@ -262,7 +262,7 @@ public class SampleTest {
     }
 
     @Test
-    @Category(RunnableOnService.class)
+    @Category(ValidatesRunner.class)
     public void testSampleZero() {
 
       PCollection<Integer> input = pipeline.apply(Create.of(ImmutableList.copyOf(DATA))
@@ -276,7 +276,7 @@ public class SampleTest {
     }
 
     @Test
-    @Category(RunnableOnService.class)
+    @Category(ValidatesRunner.class)
     public void testSampleInsufficientElements() {
 
       PCollection<Integer> input =
@@ -301,7 +301,7 @@ public class SampleTest {
     }
 
     @Test
-    @Category(RunnableOnService.class)
+    @Category(ValidatesRunner.class)
     public void testSampleMultiplicity() {
 
       PCollection<Integer> input =

http://git-wip-us.apache.org/repos/asf/beam/blob/a58a7412/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
index fefccc4..acd5584 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
@@ -31,10 +31,10 @@ import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.testing.TestStream;
 import org.apache.beam.sdk.testing.UsesSplittableParDo;
+import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.DoFn.BoundedPerElement;
 import org.apache.beam.sdk.transforms.splittabledofn.OffsetRange;
 import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
@@ -105,7 +105,7 @@ public class SplittableDoFnTest {
   public final transient TestPipeline p = TestPipeline.create();
 
   @Test
-  @Category({RunnableOnService.class, UsesSplittableParDo.class})
+  @Category({ValidatesRunner.class, UsesSplittableParDo.class})
   public void testPairWithIndexBasic() {
 
     PCollection<KV<String, Integer>> res =
@@ -129,7 +129,7 @@ public class SplittableDoFnTest {
   }
 
   @Test
-  @Category({RunnableOnService.class, UsesSplittableParDo.class})
+  @Category({ValidatesRunner.class, UsesSplittableParDo.class})
   public void testPairWithIndexWindowedTimestamped() {
     // Tests that Splittable DoFn correctly propagates windowing strategy, windows and timestamps
     // of elements in the input collection.
@@ -227,7 +227,7 @@ public class SplittableDoFnTest {
   }
 
   @Test
-  @Category({RunnableOnService.class, UsesSplittableParDo.class})
+  @Category({ValidatesRunner.class, UsesSplittableParDo.class})
   public void testOutputAfterCheckpoint() throws Exception {
     PCollection<Integer> outputs = p.apply(Create.of("foo"))
         .apply(ParDo.of(new SDFWithMultipleOutputsPerBlock()));
@@ -266,7 +266,7 @@ public class SplittableDoFnTest {
   }
 
   @Test
-  @Category({RunnableOnService.class, UsesSplittableParDo.class})
+  @Category({ValidatesRunner.class, UsesSplittableParDo.class})
   public void testSideInputsAndOutputs() throws Exception {
 
     PCollectionView<String> sideInput =
@@ -292,7 +292,7 @@ public class SplittableDoFnTest {
   }
 
   @Test
-  @Category({RunnableOnService.class, UsesSplittableParDo.class})
+  @Category({ValidatesRunner.class, UsesSplittableParDo.class})
   public void testLateData() throws Exception {
 
     Instant base = Instant.now();
@@ -387,7 +387,7 @@ public class SplittableDoFnTest {
   }
 
   @Test
-  @Category({RunnableOnService.class, UsesSplittableParDo.class})
+  @Category({ValidatesRunner.class, UsesSplittableParDo.class})
   public void testLifecycleMethods() throws Exception {
 
     PCollection<String> res =

http://git-wip-us.apache.org/repos/asf/beam/blob/a58a7412/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ToStringTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ToStringTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ToStringTest.java
index ab446a4..5b9c2c8 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ToStringTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ToStringTest.java
@@ -20,12 +20,11 @@ package org.apache.beam.sdk.transforms;
 
 import java.util.ArrayList;
 import java.util.Arrays;
-
 import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.junit.Rule;
@@ -43,7 +42,7 @@ public class ToStringTest {
   public final TestPipeline p = TestPipeline.create();
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testToStringOf() {
     Integer[] ints = {1, 2, 3, 4, 5};
     String[] strings = {"1", "2", "3", "4", "5"};
@@ -54,7 +53,7 @@ public class ToStringTest {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testToStringKV() {
     ArrayList<KV<String, Integer>> kvs = new ArrayList<>();
     kvs.add(KV.of("one", 1));
@@ -71,7 +70,7 @@ public class ToStringTest {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testToStringKVWithDelimiter() {
     ArrayList<KV<String, Integer>> kvs = new ArrayList<>();
     kvs.add(KV.of("one", 1));
@@ -88,7 +87,7 @@ public class ToStringTest {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testToStringIterable() {
     ArrayList<Iterable<String>> iterables = new ArrayList<>();
     iterables.add(Arrays.asList(new String[]{"one", "two", "three"}));
@@ -106,7 +105,7 @@ public class ToStringTest {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testToStringIterableWithDelimiter() {
     ArrayList<Iterable<String>> iterables = new ArrayList<>();
     iterables.add(Arrays.asList(new String[]{"one", "two", "three"}));

http://git-wip-us.apache.org/repos/asf/beam/blob/a58a7412/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ValuesTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ValuesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ValuesTest.java
index 5e27552..e290771 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ValuesTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ValuesTest.java
@@ -24,8 +24,8 @@ import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.junit.Rule;
@@ -56,7 +56,7 @@ public class ValuesTest {
   public final TestPipeline p = TestPipeline.create();
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testValues() {
 
     PCollection<KV<String, Integer>> input =
@@ -72,7 +72,7 @@ public class ValuesTest {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testValuesEmpty() {
 
     PCollection<KV<String, Integer>> input =

http://git-wip-us.apache.org/repos/asf/beam/blob/a58a7412/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
index ee8bf40..740d808 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
@@ -49,8 +49,8 @@ import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
 import org.apache.beam.sdk.transforms.windowing.InvalidWindows;
@@ -90,7 +90,7 @@ public class ViewTest implements Serializable {
   public transient ExpectedException thrown = ExpectedException.none();
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testSingletonSideInput() {
 
     final PCollectionView<Integer> view =
@@ -112,7 +112,7 @@ public class ViewTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testWindowedSingletonSideInput() {
 
     final PCollectionView<Integer> view =
@@ -191,7 +191,7 @@ public class ViewTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testListSideInput() {
 
     final PCollectionView<List<Integer>> view =
@@ -217,7 +217,7 @@ public class ViewTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testWindowedListSideInput() {
 
     final PCollectionView<List<Integer>> view =
@@ -257,7 +257,7 @@ public class ViewTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testEmptyListSideInput() throws Exception {
 
     final PCollectionView<List<Integer>> view =
@@ -283,7 +283,7 @@ public class ViewTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testListSideInputIsImmutable() {
 
     final PCollectionView<List<Integer>> view =
@@ -328,7 +328,7 @@ public class ViewTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testIterableSideInput() {
 
     final PCollectionView<Iterable<Integer>> view =
@@ -353,7 +353,7 @@ public class ViewTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testWindowedIterableSideInput() {
 
     final PCollectionView<Iterable<Integer>> view =
@@ -392,7 +392,7 @@ public class ViewTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testEmptyIterableSideInput() throws Exception {
 
     final PCollectionView<Iterable<Integer>> view =
@@ -417,7 +417,7 @@ public class ViewTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testIterableSideInputIsImmutable() {
 
     final PCollectionView<Iterable<Integer>> view =
@@ -448,7 +448,7 @@ public class ViewTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testMultimapSideInput() {
 
     final PCollectionView<Map<String, Iterable<Integer>>> view =
@@ -475,7 +475,7 @@ public class ViewTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testMultimapAsEntrySetSideInput() {
 
     final PCollectionView<Map<String, Iterable<Integer>>> view =
@@ -526,7 +526,7 @@ public class ViewTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testMultimapSideInputWithNonDeterministicKeyCoder() {
 
     final PCollectionView<Map<String, Iterable<Integer>>> view =
@@ -555,7 +555,7 @@ public class ViewTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testWindowedMultimapSideInput() {
 
     final PCollectionView<Map<String, Iterable<Integer>>> view =
@@ -593,7 +593,7 @@ public class ViewTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testWindowedMultimapAsEntrySetSideInput() {
 
     final PCollectionView<Map<String, Iterable<Integer>>> view =
@@ -635,7 +635,7 @@ public class ViewTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testWindowedMultimapSideInputWithNonDeterministicKeyCoder() {
 
     final PCollectionView<Map<String, Iterable<Integer>>> view =
@@ -674,7 +674,7 @@ public class ViewTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testEmptyMultimapSideInput() throws Exception {
 
     final PCollectionView<Map<String, Iterable<Integer>>> view =
@@ -702,7 +702,7 @@ public class ViewTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testEmptyMultimapSideInputWithNonDeterministicKeyCoder() throws Exception {
 
     final PCollectionView<Map<String, Iterable<Integer>>> view =
@@ -732,7 +732,7 @@ public class ViewTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testMultimapSideInputIsImmutable() {
 
     final PCollectionView<Map<String, Iterable<Integer>>> view =
@@ -779,7 +779,7 @@ public class ViewTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testMapSideInput() {
 
     final PCollectionView<Map<String, Integer>> view =
@@ -805,7 +805,7 @@ public class ViewTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testMapAsEntrySetSideInput() {
 
     final PCollectionView<Map<String, Integer>> view =
@@ -834,7 +834,7 @@ public class ViewTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testMapSideInputWithNonDeterministicKeyCoder() {
 
     final PCollectionView<Map<String, Integer>> view =
@@ -862,7 +862,7 @@ public class ViewTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testWindowedMapSideInput() {
 
     final PCollectionView<Map<String, Integer>> view =
@@ -899,7 +899,7 @@ public class ViewTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testWindowedMapAsEntrySetSideInput() {
 
     final PCollectionView<Map<String, Integer>> view =
@@ -940,7 +940,7 @@ public class ViewTest implements Serializable {
 
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testWindowedMapSideInputWithNonDeterministicKeyCoder() {
 
     final PCollectionView<Map<String, Integer>> view =
@@ -979,7 +979,7 @@ public class ViewTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testEmptyMapSideInput() throws Exception {
 
     final PCollectionView<Map<String, Integer>> view =
@@ -1008,7 +1008,7 @@ public class ViewTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testEmptyMapSideInputWithNonDeterministicKeyCoder() throws Exception {
 
     final PCollectionView<Map<String, Integer>> view =
@@ -1071,7 +1071,7 @@ public class ViewTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testMapSideInputIsImmutable() {
 
     final PCollectionView<Map<String, Integer>> view =
@@ -1117,7 +1117,7 @@ public class ViewTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testCombinedMapSideInput() {
 
     final PCollectionView<Map<String, Integer>> view =
@@ -1143,7 +1143,7 @@ public class ViewTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testWindowedSideInputFixedToFixed() {
 
     final PCollectionView<Integer> view =
@@ -1175,7 +1175,7 @@ public class ViewTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testWindowedSideInputFixedToGlobal() {
 
     final PCollectionView<Integer> view =
@@ -1207,7 +1207,7 @@ public class ViewTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testWindowedSideInputFixedToFixedWithDefault() {
 
     final PCollectionView<Integer> view =
@@ -1237,7 +1237,7 @@ public class ViewTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testSideInputWithNullDefault() {
 
     final PCollectionView<Void> view =
@@ -1266,7 +1266,7 @@ public class ViewTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testSideInputWithNestedIterables() {
     final PCollectionView<Iterable<Integer>> view1 =
         pipeline.apply("CreateVoid1", Create.of((Void) null).withCoder(VoidCoder.of()))

http://git-wip-us.apache.org/repos/asf/beam/blob/a58a7412/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WithTimestampsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WithTimestampsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WithTimestampsTest.java
index 67a2658..48e07ad 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WithTimestampsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WithTimestampsTest.java
@@ -23,8 +23,8 @@ import java.io.Serializable;
 import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.joda.time.Duration;
@@ -49,7 +49,7 @@ public class WithTimestampsTest implements Serializable {
   public transient ExpectedException thrown = ExpectedException.none();
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void withTimestampsShouldApplyTimestamps() {
 
     SerializableFunction<String, Instant> timestampFn =
@@ -120,7 +120,7 @@ public class WithTimestampsTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void withTimestampsBackwardsInTimeAndWithAllowedTimestampSkewShouldSucceed() {
 
     SerializableFunction<String, Instant> timestampFn =
@@ -195,7 +195,7 @@ public class WithTimestampsTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void withTimestampsWithNullFnShouldThrowOnConstruction() {
 
     SerializableFunction<String, Instant> timestampFn = null;

http://git-wip-us.apache.org/repos/asf/beam/blob/a58a7412/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java
index 0e5c177..4e61f4e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java
@@ -32,8 +32,8 @@ import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.DoFnTester;
@@ -123,7 +123,7 @@ public class CoGroupByKeyTest implements Serializable {
   public final transient TestPipeline p = TestPipeline.create();
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testCoGroupByKeyGetOnly() {
     final TupleTag<String> tag1 = new TupleTag<>();
     final TupleTag<String> tag2 = new TupleTag<>();
@@ -260,7 +260,7 @@ public class CoGroupByKeyTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testCoGroupByKey() {
     final TupleTag<String> namesTag = new TupleTag<>();
     final TupleTag<String> addressesTag = new TupleTag<>();
@@ -451,7 +451,7 @@ public class CoGroupByKeyTest implements Serializable {
    */
   @SuppressWarnings("unchecked")
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testCoGroupByKeyHandleResults() {
     TupleTag<String> namesTag = new TupleTag<>();
     TupleTag<String> addressesTag = new TupleTag<>();
@@ -480,7 +480,7 @@ public class CoGroupByKeyTest implements Serializable {
    */
   @SuppressWarnings("unchecked")
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testCoGroupByKeyWithWindowing() {
     TupleTag<String> clicksTag = new TupleTag<>();
     TupleTag<String> purchasesTag = new TupleTag<>();

http://git-wip-us.apache.org/repos/asf/beam/blob/a58a7412/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
index 327b7a8..2bc8d86 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
@@ -42,8 +42,8 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.CountingInput;
 import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.GroupByKey;
@@ -312,7 +312,7 @@ public class WindowTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testNoWindowFnDoesNotReassignWindows() {
     pipeline.enableAbandonedNodeEnforcement(true);
 
@@ -364,7 +364,7 @@ public class WindowTest implements Serializable {
    * with the windowing function default, the end of the window.
    */
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testOutputTimeFnDefault() {
     pipeline.enableAbandonedNodeEnforcement(true);
 
@@ -398,7 +398,7 @@ public class WindowTest implements Serializable {
    * with the windowing function customized to use the end of the window.
    */
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testOutputTimeFnEndOfWindow() {
     pipeline.enableAbandonedNodeEnforcement(true);
 
@@ -449,7 +449,7 @@ public class WindowTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testPrimitiveDisplayData() {
     FixedWindows windowFn = FixedWindows.of(Duration.standardHours(5));
     AfterWatermark.FromEndOfWindow triggerBuilder = AfterWatermark.pastEndOfWindow();

http://git-wip-us.apache.org/repos/asf/beam/blob/a58a7412/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
index b0976e4..a3f5352 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
@@ -25,8 +25,8 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -89,7 +89,7 @@ public class WindowingTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testPartitioningWindowing() {
     PCollection<String> input =
         p.apply(
@@ -114,7 +114,7 @@ public class WindowingTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testNonPartitioningWindowing() {
     PCollection<String> input =
         p.apply(
@@ -139,7 +139,7 @@ public class WindowingTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testMergingWindowing() {
     PCollection<String> input =
         p.apply(
@@ -160,7 +160,7 @@ public class WindowingTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testWindowPreservation() {
     PCollection<String> input1 = p.apply("Create12",
         Create.timestamped(

http://git-wip-us.apache.org/repos/asf/beam/blob/a58a7412/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReifyTimestampsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReifyTimestampsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReifyTimestampsTest.java
index b78de8e..2942efd 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReifyTimestampsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReifyTimestampsTest.java
@@ -23,8 +23,8 @@ import static org.junit.Assert.assertThat;
 
 import java.io.Serializable;
 import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -48,7 +48,7 @@ public class ReifyTimestampsTest implements Serializable {
   @Rule public transient TestPipeline pipeline = TestPipeline.create();
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void inValuesSucceeds() {
     PCollection<KV<String, Integer>> timestamped =
         pipeline
@@ -76,7 +76,7 @@ public class ReifyTimestampsTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void extractFromValuesSucceeds() {
     PCollection<KV<String, TimestampedValue<Integer>>> preified =
         pipeline.apply(

http://git-wip-us.apache.org/repos/asf/beam/blob/a58a7412/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReshuffleTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReshuffleTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReshuffleTest.java
index 81a6d82..eae465c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReshuffleTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReshuffleTest.java
@@ -28,8 +28,8 @@ import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.SerializableFunction;
@@ -79,7 +79,7 @@ public class ReshuffleTest implements Serializable {
   public final transient TestPipeline pipeline = TestPipeline.create();
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testJustReshuffle() {
 
     PCollection<KV<String, Integer>> input = pipeline
@@ -103,7 +103,7 @@ public class ReshuffleTest implements Serializable {
    * {@link WindowingStrategy}.
    */
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testReshufflePreservesTimestamps() {
     PCollection<KV<String, TimestampedValue<String>>> input =
         pipeline
@@ -155,7 +155,7 @@ public class ReshuffleTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testReshuffleAfterSessionsAndGroupByKey() {
 
     PCollection<KV<String, Iterable<Integer>>> input = pipeline
@@ -178,7 +178,7 @@ public class ReshuffleTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testReshuffleAfterFixedWindowsAndGroupByKey() {
 
     PCollection<KV<String, Iterable<Integer>>> input = pipeline
@@ -201,7 +201,7 @@ public class ReshuffleTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testReshuffleAfterSlidingWindowsAndGroupByKey() {
 
     PCollection<KV<String, Iterable<Integer>>> input = pipeline
@@ -224,7 +224,7 @@ public class ReshuffleTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testReshuffleAfterFixedWindows() {
 
     PCollection<KV<String, Integer>> input = pipeline
@@ -247,7 +247,7 @@ public class ReshuffleTest implements Serializable {
 
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testReshuffleAfterSlidingWindows() {
 
     PCollection<KV<String, Integer>> input = pipeline

http://git-wip-us.apache.org/repos/asf/beam/blob/a58a7412/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java
index 7d767cf..010d726 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java
@@ -32,8 +32,8 @@ import java.util.Map;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.CountingInput;
 import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.MapElements;
@@ -73,7 +73,7 @@ public final class PCollectionTupleTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testComposePCollectionTuple() {
     pipeline.enableAbandonedNodeEnforcement(true);
 

http://git-wip-us.apache.org/repos/asf/beam/blob/a58a7412/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PDoneTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PDoneTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PDoneTest.java
index ba7477d..7c9d1d9 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PDoneTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PDoneTest.java
@@ -22,8 +22,8 @@ import static org.apache.beam.sdk.TestUtils.LINES;
 import java.io.File;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.testing.NeedsRunner;
-import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.junit.Ignore;
@@ -79,7 +79,7 @@ public class PDoneTest {
   // transforms that contain no nested transforms.
   @Ignore
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testEmptyTransform() {
     p.begin().apply(new EmptyTransform());
 

http://git-wip-us.apache.org/repos/asf/beam/blob/a58a7412/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java b/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
index 60054c6..a14f480 100644
--- a/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
+++ b/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
@@ -32,9 +32,9 @@ import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.SourceTestUtils;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFnTester;
@@ -135,7 +135,7 @@ public class ElasticsearchIOTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testRead() throws Exception {
     ElasticSearchIOTestUtils.insertTestDocuments(ES_INDEX, ES_TYPE, NUM_DOCS, node.client());
 
@@ -152,7 +152,7 @@ public class ElasticsearchIOTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testReadWithQuery() throws Exception {
     ElasticSearchIOTestUtils.insertTestDocuments(ES_INDEX, ES_TYPE, NUM_DOCS, node.client());
 
@@ -179,7 +179,7 @@ public class ElasticsearchIOTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testWrite() throws Exception {
     List<String> data =
         ElasticSearchIOTestUtils.createDocuments(

http://git-wip-us.apache.org/repos/asf/beam/blob/a58a7412/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index 4dcc8d5..35bb9c3 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -127,10 +127,10 @@ import org.apache.beam.sdk.testing.CoderProperties;
 import org.apache.beam.sdk.testing.ExpectedLogs;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.SourceTestUtils;
 import org.apache.beam.sdk.testing.SourceTestUtils.ExpectedSplitOutcome;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.DoFnTester;
@@ -1295,8 +1295,8 @@ public class BigQueryIOTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
-  @Ignore("[BEAM-436] DirectRunner RunnableOnService tempLocation configuration insufficient")
+  @Category(ValidatesRunner.class)
+  @Ignore("[BEAM-436] DirectRunner ValidatesRunner tempLocation configuration insufficient")
   public void testTableSourcePrimitiveDisplayData() throws IOException, InterruptedException {
     DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
     BigQueryIO.Read read = BigQueryIO.read()
@@ -1312,8 +1312,8 @@ public class BigQueryIOTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
-  @Ignore("[BEAM-436] DirectRunner RunnableOnService tempLocation configuration insufficient")
+  @Category(ValidatesRunner.class)
+  @Ignore("[BEAM-436] DirectRunner ValidatesRunner tempLocation configuration insufficient")
   public void testQuerySourcePrimitiveDisplayData() throws IOException, InterruptedException {
     DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
     BigQueryIO.Read read = BigQueryIO.read()
@@ -1339,15 +1339,15 @@ public class BigQueryIOTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
-  @Ignore("[BEAM-436] DirectRunner RunnableOnService tempLocation configuration insufficient")
+  @Category(ValidatesRunner.class)
+  @Ignore("[BEAM-436] DirectRunner ValidatesRunner tempLocation configuration insufficient")
   public void testBatchWritePrimitiveDisplayData() throws IOException, InterruptedException {
     testWritePrimitiveDisplayData(/* streaming: */ false);
   }
 
   @Test
-  @Category(RunnableOnService.class)
-  @Ignore("[BEAM-436] DirectRunner RunnableOnService tempLocation configuration insufficient")
+  @Category(ValidatesRunner.class)
+  @Ignore("[BEAM-436] DirectRunner ValidatesRunner tempLocation configuration insufficient")
   public void testStreamingWritePrimitiveDisplayData() throws IOException, InterruptedException {
     testWritePrimitiveDisplayData(/* streaming: */ true);
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/a58a7412/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java
index 879e30e..af27926 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java
@@ -87,8 +87,8 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
-import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.DoFnTester;
 import org.apache.beam.sdk.transforms.DoFnTester.CloningBehavior;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -429,7 +429,7 @@ public class DatastoreV1Test {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testWritePrimitiveDisplayData() {
     DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
     PTransform<PCollection<Entity>, ?> write =
@@ -443,7 +443,7 @@ public class DatastoreV1Test {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testDeleteEntityPrimitiveDisplayData() {
     DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
     PTransform<PCollection<Entity>, ?> write =
@@ -457,7 +457,7 @@ public class DatastoreV1Test {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testDeleteKeyPrimitiveDisplayData() {
     DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
     PTransform<PCollection<Key>, ?> write =

http://git-wip-us.apache.org/repos/asf/beam/blob/a58a7412/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java b/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java
index 28ca5f7..da89aa2 100644
--- a/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java
+++ b/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java
@@ -24,13 +24,12 @@ import java.net.ServerSocket;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.Set;
-
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.Connection;
 import org.apache.beam.sdk.io.mqtt.MqttIO.Read;
 import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.values.PCollection;
 import org.fusesource.hawtbuf.Buffer;
@@ -79,7 +78,7 @@ public class MqttIOTest {
   }
 
   @Test(timeout = 60 * 1000)
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testReadNoClientId() throws Exception {
     final String topicName = "READ_TOPIC_NO_CLIENT_ID";
     Read mqttReader = MqttIO.read()
@@ -139,7 +138,7 @@ public class MqttIOTest {
   }
 
   @Test(timeout = 60 * 1000)
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testRead() throws Exception {
     PCollection<byte[]> output = pipeline.apply(
         MqttIO.read()
@@ -199,7 +198,7 @@ public class MqttIOTest {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testWrite() throws Exception {
     MQTT client = new MQTT();
     client.setHost("tcp://localhost:" + port);

http://git-wip-us.apache.org/repos/asf/beam/blob/a58a7412/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/FilterJava8Test.java
----------------------------------------------------------------------
diff --git a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/FilterJava8Test.java b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/FilterJava8Test.java
index f91371e..a3481e1 100644
--- a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/FilterJava8Test.java
+++ b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/FilterJava8Test.java
@@ -20,8 +20,8 @@ package org.apache.beam.sdk.transforms;
 import java.io.Serializable;
 import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.values.PCollection;
 import org.junit.Rule;
 import org.junit.Test;
@@ -44,7 +44,7 @@ public class FilterJava8Test implements Serializable {
   public transient ExpectedException thrown = ExpectedException.none();
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testIdentityFilterByPredicate() {
 
     PCollection<Integer> output = pipeline
@@ -67,7 +67,7 @@ public class FilterJava8Test implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testFilterByPredicate() {
 
     PCollection<Integer> output = pipeline
@@ -95,7 +95,7 @@ public class FilterJava8Test implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testFilterByMethodReference() {
 
     PCollection<Integer> output = pipeline

http://git-wip-us.apache.org/repos/asf/beam/blob/a58a7412/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithKeysJava8Test.java
----------------------------------------------------------------------
diff --git a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithKeysJava8Test.java b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithKeysJava8Test.java
index 6ba41fa..4f0361e 100644
--- a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithKeysJava8Test.java
+++ b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithKeysJava8Test.java
@@ -20,8 +20,8 @@ package org.apache.beam.sdk.transforms;
 import static org.hamcrest.Matchers.containsString;
 
 import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TypeDescriptor;
@@ -45,7 +45,7 @@ public class WithKeysJava8Test {
   public ExpectedException thrown = ExpectedException.none();
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void withLambdaAndTypeDescriptorShouldSucceed() {
 
 

http://git-wip-us.apache.org/repos/asf/beam/blob/a58a7412/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithTimestampsJava8Test.java
----------------------------------------------------------------------
diff --git a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithTimestampsJava8Test.java b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithTimestampsJava8Test.java
index a0c6370..ee23d95 100644
--- a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithTimestampsJava8Test.java
+++ b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithTimestampsJava8Test.java
@@ -19,8 +19,8 @@ package org.apache.beam.sdk.transforms;
 
 import java.io.Serializable;
 import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.joda.time.Instant;
@@ -40,7 +40,7 @@ public class WithTimestampsJava8Test implements Serializable {
   public final transient TestPipeline p = TestPipeline.create();
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void withTimestampsLambdaShouldApplyTimestamps() {
 
     final String yearTwoThousand = "946684800000";