You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/05/25 21:30:16 UTC

[1/2] incubator-beam git commit: Add NeedsRunner, annotate core SDK Tests

Repository: incubator-beam
Updated Branches:
  refs/heads/master 9f97ea0a7 -> bde2a856f


Add NeedsRunner, annotate core SDK Tests

Core SDK tests that require a runner to run cannot be executed while
building the Core SDK, as all runners depend on the Core SDK to be
built. Instead, defer the execution of these tests until the direct
runner is built, and execute them with it on the classpath.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/928a302d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/928a302d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/928a302d

Branch: refs/heads/master
Commit: 928a302d2dc1ac1187bb310632d359a803e60cff
Parents: e2307f2
Author: Thomas Groh <tg...@google.com>
Authored: Thu May 19 18:52:54 2016 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Tue May 24 10:37:32 2016 -0700

----------------------------------------------------------------------
 sdks/java/core/pom.xml                          |  2 +-
 .../apache/beam/sdk/testing/NeedsRunner.java    | 28 ++++++++++++++++++++
 .../beam/sdk/testing/RunnableOnService.java     |  2 +-
 .../java/org/apache/beam/sdk/PipelineTest.java  |  2 ++
 .../apache/beam/sdk/coders/AvroCoderTest.java   |  3 +++
 .../beam/sdk/coders/CoderRegistryTest.java      |  4 +++
 .../beam/sdk/io/AvroIOGeneratedClassTest.java   |  9 +++++++
 .../java/org/apache/beam/sdk/io/AvroIOTest.java |  7 ++++-
 .../org/apache/beam/sdk/io/BigQueryIOTest.java  | 11 +++++---
 .../beam/sdk/io/CompressedSourceTest.java       | 15 +++++++++++
 .../apache/beam/sdk/io/CountingInputTest.java   |  2 ++
 .../apache/beam/sdk/io/CountingSourceTest.java  |  3 +++
 .../apache/beam/sdk/io/FileBasedSourceTest.java |  5 +++-
 .../beam/sdk/io/PubsubUnboundedSinkTest.java    |  5 ++++
 .../java/org/apache/beam/sdk/io/TextIOTest.java | 15 +++++++++++
 .../java/org/apache/beam/sdk/io/WriteTest.java  |  6 +++++
 .../org/apache/beam/sdk/io/XmlSourceTest.java   | 10 ++++---
 .../beam/sdk/runners/TransformTreeTest.java     |  3 +++
 .../transforms/ApproximateQuantilesTest.java    |  7 ++++-
 .../sdk/transforms/ApproximateUniqueTest.java   |  6 +++++
 .../apache/beam/sdk/transforms/CombineTest.java |  4 +--
 .../apache/beam/sdk/transforms/DoFnTest.java    |  7 ++++-
 .../sdk/transforms/DoFnWithContextTest.java     |  7 ++++-
 .../apache/beam/sdk/transforms/FilterTest.java  |  4 +++
 .../sdk/transforms/FlatMapElementsTest.java     |  5 ++++
 .../apache/beam/sdk/transforms/FlattenTest.java |  4 +++
 .../beam/sdk/transforms/GroupByKeyTest.java     |  5 +++-
 .../IntraBundleParallelizationTest.java         |  6 ++++-
 .../beam/sdk/transforms/MapElementsTest.java    |  7 ++++-
 .../apache/beam/sdk/transforms/ParDoTest.java   | 21 +++++++++++++--
 .../beam/sdk/transforms/PartitionTest.java      |  5 +++-
 .../apache/beam/sdk/transforms/SampleTest.java  |  3 ++-
 .../org/apache/beam/sdk/transforms/TopTest.java |  5 ++++
 .../apache/beam/sdk/transforms/ViewTest.java    |  4 +++
 .../beam/sdk/transforms/WithKeysTest.java       |  5 ++++
 .../beam/sdk/transforms/WithTimestampsTest.java |  3 +++
 .../sdk/transforms/join/CoGroupByKeyTest.java   |  2 ++
 .../sdk/transforms/windowing/WindowingTest.java |  3 +++
 .../org/apache/beam/sdk/values/PDoneTest.java   |  2 ++
 39 files changed, 225 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/928a302d/sdks/java/core/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/core/pom.xml b/sdks/java/core/pom.xml
index aa7edb5..644d8e1 100644
--- a/sdks/java/core/pom.xml
+++ b/sdks/java/core/pom.xml
@@ -129,7 +129,7 @@
         <artifactId>maven-surefire-plugin</artifactId>
         <configuration>
           <excludedGroups>
-            org.apache.beam.sdk.testing.RunnableOnService
+            org.apache.beam.sdk.testing.NeedsRunner
           </excludedGroups>
         </configuration>
       </plugin>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/928a302d/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/NeedsRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/NeedsRunner.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/NeedsRunner.java
new file mode 100644
index 0000000..5f22bee
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/NeedsRunner.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.testing;
+
+import org.apache.beam.sdk.runners.PipelineRunner;
+
+/**
+ * Category tag for validation tests which utilize {@link TestPipeline} for execution and expect
+ * to be executed by a {@link PipelineRunner}.
+ */
+public interface NeedsRunner {
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/928a302d/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/RunnableOnService.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/RunnableOnService.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/RunnableOnService.java
index 4c794dd..5741433 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/RunnableOnService.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/RunnableOnService.java
@@ -31,4 +31,4 @@ package org.apache.beam.sdk.testing;
  *     }
  * </code></pre>
  */
-public interface RunnableOnService {}
+public interface RunnableOnService extends NeedsRunner {}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/928a302d/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java
index 9d38c1c..ea708e5 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java
@@ -31,6 +31,7 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.runners.DirectPipelineRunner;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.testing.ExpectedLogs;
+import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
@@ -291,6 +292,7 @@ public class PipelineTest {
    * Tests that an empty pipeline runs.
    */
   @Test
+  @Category(NeedsRunner.class)
   public void testEmptyPipeline() throws Exception {
     Pipeline pipeline = TestPipeline.create();
     pipeline.run();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/928a302d/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java
index 4da8772..8f28cc4 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java
@@ -27,6 +27,7 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.Coder.Context;
 import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
 import org.apache.beam.sdk.testing.CoderProperties;
+import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
@@ -55,6 +56,7 @@ import org.hamcrest.Matchers;
 import org.hamcrest.TypeSafeMatcher;
 import org.junit.Assert;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
@@ -211,6 +213,7 @@ public class AvroCoderTest {
   }
 
   @Test
+  @Category(NeedsRunner.class)
   public void testDefaultCoder() throws Exception {
     Pipeline p = TestPipeline.create();
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/928a302d/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
index df440f4..37f0e10 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
@@ -24,6 +24,7 @@ import static org.junit.Assert.assertEquals;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.CoderRegistry.IncompatibleCoderException;
 import org.apache.beam.sdk.coders.protobuf.ProtoCoder;
+import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -41,6 +42,7 @@ import com.google.protobuf.Duration;
 
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -400,6 +402,7 @@ public class CoderRegistryTest {
    * {@link #testDefaultCoderAnnotationGeneric} is invoked in the right ways.
    */
   @Test
+  @Category(NeedsRunner.class)
   public void testSpecializedButIgnoredGenericInPipeline() throws Exception {
     Pipeline pipeline = TestPipeline.create();
 
@@ -428,6 +431,7 @@ public class CoderRegistryTest {
   }
 
   @Test
+  @Category(NeedsRunner.class)
   public void testIgnoredGenericInPipeline() throws Exception {
     Pipeline pipeline = TestPipeline.create();
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/928a302d/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOGeneratedClassTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOGeneratedClassTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOGeneratedClassTest.java
index f757b4e..da886de 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOGeneratedClassTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOGeneratedClassTest.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertThat;
 
 import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
@@ -40,6 +41,7 @@ import org.apache.avro.specific.SpecificDatumWriter;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -53,6 +55,7 @@ import java.util.List;
 /**
  * Tests for AvroIO Read and Write transforms, using classes generated from {@code user.avsc}.
  */
+// TODO: Stop requiring local files
 @RunWith(JUnit4.class)
 public class AvroIOGeneratedClassTest {
   @Rule
@@ -145,6 +148,7 @@ public class AvroIOGeneratedClassTest {
   }
 
   @Test
+  @Category(NeedsRunner.class)
   public void testReadFromGeneratedClass() throws Exception {
     runTestRead(
         AvroIO.Read.from(avroFile.getPath()).withSchema(AvroGeneratedUser.class),
@@ -181,6 +185,7 @@ public class AvroIOGeneratedClassTest {
   }
 
   @Test
+  @Category(NeedsRunner.class)
   public void testReadFromSchema() throws Exception {
     runTestRead(
         AvroIO.Read.from(avroFile.getPath()).withSchema(schema),
@@ -217,6 +222,7 @@ public class AvroIOGeneratedClassTest {
   }
 
   @Test
+  @Category(NeedsRunner.class)
   public void testReadFromSchemaString() throws Exception {
     runTestRead(
         AvroIO.Read.from(avroFile.getPath()).withSchema(schemaString),
@@ -268,6 +274,7 @@ public class AvroIOGeneratedClassTest {
   }
 
   @Test
+  @Category(NeedsRunner.class)
   public void testWriteFromGeneratedClass() throws Exception {
     runTestWrite(AvroIO.Write.to(avroFile.getPath())
                              .withSchema(AvroGeneratedUser.class),
@@ -302,6 +309,7 @@ public class AvroIOGeneratedClassTest {
   }
 
   @Test
+  @Category(NeedsRunner.class)
   public void testWriteFromSchema() throws Exception {
     runTestWrite(AvroIO.Write.to(avroFile.getPath())
                              .withSchema(schema),
@@ -336,6 +344,7 @@ public class AvroIOGeneratedClassTest {
   }
 
   @Test
+  @Category(NeedsRunner.class)
   public void testWriteFromSchemaString() throws Exception {
     runTestWrite(AvroIO.Write.to(avroFile.getPath())
                              .withSchema(schemaString),

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/928a302d/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
index 43b1219..13c1bcf 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
@@ -18,7 +18,6 @@
 package org.apache.beam.sdk.io;
 
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
-
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -28,6 +27,7 @@ import static org.junit.Assert.assertTrue;
 import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.DefaultCoder;
 import org.apache.beam.sdk.io.AvroIO.Write.Bound;
+import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
@@ -44,6 +44,7 @@ import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.reflect.Nullable;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -117,6 +118,7 @@ public class AvroIOTest {
   }
 
   @Test
+  @Category(NeedsRunner.class)
   public void testAvroIOWriteAndReadASingleFile() throws Throwable {
     TestPipeline p = TestPipeline.create();
     List<GenericClass> values = ImmutableList.of(new GenericClass(3, "hi"),
@@ -180,6 +182,7 @@ public class AvroIOTest {
    * <p>For more information, see http://avro.apache.org/docs/1.7.7/spec.html#Schema+Resolution
    */
   @Test
+  @Category(NeedsRunner.class)
   public void testAvroIOWriteAndReadSchemaUpgrade() throws Throwable {
     TestPipeline p = TestPipeline.create();
     List<GenericClass> values = ImmutableList.of(new GenericClass(3, "hi"),
@@ -245,6 +248,7 @@ public class AvroIOTest {
   }
 
   @Test
+  @Category(NeedsRunner.class)
   public void testAvroSinkWrite() throws Exception {
     String[] expectedElements = new String[] {"first", "second", "third"};
 
@@ -252,6 +256,7 @@ public class AvroIOTest {
   }
 
   @Test
+  @Category(NeedsRunner.class)
   public void testAvroSinkShardedWrite() throws Exception {
     String[] expectedElements = new String[] {"first", "second", "third", "fourth", "fifth"};
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/928a302d/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java
index 2d1b550..b30f860 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java
@@ -44,6 +44,7 @@ import org.apache.beam.sdk.options.BigQueryOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.ExpectedLogs;
+import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.SourceTestUtils;
@@ -416,10 +417,10 @@ public class BigQueryIOTest implements Serializable {
 
   @Test
   public void testValidateReadSetsDefaultProject() {
-    BigQueryOptions options = PipelineOptionsFactory.as(BigQueryOptions.class);
+    BigQueryOptions options = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class);
     options.setProject("someproject");
 
-    Pipeline p = Pipeline.create(options);
+    Pipeline p = TestPipeline.create(options);
 
     TableReference tableRef = new TableReference();
     tableRef.setDatasetId("somedataset");
@@ -475,6 +476,7 @@ public class BigQueryIOTest implements Serializable {
   }
 
   @Test
+  @Category(NeedsRunner.class)
   public void testReadFromTable() {
     FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
         .withJobService(new FakeJobService()
@@ -504,6 +506,7 @@ public class BigQueryIOTest implements Serializable {
   }
 
   @Test
+  @Category(NeedsRunner.class)
   public void testCustomSink() throws Exception {
     FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
         .withJobService(new FakeJobService()
@@ -537,6 +540,7 @@ public class BigQueryIOTest implements Serializable {
   }
 
   @Test
+  @Category(NeedsRunner.class)
   public void testCustomSinkUnknown() throws Exception {
     FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
         .withJobService(new FakeJobService()
@@ -720,7 +724,7 @@ public class BigQueryIOTest implements Serializable {
   }
 
   private void testWriteValidatesDataset(boolean streaming) {
-    BigQueryOptions options = PipelineOptionsFactory.as(BigQueryOptions.class);
+    BigQueryOptions options = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class);
     options.setProject("someproject");
     options.setStreaming(streaming);
 
@@ -1078,6 +1082,7 @@ public class BigQueryIOTest implements Serializable {
   }
 
   @Test
+  @Category(NeedsRunner.class)
   public void testPassThroughThenCleanupExecuted() throws Exception {
     Pipeline p = TestPipeline.create();
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/928a302d/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java
index 7161c1d..9b44386 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java
@@ -37,6 +37,7 @@ import org.apache.beam.sdk.io.CompressedSource.DecompressingChannelFactory;
 import org.apache.beam.sdk.io.FileBasedSource.FileBasedReader;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.SourceTestUtils;
 import org.apache.beam.sdk.testing.TestPipeline;
@@ -51,6 +52,7 @@ import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream;
 import org.hamcrest.Matchers;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 import org.junit.internal.matchers.ThrowableMessageMatcher;
 import org.junit.rules.ExpectedException;
 import org.junit.rules.TemporaryFolder;
@@ -88,6 +90,7 @@ public class CompressedSourceTest {
    * Test reading nonempty input with gzip.
    */
   @Test
+  @Category(NeedsRunner.class)
   public void testReadGzip() throws Exception {
     byte[] input = generateInput(5000);
     runReadTest(input, CompressionMode.GZIP);
@@ -97,6 +100,7 @@ public class CompressedSourceTest {
    * Test reading nonempty input with bzip2.
    */
   @Test
+  @Category(NeedsRunner.class)
   public void testReadBzip2() throws Exception {
     byte[] input = generateInput(5000);
     runReadTest(input, CompressionMode.BZIP2);
@@ -106,6 +110,7 @@ public class CompressedSourceTest {
    * Test reading empty input with gzip.
    */
   @Test
+  @Category(NeedsRunner.class)
   public void testEmptyReadGzip() throws Exception {
     byte[] input = generateInput(0);
     runReadTest(input, CompressionMode.GZIP);
@@ -133,6 +138,7 @@ public class CompressedSourceTest {
    * to be the concatenation of those individual files.
    */
   @Test
+  @Category(NeedsRunner.class)
   public void testReadConcatenatedGzip() throws IOException {
     byte[] header = "a,b,c\n".getBytes(StandardCharsets.UTF_8);
     byte[] body = "1,2,3\n4,5,6\n7,8,9\n".getBytes(StandardCharsets.UTF_8);
@@ -158,6 +164,7 @@ public class CompressedSourceTest {
    * Test reading empty input with bzip2.
    */
   @Test
+  @Category(NeedsRunner.class)
   public void testCompressedReadBzip2() throws Exception {
     byte[] input = generateInput(0);
     runReadTest(input, CompressionMode.BZIP2);
@@ -167,6 +174,7 @@ public class CompressedSourceTest {
    * Test reading according to filepattern when the file is bzipped.
    */
   @Test
+  @Category(NeedsRunner.class)
   public void testCompressedAccordingToFilepatternGzip() throws Exception {
     byte[] input = generateInput(100);
     File tmpFile = tmpFolder.newFile("test.gz");
@@ -178,6 +186,7 @@ public class CompressedSourceTest {
    * Test reading according to filepattern when the file is gzipped.
    */
   @Test
+  @Category(NeedsRunner.class)
   public void testCompressedAccordingToFilepatternBzip2() throws Exception {
     byte[] input = generateInput(100);
     File tmpFile = tmpFolder.newFile("test.bz2");
@@ -189,6 +198,7 @@ public class CompressedSourceTest {
    * Test reading multiple files with different compression.
    */
   @Test
+  @Category(NeedsRunner.class)
   public void testHeterogeneousCompression() throws Exception {
     String baseName = "test-input";
 
@@ -266,6 +276,7 @@ public class CompressedSourceTest {
    * this due to properties of services that we read from.
    */
   @Test
+  @Category(NeedsRunner.class)
   public void testFalseGzipStream() throws Exception {
     byte[] input = generateInput(1000);
     File tmpFile = tmpFolder.newFile("test.gz");
@@ -278,6 +289,7 @@ public class CompressedSourceTest {
    * we fail.
    */
   @Test
+  @Category(NeedsRunner.class)
   public void testFalseBzip2Stream() throws Exception {
     byte[] input = generateInput(1000);
     File tmpFile = tmpFolder.newFile("test.bz2");
@@ -294,6 +306,7 @@ public class CompressedSourceTest {
    * the gzip header is two bytes.
    */
   @Test
+  @Category(NeedsRunner.class)
   public void testEmptyReadGzipUncompressed() throws Exception {
     byte[] input = generateInput(0);
     File tmpFile = tmpFolder.newFile("test.gz");
@@ -306,6 +319,7 @@ public class CompressedSourceTest {
    * the gzip header is two bytes.
    */
   @Test
+  @Category(NeedsRunner.class)
   public void testOneByteReadGzipUncompressed() throws Exception {
     byte[] input = generateInput(1);
     File tmpFile = tmpFolder.newFile("test.gz");
@@ -317,6 +331,7 @@ public class CompressedSourceTest {
    * Test reading multiple files.
    */
   @Test
+  @Category(NeedsRunner.class)
   public void testCompressedReadMultipleFiles() throws Exception {
     int numFiles = 10;
     String baseName = "test_input-";

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/928a302d/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java
index 8c87c26..c5f7478 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertThat;
 
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.CountingInput.UnboundedCountingInput;
+import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
@@ -97,6 +98,7 @@ public class CountingInputTest {
   }
 
   @Test
+  @Category(NeedsRunner.class)
   public void testUnboundedInputRate() {
     Pipeline p = TestPipeline.create();
     long numElements = 5000;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/928a302d/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java
index bf68d41..321f066 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java
@@ -29,6 +29,7 @@ import org.apache.beam.sdk.io.CountingSource.CounterMark;
 import org.apache.beam.sdk.io.CountingSource.UnboundedCountingSource;
 import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
@@ -186,6 +187,7 @@ public class CountingSourceTest {
   }
 
   @Test
+  @Category(NeedsRunner.class)
   public void testUnboundedSourceWithRate() {
     Pipeline p = TestPipeline.create();
 
@@ -245,6 +247,7 @@ public class CountingSourceTest {
   }
 
   @Test
+  @Category(NeedsRunner.class)
   public void testUnboundedSourceRateSplits() throws Exception {
     Pipeline p = TestPipeline.create();
     int elementsPerPeriod = 10;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/928a302d/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java
index 1f16d39..b0c577d 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java
@@ -21,7 +21,6 @@ import static org.apache.beam.sdk.testing.SourceTestUtils.assertSplitAtFractionE
 import static org.apache.beam.sdk.testing.SourceTestUtils.assertSplitAtFractionFails;
 import static org.apache.beam.sdk.testing.SourceTestUtils.assertSplitAtFractionSucceedsAndConsistent;
 import static org.apache.beam.sdk.testing.SourceTestUtils.readFromSource;
-
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
@@ -36,6 +35,7 @@ import org.apache.beam.sdk.io.FileBasedSource.FileBasedReader;
 import org.apache.beam.sdk.io.Source.Reader;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.util.CoderUtils;
@@ -47,6 +47,7 @@ import com.google.common.collect.ImmutableList;
 
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -707,6 +708,7 @@ public class FileBasedSourceTest {
   }
 
   @Test
+  @Category(NeedsRunner.class)
   public void testDataflowFile() throws IOException {
     Pipeline p = TestPipeline.create();
     List<String> data = createStringDataset(3, 50);
@@ -722,6 +724,7 @@ public class FileBasedSourceTest {
   }
 
   @Test
+  @Category(NeedsRunner.class)
   public void testDataflowFilePattern() throws IOException {
     Pipeline p = TestPipeline.create();
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/928a302d/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java
index bf70e47..9c75972 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java
@@ -21,6 +21,7 @@ package org.apache.beam.sdk.io;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.PubsubUnboundedSink.RecordIdMethod;
 import org.apache.beam.sdk.testing.CoderProperties;
+import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -37,6 +38,7 @@ import com.google.common.hash.Hashing;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
@@ -75,6 +77,7 @@ public class PubsubUnboundedSinkTest {
   }
 
   @Test
+  @Category(NeedsRunner.class)
   public void sendOneMessage() throws IOException {
     List<OutgoingMessage> outgoing =
         ImmutableList.of(new OutgoingMessage(DATA.getBytes(), TIMESTAMP, getRecordId(DATA)));
@@ -98,6 +101,7 @@ public class PubsubUnboundedSinkTest {
   }
 
   @Test
+  @Category(NeedsRunner.class)
   public void sendMoreThanOneBatchByNumMessages() throws IOException {
     List<OutgoingMessage> outgoing = new ArrayList<>();
     List<String> data = new ArrayList<>();
@@ -126,6 +130,7 @@ public class PubsubUnboundedSinkTest {
   }
 
   @Test
+  @Category(NeedsRunner.class)
   public void sendMoreThanOneBatchByByteSize() throws IOException {
     List<OutgoingMessage> outgoing = new ArrayList<>();
     List<String> data = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/928a302d/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
index 53a2a89..73aeda9 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
@@ -38,6 +38,7 @@ import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
 import org.apache.beam.sdk.io.TextIO.CompressionType;
 import org.apache.beam.sdk.io.TextIO.TextSource;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.SourceTestUtils;
 import org.apache.beam.sdk.testing.TestPipeline;
@@ -53,6 +54,7 @@ import com.google.common.collect.ImmutableList;
 
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 import org.junit.rules.ExpectedException;
 import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
@@ -74,6 +76,7 @@ import java.util.zip.GZIPOutputStream;
 /**
  * Tests for TextIO Read and Write transforms.
  */
+// TODO: Change the tests to use RunnableOnService instead of NeedsRunner
 @RunWith(JUnit4.class)
 @SuppressWarnings("unchecked")
 public class TextIOTest {
@@ -110,26 +113,31 @@ public class TextIOTest {
   }
 
   @Test
+  @Category(NeedsRunner.class)
   public void testReadStrings() throws Exception {
     runTestRead(LINES_ARRAY, StringUtf8Coder.of());
   }
 
   @Test
+  @Category(NeedsRunner.class)
   public void testReadEmptyStrings() throws Exception {
     runTestRead(NO_LINES_ARRAY, StringUtf8Coder.of());
   }
 
   @Test
+  @Category(NeedsRunner.class)
   public void testReadInts() throws Exception {
     runTestRead(INTS_ARRAY, TextualIntegerCoder.of());
   }
 
   @Test
+  @Category(NeedsRunner.class)
   public void testReadEmptyInts() throws Exception {
     runTestRead(NO_INTS_ARRAY, TextualIntegerCoder.of());
   }
 
   @Test
+  @Category(NeedsRunner.class)
   public void testReadNulls() throws Exception {
     runTestRead(new Void[]{ null, null, null }, VoidCoder.of());
   }
@@ -246,21 +254,25 @@ public class TextIOTest {
   }
 
   @Test
+  @Category(NeedsRunner.class)
   public void testWriteStrings() throws Exception {
     runTestWrite(LINES_ARRAY, StringUtf8Coder.of());
   }
 
   @Test
+  @Category(NeedsRunner.class)
   public void testWriteEmptyStrings() throws Exception {
     runTestWrite(NO_LINES_ARRAY, StringUtf8Coder.of());
   }
 
   @Test
+  @Category(NeedsRunner.class)
   public void testWriteInts() throws Exception {
     runTestWrite(INTS_ARRAY, TextualIntegerCoder.of());
   }
 
   @Test
+  @Category(NeedsRunner.class)
   public void testWriteEmptyInts() throws Exception {
     runTestWrite(NO_INTS_ARRAY, TextualIntegerCoder.of());
   }
@@ -287,6 +299,7 @@ public class TextIOTest {
   }
 
   @Test
+  @Category(NeedsRunner.class)
   public void testShardedWrite() throws Exception {
     runTestWrite(LINES_ARRAY, StringUtf8Coder.of(), 5);
   }
@@ -364,6 +377,7 @@ public class TextIOTest {
   }
 
   @Test
+  @Category(NeedsRunner.class)
   public void testCompressedRead() throws Exception {
     String[] lines = {"Irritable eagle", "Optimistic jay", "Fanciful hawk"};
     File tmpFile = tmpFolder.newFile();
@@ -389,6 +403,7 @@ public class TextIOTest {
   }
 
   @Test
+  @Category(NeedsRunner.class)
   public void testGZIPReadWhenUncompressed() throws Exception {
     String[] lines = {"Meritorious condor", "Obnoxious duck"};
     File tmpFile = tmpFolder.newFile();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/928a302d/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
index 66cad60..45a4374 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
@@ -37,6 +37,7 @@ import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.PipelineOptionsFactoryTest.TestPipelineOptions;
+import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.GroupByKey;
@@ -55,6 +56,7 @@ import com.google.common.base.MoreObjects;
 
 import org.joda.time.Duration;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
@@ -120,6 +122,7 @@ public class WriteTest {
    * Test a Write transform with a PCollection of elements.
    */
   @Test
+  @Category(NeedsRunner.class)
   public void testWrite() {
     List<String> inputs = Arrays.asList("Critical canary", "Apprehensive eagle",
         "Intimidating pigeon", "Pedantic gull", "Frisky finch");
@@ -130,6 +133,7 @@ public class WriteTest {
    * Test a Write transform with an empty PCollection.
    */
   @Test
+  @Category(NeedsRunner.class)
   public void testWriteWithEmptyPCollection() {
     List<String> inputs = new ArrayList<>();
     runWrite(inputs, IDENTITY_MAP);
@@ -139,6 +143,7 @@ public class WriteTest {
    * Test a Write with a windowed PCollection.
    */
   @Test
+  @Category(NeedsRunner.class)
   public void testWriteWindowed() {
     List<String> inputs = Arrays.asList("Critical canary", "Apprehensive eagle",
         "Intimidating pigeon", "Pedantic gull", "Frisky finch");
@@ -150,6 +155,7 @@ public class WriteTest {
    * Test a Write with sessions.
    */
   @Test
+  @Category(NeedsRunner.class)
   public void testWriteWithSessions() {
     List<String> inputs = Arrays.asList("Critical canary", "Apprehensive eagle",
         "Intimidating pigeon", "Pedantic gull", "Frisky finch");

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/928a302d/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSourceTest.java
index f5bad18..eb65468 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSourceTest.java
@@ -20,7 +20,6 @@ package org.apache.beam.sdk.io;
 import static org.apache.beam.sdk.testing.SourceTestUtils.assertSplitAtFractionExhaustive;
 import static org.apache.beam.sdk.testing.SourceTestUtils.assertSplitAtFractionFails;
 import static org.apache.beam.sdk.testing.SourceTestUtils.assertSplitAtFractionSucceedsAndConsistent;
-
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
 import static org.hamcrest.Matchers.both;
 import static org.hamcrest.Matchers.containsInAnyOrder;
@@ -33,6 +32,7 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.Source.Reader;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.display.DisplayData;
@@ -44,6 +44,7 @@ import org.hamcrest.Matchers;
 import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 import org.junit.rules.ExpectedException;
 import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
@@ -567,7 +568,8 @@ public class XmlSourceTest {
   }
 
   @Test
-  public void testReadXMLSmallDataflow() throws IOException {
+  @Category(NeedsRunner.class)
+  public void testReadXMLSmallPipeline() throws IOException {
     Pipeline p = TestPipeline.create();
 
     File file = tempFolder.newFile("trainXMLSmall");
@@ -657,7 +659,8 @@ public class XmlSourceTest {
   }
 
   @Test
-  public void testReadXMLLargeDataflow() throws IOException {
+  @Category(NeedsRunner.class)
+  public void testReadXMLLargePipeline() throws IOException {
     String fileName = "temp.xml";
     List<Train> trains = generateRandomTrainList(100);
     File file = createRandomTrainXML(fileName, trains);
@@ -798,6 +801,7 @@ public class XmlSourceTest {
   }
 
   @Test
+  @Category(NeedsRunner.class)
   public void testReadXMLFilePattern() throws IOException {
     List<Train> trains1 = generateRandomTrainList(20);
     File file = createRandomTrainXML("temp1.xml", trains1);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/928a302d/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
index aecebd7..0c992c4 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
@@ -30,6 +30,7 @@ import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.io.Write;
+import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.Create;
@@ -43,6 +44,7 @@ import org.apache.beam.sdk.values.PDone;
 
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -180,6 +182,7 @@ public class TransformTreeTest {
   }
 
   @Test
+  @Category(NeedsRunner.class)
   public void testMultiGraphSetup() {
     Pipeline p = TestPipeline.create();
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/928a302d/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateQuantilesTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateQuantilesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateQuantilesTest.java
index cc81748..8c2451b 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateQuantilesTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateQuantilesTest.java
@@ -18,7 +18,6 @@
 package org.apache.beam.sdk.transforms;
 
 import static org.apache.beam.sdk.TestUtils.checkCombineFn;
-
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.collection.IsIterableContainingInOrder.contains;
@@ -27,6 +26,7 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.ApproximateQuantiles.ApproximateQuantilesCombineFn;
@@ -39,6 +39,7 @@ import org.hamcrest.Description;
 import org.hamcrest.Matcher;
 import org.hamcrest.TypeSafeDiagnosingMatcher;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
@@ -70,6 +71,7 @@ public class ApproximateQuantilesTest {
   }
 
   @Test
+  @Category(NeedsRunner.class)
   public void testQuantilesGlobally() {
     TestPipeline p = TestPipeline.create();
 
@@ -83,6 +85,7 @@ public class ApproximateQuantilesTest {
   }
 
   @Test
+  @Category(NeedsRunner.class)
   public void testQuantilesGobally_comparable() {
     TestPipeline p = TestPipeline.create();
 
@@ -97,6 +100,7 @@ public class ApproximateQuantilesTest {
   }
 
   @Test
+  @Category(NeedsRunner.class)
   public void testQuantilesPerKey() {
     Pipeline p = TestPipeline.create();
 
@@ -113,6 +117,7 @@ public class ApproximateQuantilesTest {
   }
 
   @Test
+  @Category(NeedsRunner.class)
   public void testQuantilesPerKey_reversed() {
     Pipeline p = TestPipeline.create();
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/928a302d/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateUniqueTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateUniqueTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateUniqueTest.java
index 4f00ed4..1a42947 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateUniqueTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateUniqueTest.java
@@ -25,6 +25,7 @@ import static org.junit.Assert.fail;
 
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.TestUtils;
+import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
@@ -83,6 +84,7 @@ public class ApproximateUniqueTest implements Serializable {
   }
 
   @Test
+  @Category(NeedsRunner.class)
   public void testApproximateUniqueWithDuplicates() {
     runApproximateUniqueWithDuplicates(100, 100, 100);
     runApproximateUniqueWithDuplicates(1000, 1000, 100);
@@ -111,6 +113,7 @@ public class ApproximateUniqueTest implements Serializable {
   }
 
   @Test
+  @Category(NeedsRunner.class)
   public void testApproximateUniqueWithSkewedDistributions() {
     runApproximateUniqueWithSkewedDistributions(100, 100, 100);
     runApproximateUniqueWithSkewedDistributions(10000, 10000, 100);
@@ -119,6 +122,7 @@ public class ApproximateUniqueTest implements Serializable {
   }
 
   @Test
+  @Category(NeedsRunner.class)
   public void testApproximateUniqueWithSkewedDistributionsAndLargeSampleSize() {
     runApproximateUniqueWithSkewedDistributions(10000, 2000, 1000);
   }
@@ -148,6 +152,7 @@ public class ApproximateUniqueTest implements Serializable {
   }
 
   @Test
+  @Category(NeedsRunner.class)
   public void testApproximateUniquePerKey() {
     List<KV<Long, Long>> elements = Lists.newArrayList();
     List<Long> keys = ImmutableList.of(20L, 50L, 100L);
@@ -177,6 +182,7 @@ public class ApproximateUniqueTest implements Serializable {
    * {@code 2 / sqrt(sampleSize)}.
    */
   @Test
+  @Category(NeedsRunner.class)
   public void testApproximateUniqueWithDifferentSampleSizes() {
     runApproximateUniquePipeline(16);
     runApproximateUniquePipeline(64);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/928a302d/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
index 21768a8..a0b508c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
@@ -21,9 +21,7 @@ import static org.apache.beam.sdk.TestUtils.checkCombineFn;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasNamespace;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.includesDisplayDataFrom;
-
 import static com.google.common.base.Preconditions.checkNotNull;
-
 import static org.hamcrest.Matchers.hasItem;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
@@ -42,6 +40,7 @@ import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.coders.StandardCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.runners.DirectPipelineRunner;
+import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
@@ -606,6 +605,7 @@ public class CombineTest implements Serializable {
   }
 
   @Test
+  @Category(NeedsRunner.class)
   public void testBinaryCombineFn() {
     Pipeline pipeline = TestPipeline.create();
     PCollection<KV<String, Integer>> input = copy(createInput(pipeline, TABLE), 2);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/928a302d/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTest.java
index b7906b7..dbcc1fe 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTest.java
@@ -17,13 +17,14 @@
  */
 package org.apache.beam.sdk.transforms;
 
-import static org.hamcrest.CoreMatchers.isA;
 import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.isA;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertThat;
 
 import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
+import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.Max.MaxIntegerFn;
@@ -31,6 +32,7 @@ import org.apache.beam.sdk.transforms.display.DisplayData;
 
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -127,6 +129,7 @@ public class DoFnTest implements Serializable {
   }
 
   @Test
+  @Category(NeedsRunner.class)
   public void testCreateAggregatorInStartBundleThrows() {
     TestPipeline p = createTestPipeline(new DoFn<String, String>() {
       @Override
@@ -145,6 +148,7 @@ public class DoFnTest implements Serializable {
   }
 
   @Test
+  @Category(NeedsRunner.class)
   public void testCreateAggregatorInProcessElementThrows() {
     TestPipeline p = createTestPipeline(new DoFn<String, String>() {
       @Override
@@ -160,6 +164,7 @@ public class DoFnTest implements Serializable {
   }
 
   @Test
+  @Category(NeedsRunner.class)
   public void testCreateAggregatorInFinishBundleThrows() {
     TestPipeline p = createTestPipeline(new DoFn<String, String>() {
       @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/928a302d/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnWithContextTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnWithContextTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnWithContextTest.java
index 391081a..3b314b2 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnWithContextTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnWithContextTest.java
@@ -17,8 +17,8 @@
  */
 package org.apache.beam.sdk.transforms;
 
-import static org.hamcrest.CoreMatchers.isA;
 import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.isA;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertThat;
@@ -28,6 +28,7 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
+import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.Max.MaxIntegerFn;
@@ -35,6 +36,7 @@ import org.apache.beam.sdk.transforms.display.DisplayData;
 
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -169,6 +171,7 @@ public class DoFnWithContextTest implements Serializable {
   }
 
   @Test
+  @Category(NeedsRunner.class)
   public void testCreateAggregatorInStartBundleThrows() {
     TestPipeline p = createTestPipeline(new DoFnWithContext<String, String>() {
       @StartBundle
@@ -187,6 +190,7 @@ public class DoFnWithContextTest implements Serializable {
   }
 
   @Test
+  @Category(NeedsRunner.class)
   public void testCreateAggregatorInProcessElementThrows() {
     TestPipeline p = createTestPipeline(new DoFnWithContext<String, String>() {
       @ProcessElement
@@ -202,6 +206,7 @@ public class DoFnWithContextTest implements Serializable {
   }
 
   @Test
+  @Category(NeedsRunner.class)
   public void testCreateAggregatorInFinishBundleThrows() {
     TestPipeline p = createTestPipeline(new DoFnWithContext<String, String>() {
       @FinishBundle

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/928a302d/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FilterTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FilterTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FilterTest.java
index f58ba17..367bbc0 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FilterTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FilterTest.java
@@ -21,6 +21,7 @@ import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisp
 
 import static org.hamcrest.MatcherAssert.assertThat;
 
+import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
@@ -76,6 +77,7 @@ public class FilterTest implements Serializable {
 
   @Deprecated
   @Test
+  @Category(NeedsRunner.class)
   public void testNoFilter() {
     TestPipeline p = TestPipeline.create();
 
@@ -115,6 +117,7 @@ public class FilterTest implements Serializable {
   }
 
   @Test
+  @Category(RunnableOnService.class)
   public void testNoFilterByPredicate() {
     TestPipeline p = TestPipeline.create();
 
@@ -153,6 +156,7 @@ public class FilterTest implements Serializable {
   }
 
   @Test
+  @Category(RunnableOnService.class)
   public void testFilterGreaterThan() {
     TestPipeline p = TestPipeline.create();
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/928a302d/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsTest.java
index aee2427..057fd19 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsTest.java
@@ -21,6 +21,7 @@ import static org.hamcrest.Matchers.equalTo;
 import static org.junit.Assert.assertThat;
 
 import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.values.KV;
@@ -32,6 +33,7 @@ import com.google.common.collect.ImmutableSet;
 
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -54,6 +56,7 @@ public class FlatMapElementsTest implements Serializable {
    * Basic test of {@link FlatMapElements} with a {@link SimpleFunction}.
    */
   @Test
+  @Category(NeedsRunner.class)
   public void testFlatMapBasic() throws Exception {
     Pipeline pipeline = TestPipeline.create();
     PCollection<Integer> output = pipeline
@@ -78,6 +81,7 @@ public class FlatMapElementsTest implements Serializable {
    * of the output reflects its static type.
    */
   @Test
+  @Category(NeedsRunner.class)
   public void testFlatMapFnOutputTypeDescriptor() throws Exception {
     Pipeline pipeline = TestPipeline.create();
     PCollection<String> output = pipeline
@@ -99,6 +103,7 @@ public class FlatMapElementsTest implements Serializable {
   }
 
   @Test
+  @Category(NeedsRunner.class)
   public void testVoidValues() throws Exception {
     Pipeline pipeline = TestPipeline.create();
     pipeline

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/928a302d/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java
index 042b864..80825cb 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java
@@ -31,6 +31,7 @@ import org.apache.beam.sdk.coders.ListCoder;
 import org.apache.beam.sdk.coders.SetCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
@@ -158,6 +159,7 @@ public class FlattenTest implements Serializable {
   }
 
   @Test
+  @Category(NeedsRunner.class)
   public void testFlattenNoListsNoCoder() {
     // not RunnableOnService because it should fail at pipeline construction time anyhow.
     thrown.expect(IllegalStateException.class);
@@ -263,6 +265,7 @@ public class FlattenTest implements Serializable {
   /////////////////////////////////////////////////////////////////////////////
 
   @Test
+  @Category(NeedsRunner.class)
   public void testEqualWindowFnPropagation() {
     Pipeline p = TestPipeline.create();
 
@@ -284,6 +287,7 @@ public class FlattenTest implements Serializable {
   }
 
   @Test
+  @Category(NeedsRunner.class)
   public void testCompatibleWindowFnPropagation() {
     Pipeline p = TestPipeline.create();
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/928a302d/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
index b84845a..4ce025d 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
@@ -19,7 +19,6 @@ package org.apache.beam.sdk.transforms;
 
 import static org.apache.beam.sdk.TestUtils.KvMatcher.isKv;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
-
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder;
@@ -34,6 +33,7 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.options.DirectPipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.runners.DirectPipelineRunner;
+import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
@@ -198,6 +198,7 @@ public class GroupByKeyTest {
   }
 
   @Test
+  @Category(NeedsRunner.class)
   public void testIdentityWindowFnPropagation() {
     Pipeline p = TestPipeline.create();
 
@@ -218,6 +219,7 @@ public class GroupByKeyTest {
   }
 
   @Test
+  @Category(NeedsRunner.class)
   public void testWindowFnInvalidation() {
     Pipeline p = TestPipeline.create();
 
@@ -268,6 +270,7 @@ public class GroupByKeyTest {
   }
 
   @Test
+  @Category(NeedsRunner.class)
   public void testRemerge() {
     Pipeline p = TestPipeline.create();
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/928a302d/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/IntraBundleParallelizationTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/IntraBundleParallelizationTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/IntraBundleParallelizationTest.java
index 66b189a..3355aeb 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/IntraBundleParallelizationTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/IntraBundleParallelizationTest.java
@@ -20,7 +20,6 @@ package org.apache.beam.sdk.transforms;
 import static org.apache.beam.sdk.testing.SystemNanoTimeSleeper.sleepMillis;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.includesDisplayDataFrom;
-
 import static org.hamcrest.Matchers.both;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
@@ -32,11 +31,13 @@ import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
 
 import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
@@ -157,6 +158,7 @@ public class IntraBundleParallelizationTest {
    * greater than that amount.
    */
   @Test
+  @Category(NeedsRunner.class)
   public void testParallelization() {
     int maxConcurrency = Integer.MIN_VALUE;
     // Take the minimum from multiple runs.
@@ -174,6 +176,7 @@ public class IntraBundleParallelizationTest {
   }
 
   @Test(timeout = 5000L)
+  @Category(NeedsRunner.class)
   public void testExceptionHandling() {
     ExceptionThrowingFn<Integer> fn = new ExceptionThrowingFn<>(10);
     try {
@@ -196,6 +199,7 @@ public class IntraBundleParallelizationTest {
   }
 
   @Test(timeout = 5000L)
+  @Category(NeedsRunner.class)
   public void testExceptionHandlingOnLastElement() {
     ExceptionThrowingFn<Integer> fn = new ExceptionThrowingFn<>(9);
     try {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/928a302d/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java
index 1e2c826..e6694d2 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java
@@ -18,11 +18,11 @@
 package org.apache.beam.sdk.transforms;
 
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
-
 import static org.hamcrest.Matchers.equalTo;
 import static org.junit.Assert.assertThat;
 
 import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.display.DisplayData;
@@ -32,6 +32,7 @@ import org.apache.beam.sdk.values.TypeDescriptor;
 
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -51,6 +52,7 @@ public class MapElementsTest implements Serializable {
    * Basic test of {@link MapElements} with a {@link SimpleFunction}.
    */
   @Test
+  @Category(NeedsRunner.class)
   public void testMapBasic() throws Exception {
     Pipeline pipeline = TestPipeline.create();
     PCollection<Integer> output = pipeline
@@ -71,6 +73,7 @@ public class MapElementsTest implements Serializable {
    * generally discouraged in Java 7, in favor of {@link SimpleFunction}.
    */
   @Test
+  @Category(NeedsRunner.class)
   public void testMapBasicSerializableFunction() throws Exception {
     Pipeline pipeline = TestPipeline.create();
     PCollection<Integer> output = pipeline
@@ -91,6 +94,7 @@ public class MapElementsTest implements Serializable {
    * of the output reflects its static type.
    */
   @Test
+  @Category(NeedsRunner.class)
   public void testSimpleFunctionOutputTypeDescriptor() throws Exception {
     Pipeline pipeline = TestPipeline.create();
     PCollection<String> output = pipeline
@@ -111,6 +115,7 @@ public class MapElementsTest implements Serializable {
   }
 
   @Test
+  @Category(NeedsRunner.class)
   public void testVoidValues() throws Exception {
     Pipeline pipeline = TestPipeline.create();
     pipeline

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/928a302d/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index 9193267..c0c8051 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -42,6 +42,7 @@ import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.ListCoder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.runners.DirectPipelineRunner;
+import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
@@ -544,6 +545,7 @@ public class ParDoTest implements Serializable {
   }
 
   @Test
+  @Category(NeedsRunner.class)
   public void testParDoWritingToUndeclaredSideOutput() {
     Pipeline pipeline = TestPipeline.create();
 
@@ -564,6 +566,8 @@ public class ParDoTest implements Serializable {
   }
 
   @Test
+  // TODO: The exception thrown is runner-specific, even if the behavior is general
+  @Category(NeedsRunner.class)
   public void testParDoUndeclaredSideOutputLimit() {
     Pipeline pipeline = TestPipeline.create();
     PCollection<Integer> input = pipeline.apply(Create.of(Arrays.asList(3)));
@@ -742,6 +746,7 @@ public class ParDoTest implements Serializable {
   }
 
   @Test
+  @Category(NeedsRunner.class)
   public void testParDoReadingFromUnknownSideInput() {
     Pipeline pipeline = TestPipeline.create();
 
@@ -762,6 +767,7 @@ public class ParDoTest implements Serializable {
   }
 
   @Test
+  @Category(NeedsRunner.class)
   public void testParDoWithErrorInStartBatch() {
     Pipeline pipeline = TestPipeline.create();
 
@@ -776,6 +782,7 @@ public class ParDoTest implements Serializable {
   }
 
   @Test
+  @Category(NeedsRunner.class)
   public void testParDoWithErrorInProcessElement() {
     Pipeline pipeline = TestPipeline.create();
 
@@ -790,6 +797,7 @@ public class ParDoTest implements Serializable {
   }
 
   @Test
+  @Category(NeedsRunner.class)
   public void testParDoWithErrorInFinishBatch() {
     Pipeline pipeline = TestPipeline.create();
 
@@ -909,6 +917,7 @@ public class ParDoTest implements Serializable {
   }
 
   @Test
+  @Category(NeedsRunner.class)
   public void testMultiOutputChaining() {
     Pipeline pipeline = TestPipeline.create();
 
@@ -1109,6 +1118,7 @@ public class ParDoTest implements Serializable {
   }
 
   @Test
+  @Category(NeedsRunner.class)
   public void testSideOutputUnknownCoder() throws Exception {
     Pipeline pipeline = TestPipeline.create();
     PCollection<Integer> input = pipeline
@@ -1119,7 +1129,7 @@ public class ParDoTest implements Serializable {
     input.apply(ParDo.of(new SideOutputDummyFn(sideOutputTag))
         .withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag)));
 
-    thrown.expect(PipelineExecutionException.class);
+    thrown.expect(IllegalStateException.class);
     thrown.expectMessage("Unable to return a default Coder");
     pipeline.run();
   }
@@ -1143,10 +1153,10 @@ public class ParDoTest implements Serializable {
     outputTuple.get(sideOutputTag).finishSpecifyingOutput(); // Check for crashes
     assertEquals(new TestDummyCoder(),
         outputTuple.get(sideOutputTag).getCoder()); // Check for corruption
-    pipeline.run();
   }
 
   @Test
+  @Category(NeedsRunner.class)
   public void testMainOutputUnregisteredExplicitCoder() {
     Pipeline pipeline = TestPipeline.create();
     PCollection<Integer> input = pipeline
@@ -1163,6 +1173,7 @@ public class ParDoTest implements Serializable {
   }
 
   @Test
+  @Category(NeedsRunner.class)
   public void testMainOutputApplySideOutputNoCoder() {
     // Regression test: applying a transform to the main output
     // should not cause a crash based on lack of a coder for the
@@ -1203,6 +1214,7 @@ public class ParDoTest implements Serializable {
   }
 
   @Test
+  @Category(NeedsRunner.class)
   public void testParDoOutputWithTimestamp() {
     Pipeline pipeline = TestPipeline.create();
 
@@ -1224,6 +1236,7 @@ public class ParDoTest implements Serializable {
   }
 
   @Test
+  @Category(NeedsRunner.class)
   public void testParDoSideOutputWithTimestamp() {
     Pipeline pipeline = TestPipeline.create();
 
@@ -1255,6 +1268,7 @@ public class ParDoTest implements Serializable {
   }
 
   @Test
+  @Category(NeedsRunner.class)
   public void testParDoShiftTimestamp() {
     Pipeline pipeline = TestPipeline.create();
 
@@ -1277,6 +1291,7 @@ public class ParDoTest implements Serializable {
   }
 
   @Test
+  @Category(NeedsRunner.class)
   public void testParDoShiftTimestampInvalid() {
     Pipeline pipeline = TestPipeline.create();
 
@@ -1295,6 +1310,7 @@ public class ParDoTest implements Serializable {
   }
 
   @Test
+  @Category(NeedsRunner.class)
   public void testParDoShiftTimestampInvalidZeroAllowed() {
     Pipeline pipeline = TestPipeline.create();
 
@@ -1381,6 +1397,7 @@ public class ParDoTest implements Serializable {
   }
 
   @Test
+  @Category(NeedsRunner.class)
   public void testWindowingInStartBundleException() {
     Pipeline pipeline = TestPipeline.create();
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/928a302d/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/PartitionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/PartitionTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/PartitionTest.java
index 608da0f..243b52b 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/PartitionTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/PartitionTest.java
@@ -18,12 +18,12 @@
 package org.apache.beam.sdk.transforms;
 
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
-
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
@@ -81,6 +81,7 @@ public class PartitionTest implements Serializable {
   }
 
   @Test
+  @Category(NeedsRunner.class)
   public void testModPartition() {
     Pipeline pipeline = TestPipeline.create();
 
@@ -95,6 +96,7 @@ public class PartitionTest implements Serializable {
   }
 
   @Test
+  @Category(NeedsRunner.class)
   public void testOutOfBoundsPartitions() {
     Pipeline pipeline = TestPipeline.create();
 
@@ -120,6 +122,7 @@ public class PartitionTest implements Serializable {
   }
 
   @Test
+  @Category(NeedsRunner.class)
   public void testDroppedPartition() {
     Pipeline pipeline = TestPipeline.create();
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/928a302d/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java
index 4b1d5dc..4cc0c9b 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java
@@ -19,7 +19,6 @@ package org.apache.beam.sdk.transforms;
 
 import static org.apache.beam.sdk.TestUtils.LINES;
 import static org.apache.beam.sdk.TestUtils.NO_LINES;
-
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
@@ -28,6 +27,7 @@ import static org.junit.Assert.assertTrue;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
@@ -246,6 +246,7 @@ public class SampleTest {
 
   @Test
   // Extra tests, not worth the time to run on the real service.
+  @Category(NeedsRunner.class)
   public void testPickAnyMore() {
     runPickAnyTest(LINES, LINES.size() - 1);
     runPickAnyTest(LINES, LINES.size());

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/928a302d/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/TopTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/TopTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/TopTest.java
index 6d580e7..a96d19b 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/TopTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/TopTest.java
@@ -25,6 +25,7 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.display.DisplayData;
@@ -38,6 +39,7 @@ import org.hamcrest.Matchers;
 import org.joda.time.Duration;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -90,6 +92,7 @@ public class TopTest {
   }
 
   @Test
+  @Category(NeedsRunner.class)
   @SuppressWarnings("unchecked")
   public void testTop() {
     Pipeline p = TestPipeline.create();
@@ -121,6 +124,7 @@ public class TopTest {
   }
 
   @Test
+  @Category(NeedsRunner.class)
   @SuppressWarnings("unchecked")
   public void testTopEmpty() {
     Pipeline p = TestPipeline.create();
@@ -165,6 +169,7 @@ public class TopTest {
   }
 
   @Test
+  @Category(NeedsRunner.class)
   @SuppressWarnings("unchecked")
   public void testTopZero() {
     Pipeline p = TestPipeline.create();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/928a302d/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
index 3d4dfe0..18d39d7 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
@@ -36,6 +36,7 @@ import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.options.DirectPipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.runners.DirectPipelineRunner;
+import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
@@ -143,6 +144,7 @@ public class ViewTest implements Serializable {
   }
 
   @Test
+  @Category(NeedsRunner.class)
   public void testEmptySingletonSideInput() throws Exception {
     Pipeline pipeline = TestPipeline.create();
 
@@ -168,6 +170,7 @@ public class ViewTest implements Serializable {
   }
 
   @Test
+  @Category(NeedsRunner.class)
   public void testNonSingletonSideInput() throws Exception {
     Pipeline pipeline = TestPipeline.create();
 
@@ -1045,6 +1048,7 @@ public class ViewTest implements Serializable {
   }
 
   @Test
+  @Category(NeedsRunner.class)
   public void testMapSideInputWithNullValuesCatchesDuplicates() {
     Pipeline pipeline = TestPipeline.create();
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/928a302d/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WithKeysTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WithKeysTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WithKeysTest.java
index 6873624..c23dd36 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WithKeysTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WithKeysTest.java
@@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals;
 
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.values.KV;
@@ -28,6 +29,7 @@ import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TypeDescriptor;
 
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
@@ -64,6 +66,7 @@ public class WithKeysTest {
   );
 
   @Test
+  @Category(NeedsRunner.class)
   public void testExtractKeys() {
     Pipeline p = TestPipeline.create();
 
@@ -80,6 +83,7 @@ public class WithKeysTest {
   }
 
   @Test
+  @Category(NeedsRunner.class)
   public void testConstantKeys() {
     Pipeline p = TestPipeline.create();
 
@@ -101,6 +105,7 @@ public class WithKeysTest {
   }
 
   @Test
+  @Category(NeedsRunner.class)
   public void testWithKeysWithUnneededWithKeyTypeSucceeds() {
     TestPipeline p = TestPipeline.create();
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/928a302d/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WithTimestampsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WithTimestampsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WithTimestampsTest.java
index aee6a99..ac67bb4 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WithTimestampsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WithTimestampsTest.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.transforms;
 import static org.hamcrest.Matchers.isA;
 
 import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
+import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
@@ -85,6 +86,7 @@ public class WithTimestampsTest implements Serializable {
   }
 
   @Test
+  @Category(NeedsRunner.class)
   public void withTimestampsBackwardsInTimeShouldThrow() {
     TestPipeline p = TestPipeline.create();
 
@@ -171,6 +173,7 @@ public class WithTimestampsTest implements Serializable {
   }
 
   @Test
+  @Category(NeedsRunner.class)
   public void withTimestampsWithNullTimestampShouldThrow() {
     SerializableFunction<String, Instant> timestampFn =
         new SerializableFunction<String, Instant>() {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/928a302d/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java
index f4ce2ca..f8e7f08 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java
@@ -122,6 +122,7 @@ public class CoGroupByKeyTest implements Serializable {
   }
 
   @Test
+  @Category(RunnableOnService.class)
   public void testCoGroupByKeyGetOnly() {
     final TupleTag<String> tag1 = new TupleTag<>();
     final TupleTag<String> tag2 = new TupleTag<>();
@@ -260,6 +261,7 @@ public class CoGroupByKeyTest implements Serializable {
   }
 
   @Test
+  @Category(RunnableOnService.class)
   public void testCoGroupByKey() {
     final TupleTag<String> namesTag = new TupleTag<>();
     final TupleTag<String> addressesTag = new TupleTag<>();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/928a302d/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
index 65adac1..5377f23 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.transforms.windowing;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
@@ -190,6 +191,7 @@ public class WindowingTest implements Serializable {
   }
 
   @Test
+  @Category(NeedsRunner.class)
   public void testEmptyInput() {
     Pipeline p = TestPipeline.create();
     PCollection<String> input =
@@ -206,6 +208,7 @@ public class WindowingTest implements Serializable {
   }
 
   @Test
+  @Category(NeedsRunner.class)
   public void testTextIoInput() throws Exception {
     File tmpFile = tmpFolder.newFile("file.txt");
     String filename = tmpFile.getPath();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/928a302d/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PDoneTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PDoneTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PDoneTest.java
index 24896c0..5554b31 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PDoneTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PDoneTest.java
@@ -21,6 +21,7 @@ import static org.apache.beam.sdk.TestUtils.LINES;
 
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
@@ -90,6 +91,7 @@ public class PDoneTest {
   // instead of a local temp file.  Or switch to applying a different
   // transform that returns PDone.
   @Test
+  @Category(NeedsRunner.class)
   public void testSimpleTransform() throws Exception {
     File tmpFile = tmpFolder.newFile("file.txt");
     String filename = tmpFile.getPath();


[2/2] incubator-beam git commit: This closes #361

Posted by ke...@apache.org.
This closes #361


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/bde2a856
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/bde2a856
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/bde2a856

Branch: refs/heads/master
Commit: bde2a856f1652f6ebab7aae2d5e78eeb4683a15a
Parents: 9f97ea0 928a302
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed May 25 14:16:54 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Wed May 25 14:16:54 2016 -0700

----------------------------------------------------------------------
 sdks/java/core/pom.xml                          |  2 +-
 .../apache/beam/sdk/testing/NeedsRunner.java    | 28 ++++++++++++++++++++
 .../beam/sdk/testing/RunnableOnService.java     |  2 +-
 .../java/org/apache/beam/sdk/PipelineTest.java  |  2 ++
 .../apache/beam/sdk/coders/AvroCoderTest.java   |  3 +++
 .../beam/sdk/coders/CoderRegistryTest.java      |  4 +++
 .../beam/sdk/io/AvroIOGeneratedClassTest.java   |  9 +++++++
 .../java/org/apache/beam/sdk/io/AvroIOTest.java |  7 ++++-
 .../org/apache/beam/sdk/io/BigQueryIOTest.java  | 11 +++++---
 .../beam/sdk/io/CompressedSourceTest.java       | 15 +++++++++++
 .../apache/beam/sdk/io/CountingInputTest.java   |  2 ++
 .../apache/beam/sdk/io/CountingSourceTest.java  |  3 +++
 .../apache/beam/sdk/io/FileBasedSourceTest.java |  5 +++-
 .../beam/sdk/io/PubsubUnboundedSinkTest.java    |  5 ++++
 .../java/org/apache/beam/sdk/io/TextIOTest.java | 15 +++++++++++
 .../java/org/apache/beam/sdk/io/WriteTest.java  |  6 +++++
 .../org/apache/beam/sdk/io/XmlSourceTest.java   | 10 ++++---
 .../beam/sdk/runners/TransformTreeTest.java     |  3 +++
 .../transforms/ApproximateQuantilesTest.java    |  7 ++++-
 .../sdk/transforms/ApproximateUniqueTest.java   |  6 +++++
 .../apache/beam/sdk/transforms/CombineTest.java |  4 +--
 .../apache/beam/sdk/transforms/DoFnTest.java    |  7 ++++-
 .../sdk/transforms/DoFnWithContextTest.java     |  7 ++++-
 .../apache/beam/sdk/transforms/FilterTest.java  |  4 +++
 .../sdk/transforms/FlatMapElementsTest.java     |  5 ++++
 .../apache/beam/sdk/transforms/FlattenTest.java |  4 +++
 .../beam/sdk/transforms/GroupByKeyTest.java     |  5 +++-
 .../IntraBundleParallelizationTest.java         |  6 ++++-
 .../beam/sdk/transforms/MapElementsTest.java    |  7 ++++-
 .../apache/beam/sdk/transforms/ParDoTest.java   | 21 +++++++++++++--
 .../beam/sdk/transforms/PartitionTest.java      |  5 +++-
 .../apache/beam/sdk/transforms/SampleTest.java  |  3 ++-
 .../org/apache/beam/sdk/transforms/TopTest.java |  5 ++++
 .../apache/beam/sdk/transforms/ViewTest.java    |  4 +++
 .../beam/sdk/transforms/WithKeysTest.java       |  5 ++++
 .../beam/sdk/transforms/WithTimestampsTest.java |  3 +++
 .../sdk/transforms/join/CoGroupByKeyTest.java   |  2 ++
 .../sdk/transforms/windowing/WindowingTest.java |  3 +++
 .../org/apache/beam/sdk/values/PDoneTest.java   |  2 ++
 39 files changed, 225 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bde2a856/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java
----------------------------------------------------------------------