Posted to commits@beam.apache.org by dh...@apache.org on 2016/04/27 03:08:38 UTC

[06/21] incubator-beam git commit: Reorganize Java packages in the sources of the Google Cloud Dataflow runner

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowGroupByKeyTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowGroupByKeyTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowGroupByKeyTest.java
new file mode 100644
index 0000000..f0e677e
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowGroupByKeyTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.transforms;
+
+import org.apache.beam.runners.dataflow.DataflowPipelineRunner;
+import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.windowing.Sessions;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.util.NoopPathValidator;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+import org.joda.time.Duration;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.util.Arrays;
+import java.util.List;
+
+/** Tests for {@link GroupByKey} for the {@link DataflowPipelineRunner}. */
+@RunWith(JUnit4.class)
+public class DataflowGroupByKeyTest {
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  /**
+   * Creates a test pipeline that uses the {@link DataflowPipelineRunner} so that
+   * {@link GroupByKey} is not expanded. This is used to verify that the proper errors
+   * surface even without expansion.
+   */
+  private Pipeline createTestServiceRunner() {
+    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+    options.setRunner(DataflowPipelineRunner.class);
+    options.setProject("someproject");
+    options.setStagingLocation("gs://staging");
+    options.setPathValidatorClass(NoopPathValidator.class);
+    options.setDataflowClient(null);
+    return Pipeline.create(options);
+  }
+
+  @Test
+  public void testInvalidWindowsService() {
+    Pipeline p = createTestServiceRunner();
+
+    List<KV<String, Integer>> ungroupedPairs = Arrays.asList();
+
+    PCollection<KV<String, Integer>> input =
+        p.apply(Create.of(ungroupedPairs)
+            .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of())))
+        .apply(Window.<KV<String, Integer>>into(
+            Sessions.withGapDuration(Duration.standardMinutes(1))));
+
+    thrown.expect(IllegalStateException.class);
+    thrown.expectMessage("GroupByKey must have a valid Window merge function");
+    input
+        .apply("GroupByKey", GroupByKey.<String, Integer>create())
+        .apply("GroupByKeyAgain", GroupByKey.<String, Iterable<Integer>>create());
+  }
+
+  @Test
+  public void testGroupByKeyServiceUnbounded() {
+    Pipeline p = createTestServiceRunner();
+
+    PCollection<KV<String, Integer>> input =
+        p.apply(
+            new PTransform<PBegin, PCollection<KV<String, Integer>>>() {
+              @Override
+              public PCollection<KV<String, Integer>> apply(PBegin input) {
+                return PCollection.<KV<String, Integer>>createPrimitiveOutputInternal(
+                        input.getPipeline(),
+                        WindowingStrategy.globalDefault(),
+                        PCollection.IsBounded.UNBOUNDED)
+                    .setTypeDescriptorInternal(new TypeDescriptor<KV<String, Integer>>() {});
+              }
+            });
+
+    thrown.expect(IllegalStateException.class);
+    thrown.expectMessage(
+        "GroupByKey cannot be applied to non-bounded PCollection in the GlobalWindow without "
+        + "a trigger. Use a Window.into or Window.triggering transform prior to GroupByKey.");
+
+    input.apply("GroupByKey", GroupByKey.<String, Integer>create());
+  }
+}
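
For context, a minimal sketch of the remedy the first test's error message points
at (illustrative only; it reuses the test's "input" variable and assumes
FixedWindows from the same windowing package, which this file does not import):
re-windowing between the two GroupByKey applications replaces the already-merged
Sessions strategy with a valid one.

    PCollection<KV<String, Iterable<Integer>>> grouped =
        input.apply("GroupByKey", GroupByKey.<String, Integer>create());
    grouped
        // Re-window so the second GroupByKey sees a valid, non-merged strategy.
        .apply(Window.<KV<String, Iterable<Integer>>>into(
            FixedWindows.of(Duration.standardMinutes(1))))
        .apply("GroupByKeyAgain", GroupByKey.<String, Iterable<Integer>>create());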

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowViewTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowViewTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowViewTest.java
new file mode 100644
index 0000000..d787500
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowViewTest.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.transforms;
+
+import org.apache.beam.runners.dataflow.DataflowPipelineRunner;
+import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.InvalidWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.util.NoopPathValidator;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+import org.hamcrest.Matchers;
+import org.joda.time.Duration;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.internal.matchers.ThrowableMessageMatcher;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link View} for a {@link DataflowPipelineRunner}. */
+@RunWith(JUnit4.class)
+public class DataflowViewTest {
+  @Rule
+  public transient ExpectedException thrown = ExpectedException.none();
+
+  private Pipeline createTestBatchRunner() {
+    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+    options.setRunner(DataflowPipelineRunner.class);
+    options.setProject("someproject");
+    options.setStagingLocation("gs://staging");
+    options.setPathValidatorClass(NoopPathValidator.class);
+    options.setDataflowClient(null);
+    return Pipeline.create(options);
+  }
+
+  private Pipeline createTestStreamingRunner() {
+    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+    options.setRunner(DataflowPipelineRunner.class);
+    options.setStreaming(true);
+    options.setProject("someproject");
+    options.setStagingLocation("gs://staging");
+    options.setPathValidatorClass(NoopPathValidator.class);
+    options.setDataflowClient(null);
+    return Pipeline.create(options);
+  }
+
+  private void testViewUnbounded(
+      Pipeline pipeline,
+      PTransform<PCollection<KV<String, Integer>>, ? extends PCollectionView<?>> view) {
+    thrown.expect(IllegalStateException.class);
+    thrown.expectMessage("Unable to create a side-input view from input");
+    thrown.expectCause(
+        ThrowableMessageMatcher.hasMessage(Matchers.containsString("non-bounded PCollection")));
+    pipeline
+        .apply(
+            new PTransform<PBegin, PCollection<KV<String, Integer>>>() {
+              @Override
+              public PCollection<KV<String, Integer>> apply(PBegin input) {
+                return PCollection.<KV<String, Integer>>createPrimitiveOutputInternal(
+                        input.getPipeline(),
+                        WindowingStrategy.globalDefault(),
+                        PCollection.IsBounded.UNBOUNDED)
+                    .setTypeDescriptorInternal(new TypeDescriptor<KV<String, Integer>>() {});
+              }
+            })
+        .apply(view);
+  }
+
+  private void testViewNonmerging(
+      Pipeline pipeline,
+      PTransform<PCollection<KV<String, Integer>>, ? extends PCollectionView<?>> view) {
+    thrown.expect(IllegalStateException.class);
+    thrown.expectMessage("Unable to create a side-input view from input");
+    thrown.expectCause(
+        ThrowableMessageMatcher.hasMessage(Matchers.containsString("Consumed by GroupByKey")));
+    pipeline.apply(Create.<KV<String, Integer>>of(KV.of("hello", 5)))
+        .apply(Window.<KV<String, Integer>>into(new InvalidWindows<>(
+            "Consumed by GroupByKey", FixedWindows.of(Duration.standardHours(1)))))
+        .apply(view);
+  }
+
+  @Test
+  public void testViewUnboundedAsSingletonBatch() {
+    testViewUnbounded(createTestBatchRunner(), View.<KV<String, Integer>>asSingleton());
+  }
+
+  @Test
+  public void testViewUnboundedAsSingletonStreaming() {
+    testViewUnbounded(createTestStreamingRunner(), View.<KV<String, Integer>>asSingleton());
+  }
+
+  @Test
+  public void testViewUnboundedAsIterableBatch() {
+    testViewUnbounded(createTestBatchRunner(), View.<KV<String, Integer>>asIterable());
+  }
+
+  @Test
+  public void testViewUnboundedAsIterableStreaming() {
+    testViewUnbounded(createTestStreamingRunner(), View.<KV<String, Integer>>asIterable());
+  }
+
+  @Test
+  public void testViewUnboundedAsListBatch() {
+    testViewUnbounded(createTestBatchRunner(), View.<KV<String, Integer>>asList());
+  }
+
+  @Test
+  public void testViewUnboundedAsListStreaming() {
+    testViewUnbounded(createTestStreamingRunner(), View.<KV<String, Integer>>asList());
+  }
+
+  @Test
+  public void testViewUnboundedAsMapBatch() {
+    testViewUnbounded(createTestBatchRunner(), View.<String, Integer>asMap());
+  }
+
+  @Test
+  public void testViewUnboundedAsMapStreaming() {
+    testViewUnbounded(createTestStreamingRunner(), View.<String, Integer>asMap());
+  }
+
+  @Test
+  public void testViewUnboundedAsMultimapBatch() {
+    testViewUnbounded(createTestBatchRunner(), View.<String, Integer>asMultimap());
+  }
+
+  @Test
+  public void testViewUnboundedAsMultimapStreaming() {
+    testViewUnbounded(createTestStreamingRunner(), View.<String, Integer>asMultimap());
+  }
+
+  @Test
+  public void testViewNonmergingAsSingletonBatch() {
+    testViewNonmerging(createTestBatchRunner(), View.<KV<String, Integer>>asSingleton());
+  }
+
+  @Test
+  public void testViewNonmergingAsSingletonStreaming() {
+    testViewNonmerging(createTestStreamingRunner(), View.<KV<String, Integer>>asSingleton());
+  }
+
+  @Test
+  public void testViewNonmergingAsIterableBatch() {
+    testViewNonmerging(createTestBatchRunner(), View.<KV<String, Integer>>asIterable());
+  }
+
+  @Test
+  public void testViewNonmergingAsIterableStreaming() {
+    testViewNonmerging(createTestStreamingRunner(), View.<KV<String, Integer>>asIterable());
+  }
+
+  @Test
+  public void testViewNonmergingAsListBatch() {
+    testViewNonmerging(createTestBatchRunner(), View.<KV<String, Integer>>asList());
+  }
+
+  @Test
+  public void testViewNonmergingAsListStreaming() {
+    testViewNonmerging(createTestStreamingRunner(), View.<KV<String, Integer>>asList());
+  }
+
+  @Test
+  public void testViewNonmergingAsMapBatch() {
+    testViewNonmerging(createTestBatchRunner(), View.<String, Integer>asMap());
+  }
+
+  @Test
+  public void testViewNonmergingAsMapStreaming() {
+    testViewNonmerging(createTestStreamingRunner(), View.<String, Integer>asMap());
+  }
+
+  @Test
+  public void testViewNonmergingAsMultimapBatch() {
+    testViewNonmerging(createTestBatchRunner(), View.<String, Integer>asMultimap());
+  }
+
+  @Test
+  public void testViewNonmergingAsMultimapStreaming() {
+    testViewNonmerging(createTestStreamingRunner(), View.<String, Integer>asMultimap());
+  }
+}
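
For contrast with the failure paths above, the happy path these checks protect
looks roughly like the following sketch, using only classes this file already
imports: a side-input view built from a bounded input in the default global
windowing.

    Pipeline pipeline = createTestBatchRunner();
    PCollectionView<KV<String, Integer>> view =
        pipeline
            .apply(Create.of(KV.of("hello", 5)))  // bounded, default windowing
            .apply(View.<KV<String, Integer>>asSingleton());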

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/DataflowPathValidatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/DataflowPathValidatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/DataflowPathValidatorTest.java
new file mode 100644
index 0000000..5587986
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/DataflowPathValidatorTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.util;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.when;
+
+import org.apache.beam.runners.dataflow.DataflowPipelineRunner;
+import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.util.GcsUtil;
+import org.apache.beam.sdk.util.TestCredential;
+import org.apache.beam.sdk.util.gcsfs.GcsPath;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+/** Tests for {@link DataflowPathValidator}. */
+@RunWith(JUnit4.class)
+public class DataflowPathValidatorTest {
+  @Rule public ExpectedException expectedException = ExpectedException.none();
+
+  @Mock private GcsUtil mockGcsUtil;
+  private DataflowPathValidator validator;
+
+  @Before
+  public void setUp() throws Exception {
+    MockitoAnnotations.initMocks(this);
+    when(mockGcsUtil.bucketExists(any(GcsPath.class))).thenReturn(true);
+    when(mockGcsUtil.isGcsPatternSupported(anyString())).thenCallRealMethod();
+    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+    options.setGcpCredential(new TestCredential());
+    options.setRunner(DataflowPipelineRunner.class);
+    options.setGcsUtil(mockGcsUtil);
+    validator = new DataflowPathValidator(options);
+  }
+
+  @Test
+  public void testValidFilePattern() {
+    validator.validateInputFilePatternSupported("gs://bucket/path");
+  }
+
+  @Test
+  public void testInvalidFilePattern() {
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage(
+        "DataflowPipelineRunner expected a valid 'gs://' path but was given '/local/path'");
+    validator.validateInputFilePatternSupported("/local/path");
+  }
+
+  @Test
+  public void testWhenBucketDoesNotExist() throws Exception {
+    when(mockGcsUtil.bucketExists(any(GcsPath.class))).thenReturn(false);
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage(
+        "Could not find file gs://non-existent-bucket/location");
+    validator.validateInputFilePatternSupported("gs://non-existent-bucket/location");
+  }
+
+  @Test
+  public void testValidOutputPrefix() {
+    validator.validateOutputFilePrefixSupported("gs://bucket/path");
+  }
+
+  @Test
+  public void testInvalidOutputPrefix() {
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage(
+        "DataflowPipelineRunner expected a valid 'gs://' path but was given '/local/path'");
+    validator.validateOutputFilePrefixSupported("/local/path");
+  }
+}
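
In brief, the contract under test: Cloud Storage paths validate, anything else
throws IllegalArgumentException. A sketch reusing the test's validator (the
bucket name and paths are placeholders):

    validator.validateInputFilePatternSupported("gs://my-bucket/input/*");   // ok
    validator.validateOutputFilePrefixSupported("gs://my-bucket/output");    // ok
    validator.validateOutputFilePrefixSupported("/tmp/output");              // throws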

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/MonitoringUtilTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/MonitoringUtilTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/MonitoringUtilTest.java
new file mode 100644
index 0000000..ee1532d
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/MonitoringUtilTest.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.util;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
+import org.apache.beam.sdk.PipelineResult.State;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.util.TestCredential;
+import org.apache.beam.sdk.util.TimeUtil;
+
+import com.google.api.services.dataflow.Dataflow;
+import com.google.api.services.dataflow.model.JobMessage;
+import com.google.api.services.dataflow.model.ListJobMessagesResponse;
+
+import org.joda.time.Instant;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Tests for {@link MonitoringUtil}.
+ */
+@RunWith(JUnit4.class)
+public class MonitoringUtilTest {
+  private static final String PROJECT_ID = "someProject";
+  private static final String JOB_ID = "1234";
+
+  @Rule public ExpectedException thrown = ExpectedException.none();
+
+  @Test
+  public void testGetJobMessages() throws IOException {
+    Dataflow.Projects.Jobs.Messages mockMessages = mock(Dataflow.Projects.Jobs.Messages.class);
+
+    // Two requests are needed to get all the messages.
+    Dataflow.Projects.Jobs.Messages.List firstRequest =
+        mock(Dataflow.Projects.Jobs.Messages.List.class);
+    Dataflow.Projects.Jobs.Messages.List secondRequest =
+        mock(Dataflow.Projects.Jobs.Messages.List.class);
+
+    when(mockMessages.list(PROJECT_ID, JOB_ID)).thenReturn(firstRequest).thenReturn(secondRequest);
+
+    ListJobMessagesResponse firstResponse = new ListJobMessagesResponse();
+    firstResponse.setJobMessages(new ArrayList<JobMessage>());
+    for (int i = 0; i < 100; ++i) {
+      JobMessage message = new JobMessage();
+      message.setId("message_" + i);
+      message.setTime(TimeUtil.toCloudTime(new Instant(i)));
+      firstResponse.getJobMessages().add(message);
+    }
+    String pageToken = "page_token";
+    firstResponse.setNextPageToken(pageToken);
+
+    ListJobMessagesResponse secondResponse = new ListJobMessagesResponse();
+    secondResponse.setJobMessages(new ArrayList<JobMessage>());
+    for (int i = 100; i < 150; ++i) {
+      JobMessage message = new JobMessage();
+      message.setId("message_" + i);
+      message.setTime(TimeUtil.toCloudTime(new Instant(i)));
+      secondResponse.getJobMessages().add(message);
+    }
+
+    when(firstRequest.execute()).thenReturn(firstResponse);
+    when(secondRequest.execute()).thenReturn(secondResponse);
+
+    MonitoringUtil util = new MonitoringUtil(PROJECT_ID, mockMessages);
+
+    List<JobMessage> messages = util.getJobMessages(JOB_ID, -1);
+
+    verify(secondRequest).setPageToken(pageToken);
+
+    assertEquals(150, messages.size());
+  }
+
+  @Test
+  public void testToStateCreatesState() {
+    String stateName = "JOB_STATE_DONE";
+
+    State result = MonitoringUtil.toState(stateName);
+
+    assertEquals(State.DONE, result);
+  }
+
+  @Test
+  public void testToStateWithNullReturnsUnknown() {
+    String stateName = null;
+
+    State result = MonitoringUtil.toState(stateName);
+
+    assertEquals(State.UNKNOWN, result);
+  }
+
+  @Test
+  public void testToStateWithOtherValueReturnsUnknown() {
+    String stateName = "FOO_BAR_BAZ";
+
+    State result = MonitoringUtil.toState(stateName);
+
+    assertEquals(State.UNKNOWN, result);
+  }
+
+  @Test
+  public void testDontOverrideEndpointWithDefaultApi() {
+    DataflowPipelineOptions options =
+        PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
+    options.setProject(PROJECT_ID);
+    options.setGcpCredential(new TestCredential());
+    String cancelCommand = MonitoringUtil.getGcloudCancelCommand(options, JOB_ID);
+    assertEquals("gcloud alpha dataflow jobs --project=someProject cancel 1234", cancelCommand);
+  }
+
+  @Test
+  public void testOverridesEndpointWithStagedDataflowEndpoint() {
+    DataflowPipelineOptions options =
+        PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
+    options.setProject(PROJECT_ID);
+    options.setGcpCredential(new TestCredential());
+    String stagingDataflowEndpoint = "v0neverExisted";
+    options.setDataflowEndpoint(stagingDataflowEndpoint);
+    String cancelCommand = MonitoringUtil.getGcloudCancelCommand(options, JOB_ID);
+    assertEquals(
+        "CLOUDSDK_API_ENDPOINT_OVERRIDES_DATAFLOW=https://dataflow.googleapis.com/v0neverExisted/ "
+        + "gcloud alpha dataflow jobs --project=someProject cancel 1234",
+        cancelCommand);
+  }
+}
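
The first test above pins down a paging contract; a sketch of the loop it
implies (a hypothetical rendering, not the actual MonitoringUtil
implementation; "messages" stands for the injected Messages client):

    List<JobMessage> allMessages = new ArrayList<>();
    String pageToken = null;
    do {
      Dataflow.Projects.Jobs.Messages.List request = messages.list(PROJECT_ID, JOB_ID);
      if (pageToken != null) {
        request.setPageToken(pageToken);  // resume where the last page ended
      }
      ListJobMessagesResponse response = request.execute();
      if (response.getJobMessages() != null) {
        allMessages.addAll(response.getJobMessages());
      }
      pageToken = response.getNextPageToken();
    } while (pageToken != null);  // a missing token means the last page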

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
new file mode 100644
index 0000000..41ad05d
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
@@ -0,0 +1,486 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.util;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import org.apache.beam.runners.dataflow.util.PackageUtil.PackageAttributes;
+import org.apache.beam.sdk.options.GcsOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.ExpectedLogs;
+import org.apache.beam.sdk.testing.FastNanoClockAndSleeper;
+import org.apache.beam.sdk.util.GcsUtil;
+import org.apache.beam.sdk.util.IOChannelUtils;
+import org.apache.beam.sdk.util.gcsfs.GcsPath;
+
+import com.google.api.client.googleapis.json.GoogleJsonError.ErrorInfo;
+import com.google.api.client.googleapis.json.GoogleJsonResponseException;
+import com.google.api.client.http.HttpRequest;
+import com.google.api.client.http.HttpResponse;
+import com.google.api.client.http.HttpStatusCodes;
+import com.google.api.client.http.HttpTransport;
+import com.google.api.client.http.LowLevelHttpRequest;
+import com.google.api.client.json.GenericJson;
+import com.google.api.client.json.Json;
+import com.google.api.client.json.JsonFactory;
+import com.google.api.client.json.jackson2.JacksonFactory;
+import com.google.api.client.testing.http.HttpTesting;
+import com.google.api.client.testing.http.MockHttpTransport;
+import com.google.api.client.testing.http.MockLowLevelHttpRequest;
+import com.google.api.client.testing.http.MockLowLevelHttpResponse;
+import com.google.api.services.dataflow.model.DataflowPackage;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.io.Files;
+import com.google.common.io.LineReader;
+
+import org.hamcrest.BaseMatcher;
+import org.hamcrest.Description;
+import org.hamcrest.Matchers;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.channels.Channels;
+import java.nio.channels.Pipe;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.regex.Pattern;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipInputStream;
+
+/** Tests for {@link PackageUtil}. */
+@RunWith(JUnit4.class)
+public class PackageUtilTest {
+  @Rule public ExpectedLogs logged = ExpectedLogs.none(PackageUtil.class);
+  @Rule
+  public TemporaryFolder tmpFolder = new TemporaryFolder();
+
+  @Rule
+  public FastNanoClockAndSleeper fastNanoClockAndSleeper = new FastNanoClockAndSleeper();
+
+  @Mock
+  GcsUtil mockGcsUtil;
+
+  // 128 bits, base64 encoded at 6 bits per character, is 21.3 characters, rounded up to 22
+  private static final String HASH_PATTERN = "[a-zA-Z0-9+-]{22}";
+
+  // Hamcrest matcher to assert a string matches a pattern
+  private static class RegexMatcher extends BaseMatcher<String> {
+    private final Pattern pattern;
+
+    public RegexMatcher(String regex) {
+      this.pattern = Pattern.compile(regex);
+    }
+
+    @Override
+    public boolean matches(Object o) {
+      if (!(o instanceof String)) {
+        return false;
+      }
+      return pattern.matcher((String) o).matches();
+    }
+
+    @Override
+    public void describeTo(Description description) {
+      description.appendText(String.format("matches regular expression %s", pattern));
+    }
+
+    public static RegexMatcher matches(String regex) {
+      return new RegexMatcher(regex);
+    }
+  }
+
+  @Before
+  public void setUp() {
+    MockitoAnnotations.initMocks(this);
+
+    GcsOptions pipelineOptions = PipelineOptionsFactory.as(GcsOptions.class);
+    pipelineOptions.setGcsUtil(mockGcsUtil);
+
+    IOChannelUtils.registerStandardIOFactories(pipelineOptions);
+  }
+
+  private File makeFileWithContents(String name, String contents) throws Exception {
+    File tmpFile = tmpFolder.newFile(name);
+    Files.write(contents, tmpFile, StandardCharsets.UTF_8);
+    tmpFile.setLastModified(0);  // required for determinism with directories
+    return tmpFile;
+  }
+
+  static final String STAGING_PATH = GcsPath.fromComponents("somebucket", "base/path").toString();
+  private static PackageAttributes makePackageAttributes(File file, String overridePackageName) {
+    return PackageUtil.createPackageAttributes(file, STAGING_PATH, overridePackageName);
+  }
+
+  @Test
+  public void testFileWithExtensionPackageNamingAndSize() throws Exception {
+    String contents = "This is a test!";
+    File tmpFile = makeFileWithContents("file.txt", contents);
+    PackageAttributes attr = makePackageAttributes(tmpFile, null);
+    DataflowPackage target = attr.getDataflowPackage();
+
+    assertThat(target.getName(), RegexMatcher.matches("file-" + HASH_PATTERN + ".txt"));
+    assertThat(target.getLocation(), equalTo(STAGING_PATH + '/' + target.getName()));
+    assertThat(attr.getSize(), equalTo((long) contents.length()));
+  }
+
+  @Test
+  public void testPackageNamingWithFileNoExtension() throws Exception {
+    File tmpFile = makeFileWithContents("file", "This is a test!");
+    DataflowPackage target = makePackageAttributes(tmpFile, null).getDataflowPackage();
+
+    assertThat(target.getName(), RegexMatcher.matches("file-" + HASH_PATTERN));
+    assertThat(target.getLocation(), equalTo(STAGING_PATH + '/' + target.getName()));
+  }
+
+  @Test
+  public void testPackageNamingWithDirectory() throws Exception {
+    File tmpDirectory = tmpFolder.newFolder("folder");
+    DataflowPackage target = makePackageAttributes(tmpDirectory, null).getDataflowPackage();
+
+    assertThat(target.getName(), RegexMatcher.matches("folder-" + HASH_PATTERN + ".jar"));
+    assertThat(target.getLocation(), equalTo(STAGING_PATH + '/' + target.getName()));
+  }
+
+  @Test
+  public void testPackageNamingWithFilesHavingSameContentsAndSameNames() throws Exception {
+    File tmpDirectory1 = tmpFolder.newFolder("folder1", "folderA");
+    makeFileWithContents("folder1/folderA/sameName", "This is a test!");
+    DataflowPackage target1 = makePackageAttributes(tmpDirectory1, null).getDataflowPackage();
+
+    File tmpDirectory2 = tmpFolder.newFolder("folder2", "folderA");
+    makeFileWithContents("folder2/folderA/sameName", "This is a test!");
+    DataflowPackage target2 = makePackageAttributes(tmpDirectory2, null).getDataflowPackage();
+
+    assertEquals(target1.getName(), target2.getName());
+    assertEquals(target1.getLocation(), target2.getLocation());
+  }
+
+  @Test
+  public void testPackageNamingWithFilesHavingSameContentsButDifferentNames() throws Exception {
+    File tmpDirectory1 = tmpFolder.newFolder("folder1", "folderA");
+    makeFileWithContents("folder1/folderA/uniqueName1", "This is a test!");
+    DataflowPackage target1 = makePackageAttributes(tmpDirectory1, null).getDataflowPackage();
+
+    File tmpDirectory2 = tmpFolder.newFolder("folder2", "folderA");
+    makeFileWithContents("folder2/folderA/uniqueName2", "This is a test!");
+    DataflowPackage target2 = makePackageAttributes(tmpDirectory2, null).getDataflowPackage();
+
+    assertNotEquals(target1.getName(), target2.getName());
+    assertNotEquals(target1.getLocation(), target2.getLocation());
+  }
+
+  @Test
+  public void testPackageNamingWithDirectoriesHavingSameContentsButDifferentNames()
+      throws Exception {
+    File tmpDirectory1 = tmpFolder.newFolder("folder1", "folderA");
+    tmpFolder.newFolder("folder1", "folderA", "uniqueName1");
+    DataflowPackage target1 = makePackageAttributes(tmpDirectory1, null).getDataflowPackage();
+
+    File tmpDirectory2 = tmpFolder.newFolder("folder2", "folderA");
+    tmpFolder.newFolder("folder2", "folderA", "uniqueName2");
+    DataflowPackage target2 = makePackageAttributes(tmpDirectory2, null).getDataflowPackage();
+
+    assertNotEquals(target1.getName(), target2.getName());
+    assertNotEquals(target1.getLocation(), target2.getLocation());
+  }
+
+  @Test
+  public void testPackageUploadWithLargeClasspathLogsWarning() throws Exception {
+    File tmpFile = makeFileWithContents("file.txt", "This is a test!");
+    // All files will already be present and cached, so no upload is needed.
+    when(mockGcsUtil.fileSize(any(GcsPath.class))).thenReturn(tmpFile.length());
+
+    List<String> classpathElements = Lists.newLinkedList();
+    for (int i = 0; i < 1005; ++i) {
+      String eltName = "element" + i;
+      classpathElements.add(eltName + '=' + tmpFile.getAbsolutePath());
+    }
+
+    PackageUtil.stageClasspathElements(classpathElements, STAGING_PATH);
+
+    logged.verifyWarn("Your classpath contains 1005 elements, which Google Cloud Dataflow");
+  }
+
+  @Test
+  public void testPackageUploadWithFileSucceeds() throws Exception {
+    Pipe pipe = Pipe.open();
+    String contents = "This is a test!";
+    File tmpFile = makeFileWithContents("file.txt", contents);
+    when(mockGcsUtil.fileSize(any(GcsPath.class)))
+        .thenThrow(new FileNotFoundException("some/path"));
+    when(mockGcsUtil.create(any(GcsPath.class), anyString())).thenReturn(pipe.sink());
+
+    List<DataflowPackage> targets = PackageUtil.stageClasspathElements(
+        ImmutableList.of(tmpFile.getAbsolutePath()), STAGING_PATH);
+    DataflowPackage target = Iterables.getOnlyElement(targets);
+
+    verify(mockGcsUtil).fileSize(any(GcsPath.class));
+    verify(mockGcsUtil).create(any(GcsPath.class), anyString());
+    verifyNoMoreInteractions(mockGcsUtil);
+
+    assertThat(target.getName(), RegexMatcher.matches("file-" + HASH_PATTERN + ".txt"));
+    assertThat(target.getLocation(), equalTo(STAGING_PATH + '/' + target.getName()));
+    assertThat(new LineReader(Channels.newReader(pipe.source(), "UTF-8")).readLine(),
+        equalTo(contents));
+  }
+
+  @Test
+  public void testPackageUploadWithDirectorySucceeds() throws Exception {
+    Pipe pipe = Pipe.open();
+    File tmpDirectory = tmpFolder.newFolder("folder");
+    tmpFolder.newFolder("folder", "empty_directory");
+    tmpFolder.newFolder("folder", "directory");
+    makeFileWithContents("folder/file.txt", "This is a test!");
+    makeFileWithContents("folder/directory/file.txt", "This is also a test!");
+
+    when(mockGcsUtil.fileSize(any(GcsPath.class)))
+        .thenThrow(new FileNotFoundException("some/path"));
+    when(mockGcsUtil.create(any(GcsPath.class), anyString())).thenReturn(pipe.sink());
+
+    PackageUtil.stageClasspathElements(
+        ImmutableList.of(tmpDirectory.getAbsolutePath()), STAGING_PATH);
+
+    verify(mockGcsUtil).fileSize(any(GcsPath.class));
+    verify(mockGcsUtil).create(any(GcsPath.class), anyString());
+    verifyNoMoreInteractions(mockGcsUtil);
+
+    ZipInputStream inputStream = new ZipInputStream(Channels.newInputStream(pipe.source()));
+    List<String> zipEntryNames = new ArrayList<>();
+    for (ZipEntry entry = inputStream.getNextEntry(); entry != null;
+        entry = inputStream.getNextEntry()) {
+      zipEntryNames.add(entry.getName());
+    }
+
+    assertThat(zipEntryNames,
+        containsInAnyOrder("directory/file.txt", "empty_directory/", "file.txt"));
+  }
+
+  @Test
+  public void testPackageUploadWithEmptyDirectorySucceeds() throws Exception {
+    Pipe pipe = Pipe.open();
+    File tmpDirectory = tmpFolder.newFolder("folder");
+
+    when(mockGcsUtil.fileSize(any(GcsPath.class)))
+        .thenThrow(new FileNotFoundException("some/path"));
+    when(mockGcsUtil.create(any(GcsPath.class), anyString())).thenReturn(pipe.sink());
+
+    List<DataflowPackage> targets = PackageUtil.stageClasspathElements(
+        ImmutableList.of(tmpDirectory.getAbsolutePath()), STAGING_PATH);
+    DataflowPackage target = Iterables.getOnlyElement(targets);
+
+    verify(mockGcsUtil).fileSize(any(GcsPath.class));
+    verify(mockGcsUtil).create(any(GcsPath.class), anyString());
+    verifyNoMoreInteractions(mockGcsUtil);
+
+    assertThat(target.getName(), RegexMatcher.matches("folder-" + HASH_PATTERN + ".jar"));
+    assertThat(target.getLocation(), equalTo(STAGING_PATH + '/' + target.getName()));
+    assertNull(new ZipInputStream(Channels.newInputStream(pipe.source())).getNextEntry());
+  }
+
+  @Test(expected = RuntimeException.class)
+  public void testPackageUploadFailsWhenIOExceptionThrown() throws Exception {
+    File tmpFile = makeFileWithContents("file.txt", "This is a test!");
+    when(mockGcsUtil.fileSize(any(GcsPath.class)))
+        .thenThrow(new FileNotFoundException("some/path"));
+    when(mockGcsUtil.create(any(GcsPath.class), anyString()))
+        .thenThrow(new IOException("Fake Exception: Upload error"));
+
+    try {
+      PackageUtil.stageClasspathElements(
+          ImmutableList.of(tmpFile.getAbsolutePath()),
+          STAGING_PATH, fastNanoClockAndSleeper);
+    } finally {
+      verify(mockGcsUtil).fileSize(any(GcsPath.class));
+      verify(mockGcsUtil, times(5)).create(any(GcsPath.class), anyString());
+      verifyNoMoreInteractions(mockGcsUtil);
+    }
+  }
+
+  @Test
+  public void testPackageUploadFailsWithPermissionsErrorGivesDetailedMessage() throws Exception {
+    File tmpFile = makeFileWithContents("file.txt", "This is a test!");
+    when(mockGcsUtil.fileSize(any(GcsPath.class)))
+        .thenThrow(new FileNotFoundException("some/path"));
+    when(mockGcsUtil.create(any(GcsPath.class), anyString()))
+        .thenThrow(new IOException("Failed to write to GCS path " + STAGING_PATH,
+            googleJsonResponseException(
+                HttpStatusCodes.STATUS_CODE_FORBIDDEN, "Permission denied", "Test message")));
+
+    try {
+      PackageUtil.stageClasspathElements(
+          ImmutableList.of(tmpFile.getAbsolutePath()),
+          STAGING_PATH, fastNanoClockAndSleeper);
+      fail("Expected RuntimeException");
+    } catch (RuntimeException e) {
+      assertTrue("Expected IOException containing detailed message.",
+          e.getCause() instanceof IOException);
+      assertThat(e.getCause().getMessage(),
+          Matchers.allOf(
+              Matchers.containsString("Uploaded failed due to permissions error"),
+              Matchers.containsString(
+                  "Stale credentials can be resolved by executing 'gcloud auth login'")));
+    } finally {
+      verify(mockGcsUtil).fileSize(any(GcsPath.class));
+      verify(mockGcsUtil).create(any(GcsPath.class), anyString());
+      verifyNoMoreInteractions(mockGcsUtil);
+    }
+  }
+
+  @Test
+  public void testPackageUploadEventuallySucceeds() throws Exception {
+    Pipe pipe = Pipe.open();
+    File tmpFile = makeFileWithContents("file.txt", "This is a test!");
+    when(mockGcsUtil.fileSize(any(GcsPath.class)))
+        .thenThrow(new FileNotFoundException("some/path"));
+    when(mockGcsUtil.create(any(GcsPath.class), anyString()))
+        .thenThrow(new IOException("Fake Exception: 410 Gone")) // First attempt fails
+        .thenReturn(pipe.sink());                               // Second attempt succeeds
+
+    try {
+      PackageUtil.stageClasspathElements(
+          ImmutableList.of(tmpFile.getAbsolutePath()),
+          STAGING_PATH,
+          fastNanoClockAndSleeper);
+    } finally {
+      verify(mockGcsUtil).fileSize(any(GcsPath.class));
+      verify(mockGcsUtil, times(2)).create(any(GcsPath.class), anyString());
+      verifyNoMoreInteractions(mockGcsUtil);
+    }
+  }
+
+  @Test
+  public void testPackageUploadIsSkippedWhenFileAlreadyExists() throws Exception {
+    File tmpFile = makeFileWithContents("file.txt", "This is a test!");
+    when(mockGcsUtil.fileSize(any(GcsPath.class))).thenReturn(tmpFile.length());
+
+    PackageUtil.stageClasspathElements(
+        ImmutableList.of(tmpFile.getAbsolutePath()), STAGING_PATH);
+
+    verify(mockGcsUtil).fileSize(any(GcsPath.class));
+    verifyNoMoreInteractions(mockGcsUtil);
+  }
+
+  @Test
+  public void testPackageUploadIsNotSkippedWhenSizesAreDifferent() throws Exception {
+    Pipe pipe = Pipe.open();
+    File tmpDirectory = tmpFolder.newFolder("folder");
+    tmpFolder.newFolder("folder", "empty_directory");
+    tmpFolder.newFolder("folder", "directory");
+    makeFileWithContents("folder/file.txt", "This is a test!");
+    makeFileWithContents("folder/directory/file.txt", "This is also a test!");
+    when(mockGcsUtil.fileSize(any(GcsPath.class))).thenReturn(Long.MAX_VALUE);
+    when(mockGcsUtil.create(any(GcsPath.class), anyString())).thenReturn(pipe.sink());
+
+    PackageUtil.stageClasspathElements(
+        ImmutableList.of(tmpDirectory.getAbsolutePath()), STAGING_PATH);
+
+    verify(mockGcsUtil).fileSize(any(GcsPath.class));
+    verify(mockGcsUtil).create(any(GcsPath.class), anyString());
+    verifyNoMoreInteractions(mockGcsUtil);
+  }
+
+  @Test
+  public void testPackageUploadWithExplicitPackageName() throws Exception {
+    Pipe pipe = Pipe.open();
+    File tmpFile = makeFileWithContents("file.txt", "This is a test!");
+    final String overriddenName = "alias.txt";
+
+    when(mockGcsUtil.fileSize(any(GcsPath.class)))
+        .thenThrow(new FileNotFoundException("some/path"));
+    when(mockGcsUtil.create(any(GcsPath.class), anyString())).thenReturn(pipe.sink());
+
+    List<DataflowPackage> targets = PackageUtil.stageClasspathElements(
+        ImmutableList.of(overriddenName + "=" + tmpFile.getAbsolutePath()), STAGING_PATH);
+    DataflowPackage target = Iterables.getOnlyElement(targets);
+
+    verify(mockGcsUtil).fileSize(any(GcsPath.class));
+    verify(mockGcsUtil).create(any(GcsPath.class), anyString());
+    verifyNoMoreInteractions(mockGcsUtil);
+
+    assertThat(target.getName(), equalTo(overriddenName));
+    assertThat(target.getLocation(),
+        RegexMatcher.matches(STAGING_PATH + "/file-" + HASH_PATTERN + ".txt"));
+  }
+
+  @Test
+  public void testPackageUploadIsSkippedWithNonExistentResource() throws Exception {
+    String nonExistentFile =
+        IOChannelUtils.resolve(tmpFolder.getRoot().getPath(), "non-existent-file");
+    assertEquals(Collections.EMPTY_LIST, PackageUtil.stageClasspathElements(
+        ImmutableList.of(nonExistentFile), STAGING_PATH));
+  }
+
+  /**
+   * Builds a fake GoogleJsonResponseException for testing API error handling.
+   */
+  private static GoogleJsonResponseException googleJsonResponseException(
+      final int status, final String reason, final String message) throws IOException {
+    final JsonFactory jsonFactory = new JacksonFactory();
+    HttpTransport transport = new MockHttpTransport() {
+      @Override
+      public LowLevelHttpRequest buildRequest(String method, String url) throws IOException {
+        ErrorInfo errorInfo = new ErrorInfo();
+        errorInfo.setReason(reason);
+        errorInfo.setMessage(message);
+        errorInfo.setFactory(jsonFactory);
+        GenericJson error = new GenericJson();
+        error.set("code", status);
+        error.set("errors", Arrays.asList(errorInfo));
+        error.setFactory(jsonFactory);
+        GenericJson errorResponse = new GenericJson();
+        errorResponse.set("error", error);
+        errorResponse.setFactory(jsonFactory);
+        return new MockLowLevelHttpRequest().setResponse(
+            new MockLowLevelHttpResponse().setContent(errorResponse.toPrettyString())
+            .setContentType(Json.MEDIA_TYPE).setStatusCode(status));
+      }
+    };
+    HttpRequest request =
+        transport.createRequestFactory().buildGetRequest(HttpTesting.SIMPLE_GENERIC_URL);
+    request.setThrowExceptionOnExecuteError(false);
+    HttpResponse response = request.execute();
+    return GoogleJsonResponseException.from(jsonFactory, response);
+  }
+}
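
The tests above exercise two spellings of a classpath element; a sketch of
both (paths and bucket are placeholders):

    // A plain path is staged under a name derived from the file name plus a
    // content hash. "name=path" overrides the package name, while the staged
    // GCS object keeps the hash-derived file name (see
    // testPackageUploadWithExplicitPackageName above).
    List<DataflowPackage> staged = PackageUtil.stageClasspathElements(
        ImmutableList.of(
            "/path/to/library.jar",          // package name library-<hash>.jar
            "alias.txt=/path/to/file.txt"),  // package name alias.txt
        "gs://somebucket/base/path");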

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/io/DataflowTextIOTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/io/DataflowTextIOTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/io/DataflowTextIOTest.java
deleted file mode 100644
index 6b9fbb4..0000000
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/io/DataflowTextIOTest.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io;
-
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.runners.DataflowPipelineRunner;
-import org.apache.beam.sdk.testing.TestDataflowPipelineOptions;
-import org.apache.beam.sdk.util.GcsUtil;
-import org.apache.beam.sdk.util.TestCredential;
-import org.apache.beam.sdk.util.gcsfs.GcsPath;
-
-import com.google.common.collect.ImmutableList;
-
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-import java.io.IOException;
-import java.nio.channels.FileChannel;
-import java.nio.channels.SeekableByteChannel;
-import java.nio.file.Files;
-import java.nio.file.StandardOpenOption;
-import java.util.List;
-
-/**
- * {@link DataflowPipelineRunner} specific tests for TextIO Read and Write transforms.
- */
-@RunWith(JUnit4.class)
-public class DataflowTextIOTest {
-
-  private TestDataflowPipelineOptions buildTestPipelineOptions() {
-    TestDataflowPipelineOptions options =
-        PipelineOptionsFactory.as(TestDataflowPipelineOptions.class);
-    options.setGcpCredential(new TestCredential());
-    return options;
-  }
-
-  private GcsUtil buildMockGcsUtil() throws IOException {
-    GcsUtil mockGcsUtil = Mockito.mock(GcsUtil.class);
-
-    // Any request to open gets a new bogus channel
-    Mockito
-        .when(mockGcsUtil.open(Mockito.any(GcsPath.class)))
-        .then(new Answer<SeekableByteChannel>() {
-          @Override
-          public SeekableByteChannel answer(InvocationOnMock invocation) throws Throwable {
-            return FileChannel.open(
-                Files.createTempFile("channel-", ".tmp"),
-                StandardOpenOption.CREATE, StandardOpenOption.DELETE_ON_CLOSE);
-          }
-        });
-
-    // Any request for expansion returns a list containing the original GcsPath
-    // This is required to pass validation that occurs in TextIO during apply()
-    Mockito
-        .when(mockGcsUtil.expand(Mockito.any(GcsPath.class)))
-        .then(new Answer<List<GcsPath>>() {
-          @Override
-          public List<GcsPath> answer(InvocationOnMock invocation) throws Throwable {
-            return ImmutableList.of((GcsPath) invocation.getArguments()[0]);
-          }
-        });
-
-    return mockGcsUtil;
-  }
-
-  /**
-   * This tests a few corner cases that should not crash.
-   */
-  @Test
-  public void testGoodWildcards() throws Exception {
-    TestDataflowPipelineOptions options = buildTestPipelineOptions();
-    options.setGcsUtil(buildMockGcsUtil());
-
-    Pipeline pipeline = Pipeline.create(options);
-
-    applyRead(pipeline, "gs://bucket/foo");
-    applyRead(pipeline, "gs://bucket/foo/");
-    applyRead(pipeline, "gs://bucket/foo/*");
-    applyRead(pipeline, "gs://bucket/foo/?");
-    applyRead(pipeline, "gs://bucket/foo/[0-9]");
-    applyRead(pipeline, "gs://bucket/foo/*baz*");
-    applyRead(pipeline, "gs://bucket/foo/*baz?");
-    applyRead(pipeline, "gs://bucket/foo/[0-9]baz?");
-    applyRead(pipeline, "gs://bucket/foo/baz/*");
-    applyRead(pipeline, "gs://bucket/foo/baz/*wonka*");
-    applyRead(pipeline, "gs://bucket/foo/*baz/wonka*");
-    applyRead(pipeline, "gs://bucket/foo*/baz");
-    applyRead(pipeline, "gs://bucket/foo?/baz");
-    applyRead(pipeline, "gs://bucket/foo[0-9]/baz");
-
-    // Check that running doesn't fail.
-    pipeline.run();
-  }
-
-  private void applyRead(Pipeline pipeline, String path) {
-    pipeline.apply("Read(" + path + ")", TextIO.Read.from(path));
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/options/DataflowPipelineDebugOptionsTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/options/DataflowPipelineDebugOptionsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/options/DataflowPipelineDebugOptionsTest.java
deleted file mode 100644
index c3f3a18..0000000
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/options/DataflowPipelineDebugOptionsTest.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.options;
-
-import static org.hamcrest.Matchers.hasEntry;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
-
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/** Tests for {@link DataflowPipelineDebugOptions}. */
-@RunWith(JUnit4.class)
-public class DataflowPipelineDebugOptionsTest {
-  @Test
-  public void testTransformNameMapping() throws Exception {
-    DataflowPipelineDebugOptions options = PipelineOptionsFactory
-        .fromArgs(new String[]{"--transformNameMapping={\"a\":\"b\",\"foo\":\"\",\"bar\":\"baz\"}"})
-        .as(DataflowPipelineDebugOptions.class);
-    assertEquals(3, options.getTransformNameMapping().size());
-    assertThat(options.getTransformNameMapping(), hasEntry("a", "b"));
-    assertThat(options.getTransformNameMapping(), hasEntry("foo", ""));
-    assertThat(options.getTransformNameMapping(), hasEntry("bar", "baz"));
-  }
-}
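
A sketch of the option in use (the JSON-map syntax comes straight from the
test; reading an empty value as "transform removed" is an assumption about
Dataflow job-update semantics, not something this test asserts):

    DataflowPipelineDebugOptions options = PipelineOptionsFactory
        .fromArgs(new String[] {"--transformNameMapping={\"oldName\":\"newName\",\"gone\":\"\"}"})
        .as(DataflowPipelineDebugOptions.class);
    String renamed = options.getTransformNameMapping().get("oldName");  // "newName"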

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/options/DataflowPipelineOptionsTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/options/DataflowPipelineOptionsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/options/DataflowPipelineOptionsTest.java
deleted file mode 100644
index c9eac56..0000000
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/options/DataflowPipelineOptionsTest.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.options;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import org.apache.beam.sdk.testing.ResetDateTimeProvider;
-import org.apache.beam.sdk.testing.RestoreSystemProperties;
-
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestRule;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/** Tests for {@link DataflowPipelineOptions}. */
-@RunWith(JUnit4.class)
-public class DataflowPipelineOptionsTest {
-  @Rule public TestRule restoreSystemProperties = new RestoreSystemProperties();
-  @Rule public ResetDateTimeProvider resetDateTimeProviderRule = new ResetDateTimeProvider();
-
-  @Test
-  public void testJobNameIsSet() {
-    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
-    options.setJobName("TestJobName");
-    assertEquals("TestJobName", options.getJobName());
-  }
-
-  @Test
-  public void testUserNameIsNotSet() {
-    resetDateTimeProviderRule.setDateTimeFixed("2014-12-08T19:07:06.698Z");
-    System.getProperties().remove("user.name");
-    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
-    options.setAppName("TestApplication");
-    assertEquals("testapplication--1208190706", options.getJobName());
-    assertTrue(options.getJobName().length() <= 40);
-  }
-
-  @Test
-  public void testAppNameAndUserNameAreLong() {
-    resetDateTimeProviderRule.setDateTimeFixed("2014-12-08T19:07:06.698Z");
-    System.getProperties().put("user.name", "abcdeabcdeabcdeabcdeabcdeabcde");
-    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
-    options.setAppName("1234567890123456789012345678901234567890");
-    assertEquals(
-        "a234567890123456789012345678901234567890-abcdeabcdeabcdeabcdeabcdeabcde-1208190706",
-        options.getJobName());
-  }
-
-  @Test
-  public void testAppNameIsLong() {
-    resetDateTimeProviderRule.setDateTimeFixed("2014-12-08T19:07:06.698Z");
-    System.getProperties().put("user.name", "abcde");
-    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
-    options.setAppName("1234567890123456789012345678901234567890");
-    assertEquals("a234567890123456789012345678901234567890-abcde-1208190706", options.getJobName());
-  }
-
-  @Test
-  public void testUserNameIsLong() {
-    resetDateTimeProviderRule.setDateTimeFixed("2014-12-08T19:07:06.698Z");
-    System.getProperties().put("user.name", "abcdeabcdeabcdeabcdeabcdeabcde");
-    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
-    options.setAppName("1234567890");
-    assertEquals("a234567890-abcdeabcdeabcdeabcdeabcdeabcde-1208190706", options.getJobName());
-  }
-
-  @Test
-  public void testUtf8UserNameAndApplicationNameIsNormalized() {
-    resetDateTimeProviderRule.setDateTimeFixed("2014-12-08T19:07:06.698Z");
-    System.getProperties().put("user.name", "ði ıntəˈnæʃənəl ");
-    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
-    options.setAppName("fəˈnɛtık əsoʊsiˈeıʃn");
-    assertEquals("f00n0t0k00so0si0e00n-0i00nt00n000n0l0-1208190706", options.getJobName());
-  }
-}
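
Taken together, the expectations above pin down the job-name scheme: the app
name and user name are lower-cased, characters outside [a-z0-9] become '0', a
leading non-letter is replaced with 'a', and a MMddHHmmss datestamp is
appended. A rough reconstruction inferred from those expected values only (the
runner's actual name builder may differ in edge cases):

    import org.joda.time.DateTime;
    import org.joda.time.DateTimeZone;
    import org.joda.time.format.DateTimeFormat;

    public class JobNameSketch {
      static String normalize(String s) {
        String lower = s.toLowerCase().replaceAll("[^a-z0-9]", "0");
        if (lower.isEmpty() || Character.isLetter(lower.charAt(0))) {
          return lower;
        }
        // Job names must start with a letter: "1234..." became "a234..." above.
        return "a" + lower.substring(1);
      }

      static String jobName(String appName, String userName, DateTime now) {
        String datePart =
            DateTimeFormat.forPattern("MMddHHmmss").withZone(DateTimeZone.UTC).print(now);
        return normalize(appName) + "-" + normalize(userName) + "-" + datePart;
      }

      public static void main(String[] args) {
        DateTime fixed = DateTime.parse("2014-12-08T19:07:06.698Z");
        // Matches the expectation in testAppNameIsLong:
        // a234567890123456789012345678901234567890-abcde-1208190706
        System.out.println(
            jobName("1234567890123456789012345678901234567890", "abcde", fixed));
      }
    }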

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/options/DataflowProfilingOptionsTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/options/DataflowProfilingOptionsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/options/DataflowProfilingOptionsTest.java
deleted file mode 100644
index 18c8085..0000000
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/options/DataflowProfilingOptionsTest.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.options;
-
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-import org.hamcrest.Matchers;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for {@link DataflowProfilingOptions}.
- */
-@RunWith(JUnit4.class)
-public class DataflowProfilingOptionsTest {
-
-  private static final ObjectMapper MAPPER = new ObjectMapper();
-
-  @Test
-  public void testOptionsObject() throws Exception {
-    DataflowPipelineOptions options = PipelineOptionsFactory.fromArgs(new String[] {
-        "--enableProfilingAgent", "--profilingAgentConfiguration={\"interval\": 21}"})
-        .as(DataflowPipelineOptions.class);
-    assertTrue(options.getEnableProfilingAgent());
-
-    String json = MAPPER.writeValueAsString(options);
-    assertThat(json, Matchers.containsString(
-        "\"profilingAgentConfiguration\":{\"interval\":21}"));
-  }
-}
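
The assertion above leans on the Jackson support built into PipelineOptions
(options are serialized to JSON when handed to workers). A small sketch of the
full round trip implied by that, assuming options deserialize back through the
PipelineOptions interface as they do elsewhere in the SDK:

    import com.fasterxml.jackson.databind.ObjectMapper;

    import org.apache.beam.sdk.options.DataflowPipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class ProfilingOptionsRoundTrip {
      public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        DataflowPipelineOptions options = PipelineOptionsFactory
            .fromArgs(new String[] {
                "--enableProfilingAgent",
                "--profilingAgentConfiguration={\"interval\": 21}"})
            .as(DataflowPipelineOptions.class);

        // Write, read back as the base interface, then re-view; the profiling
        // settings should survive the trip.
        String json = mapper.writeValueAsString(options);
        DataflowPipelineOptions restored =
            mapper.readValue(json, PipelineOptions.class).as(DataflowPipelineOptions.class);
        System.out.println(restored.getEnableProfilingAgent());
      }
    }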

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/options/DataflowWorkerLoggingOptionsTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/options/DataflowWorkerLoggingOptionsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/options/DataflowWorkerLoggingOptionsTest.java
deleted file mode 100644
index 47d518d..0000000
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/options/DataflowWorkerLoggingOptionsTest.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.options;
-
-import static org.apache.beam.sdk.options.DataflowWorkerLoggingOptions.Level.WARN;
-
-import static org.junit.Assert.assertEquals;
-
-import org.apache.beam.sdk.options.DataflowWorkerLoggingOptions.WorkerLogLevelOverrides;
-
-import com.google.common.collect.ImmutableMap;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/** Tests for {@link DataflowWorkerLoggingOptions}. */
-@RunWith(JUnit4.class)
-public class DataflowWorkerLoggingOptionsTest {
-  private static final ObjectMapper MAPPER = new ObjectMapper();
-  @Rule public ExpectedException expectedException = ExpectedException.none();
-
-  @Test
-  public void testWorkerLogLevelOverrideWithInvalidLogLevel() {
-    expectedException.expect(IllegalArgumentException.class);
-    expectedException.expectMessage("Unsupported log level");
-    WorkerLogLevelOverrides.from(ImmutableMap.of("Name", "FakeLevel"));
-  }
-
-  @Test
-  public void testWorkerLogLevelOverrideForClass() throws Exception {
-    assertEquals("{\"org.junit.Test\":\"WARN\"}",
-        MAPPER.writeValueAsString(
-            new WorkerLogLevelOverrides().addOverrideForClass(Test.class, WARN)));
-  }
-
-  @Test
-  public void testWorkerLogLevelOverrideForPackage() throws Exception {
-    assertEquals("{\"org.junit\":\"WARN\"}",
-        MAPPER.writeValueAsString(
-            new WorkerLogLevelOverrides().addOverrideForPackage(Test.class.getPackage(), WARN)));
-  }
-
-  @Test
-  public void testWorkerLogLevelOverrideForName() throws Exception {
-    assertEquals("{\"A\":\"WARN\"}",
-        MAPPER.writeValueAsString(
-            new WorkerLogLevelOverrides().addOverrideForName("A", WARN)));
-  }
-
-  @Test
-  public void testSerializationAndDeserializationOf() throws Exception {
-    String testValue = "{\"A\":\"WARN\"}";
-    assertEquals(testValue,
-        MAPPER.writeValueAsString(
-            MAPPER.readValue(testValue, WorkerLogLevelOverrides.class)));
-  }
-}
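
Beyond the serialization checks above, the overrides map is ultimately meant
to be attached to the worker logging options. A minimal sketch, assuming the
usual generated setter; org.apache.http and com.example.MyDoFn are
illustrative names only:

    import org.apache.beam.sdk.options.DataflowWorkerLoggingOptions;
    import org.apache.beam.sdk.options.DataflowWorkerLoggingOptions.Level;
    import org.apache.beam.sdk.options.DataflowWorkerLoggingOptions.WorkerLogLevelOverrides;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class WorkerLogLevelsSketch {
      public static void main(String[] args) {
        DataflowWorkerLoggingOptions options =
            PipelineOptionsFactory.as(DataflowWorkerLoggingOptions.class);
        // Quiet a chatty dependency on the workers, keep one DoFn verbose.
        options.setWorkerLogLevelOverrides(
            new WorkerLogLevelOverrides()
                .addOverrideForName("org.apache.http", Level.WARN)
                .addOverrideForName("com.example.MyDoFn", Level.DEBUG));
      }
    }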

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/BlockingDataflowPipelineRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/BlockingDataflowPipelineRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/BlockingDataflowPipelineRunnerTest.java
deleted file mode 100644
index 13e120b..0000000
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/BlockingDataflowPipelineRunnerTest.java
+++ /dev/null
@@ -1,302 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners;
-
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Matchers.isA;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.PipelineResult.State;
-import org.apache.beam.sdk.options.DataflowPipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.testing.ExpectedLogs;
-import org.apache.beam.sdk.testing.TestDataflowPipelineOptions;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.util.MonitoringUtil;
-import org.apache.beam.sdk.util.NoopPathValidator;
-import org.apache.beam.sdk.util.TestCredential;
-
-import org.hamcrest.Description;
-import org.hamcrest.Factory;
-import org.hamcrest.Matcher;
-import org.hamcrest.TypeSafeMatcher;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-import java.util.concurrent.TimeUnit;
-
-/**
- * Tests for {@link BlockingDataflowPipelineRunner}.
- */
-@RunWith(JUnit4.class)
-public class BlockingDataflowPipelineRunnerTest {
-
-  @Rule
-  public ExpectedLogs expectedLogs = ExpectedLogs.none(BlockingDataflowPipelineRunner.class);
-
-  @Rule
-  public ExpectedException expectedThrown = ExpectedException.none();
-
-  /**
-   * A {@link Matcher} for a {@link DataflowJobException} that applies an underlying {@link Matcher}
-   * to the {@link DataflowPipelineJob} returned by {@link DataflowJobException#getJob()}.
-   */
-  private static class DataflowJobExceptionMatcher<T extends DataflowJobException>
-      extends TypeSafeMatcher<T> {
-
-    private final Matcher<DataflowPipelineJob> matcher;
-
-    public DataflowJobExceptionMatcher(Matcher<DataflowPipelineJob> matcher) {
-      this.matcher = matcher;
-    }
-
-    @Override
-    public boolean matchesSafely(T ex) {
-      return matcher.matches(ex.getJob());
-    }
-
-    @Override
-    protected void describeMismatchSafely(T item, Description description) {
-        description.appendText("job ");
-        matcher.describeMismatch(item.getMessage(), description);
-    }
-
-    @Override
-    public void describeTo(Description description) {
-      description.appendText("exception with job matching ");
-      description.appendDescriptionOf(matcher);
-    }
-
-    @Factory
-    public static <T extends DataflowJobException> Matcher<T> expectJob(
-        Matcher<DataflowPipelineJob> matcher) {
-      return new DataflowJobExceptionMatcher<T>(matcher);
-    }
-  }
-
-  /**
-   * A {@link Matcher} for a {@link DataflowPipelineJob} that applies an underlying {@link Matcher}
-   * to the return value of {@link DataflowPipelineJob#getJobId()}.
-   */
-  private static class JobIdMatcher<T extends DataflowPipelineJob> extends TypeSafeMatcher<T> {
-
-    private final Matcher<String> matcher;
-
-    public JobIdMatcher(Matcher<String> matcher) {
-      this.matcher = matcher;
-    }
-
-    @Override
-    public boolean matchesSafely(T job) {
-      return matcher.matches(job.getJobId());
-    }
-
-    @Override
-    protected void describeMismatchSafely(T item, Description description) {
-        description.appendText("jobId ");
-        matcher.describeMismatch(item.getJobId(), description);
-    }
-
-    @Override
-    public void describeTo(Description description) {
-      description.appendText("job with jobId ");
-      description.appendDescriptionOf(matcher);
-    }
-
-    @Factory
-    public static <T extends DataflowPipelineJob> Matcher<T> expectJobId(final String jobId) {
-      return new JobIdMatcher<T>(equalTo(jobId));
-    }
-  }
-
-  /**
-   * A {@link Matcher} for a {@link DataflowJobUpdatedException} that applies an underlying
-   * {@link Matcher} to the {@link DataflowPipelineJob} returned by
-   * {@link DataflowJobUpdatedException#getReplacedByJob()}.
-   */
-  private static class ReplacedByJobMatcher<T extends DataflowJobUpdatedException>
-      extends TypeSafeMatcher<T> {
-
-    private final Matcher<DataflowPipelineJob> matcher;
-
-    public ReplacedByJobMatcher(Matcher<DataflowPipelineJob> matcher) {
-      this.matcher = matcher;
-    }
-
-    @Override
-    public boolean matchesSafely(T ex) {
-      return matcher.matches(ex.getReplacedByJob());
-    }
-
-    @Override
-    protected void describeMismatchSafely(T item, Description description) {
-        description.appendText("job ");
-        matcher.describeMismatch(item.getMessage(), description);
-    }
-
-    @Override
-    public void describeTo(Description description) {
-      description.appendText("exception with replacedByJob() ");
-      description.appendDescriptionOf(matcher);
-    }
-
-    @Factory
-    public static <T extends DataflowJobUpdatedException> Matcher<T> expectReplacedBy(
-        Matcher<DataflowPipelineJob> matcher) {
-      return new ReplacedByJobMatcher<T>(matcher);
-    }
-  }
-
-  /**
-   * Creates a mocked {@link DataflowPipelineJob} with the given {@code projectId} and {@code jobId}
-   * that will immediately terminate in the provided {@code terminalState}.
-   *
-   * <p>The return value may be further mocked.
-   */
-  private DataflowPipelineJob createMockJob(
-      String projectId, String jobId, State terminalState) throws Exception {
-    DataflowPipelineJob mockJob = mock(DataflowPipelineJob.class);
-    when(mockJob.getProjectId()).thenReturn(projectId);
-    when(mockJob.getJobId()).thenReturn(jobId);
-    when(mockJob.waitToFinish(
-        anyLong(), isA(TimeUnit.class), isA(MonitoringUtil.JobMessagesHandler.class)))
-        .thenReturn(terminalState);
-    return mockJob;
-  }
-
-  /**
-   * Returns a {@link BlockingDataflowPipelineRunner} whose underlying runner returns the
-   * provided job. Some {@link PipelineOptions}, such as the project ID, are extracted from it.
-   */
-  private BlockingDataflowPipelineRunner createMockRunner(DataflowPipelineJob job)
-      throws Exception {
-    DataflowPipelineRunner mockRunner = mock(DataflowPipelineRunner.class);
-    TestDataflowPipelineOptions options =
-        PipelineOptionsFactory.as(TestDataflowPipelineOptions.class);
-    options.setProject(job.getProjectId());
-
-    when(mockRunner.run(isA(Pipeline.class))).thenReturn(job);
-
-    return new BlockingDataflowPipelineRunner(mockRunner, options);
-  }
-
-  /**
-   * Tests that the {@link BlockingDataflowPipelineRunner} returns normally when a job terminates in
-   * the {@link State#DONE DONE} state.
-   */
-  @Test
-  public void testJobDoneComplete() throws Exception {
-    createMockRunner(createMockJob("testJobDone-projectId", "testJobDone-jobId", State.DONE))
-        .run(TestPipeline.create());
-    expectedLogs.verifyInfo("Job finished with status DONE");
-  }
-
-  /**
-   * Tests that the {@link BlockingDataflowPipelineRunner} throws the appropriate exception
-   * when a job terminates in the {@link State#FAILED FAILED} state.
-   */
-  @Test
-  public void testFailedJobThrowsException() throws Exception {
-    expectedThrown.expect(DataflowJobExecutionException.class);
-    expectedThrown.expect(DataflowJobExceptionMatcher.expectJob(
-        JobIdMatcher.expectJobId("testFailedJob-jobId")));
-    createMockRunner(createMockJob("testFailedJob-projectId", "testFailedJob-jobId", State.FAILED))
-        .run(TestPipeline.create());
-  }
-
-  /**
-   * Tests that the {@link BlockingDataflowPipelineRunner} throws the appropriate exception
-   * when a job terminates in the {@link State#CANCELLED CANCELLED} state.
-   */
-  @Test
-  public void testCancelledJobThrowsException() throws Exception {
-    expectedThrown.expect(DataflowJobCancelledException.class);
-    expectedThrown.expect(DataflowJobExceptionMatcher.expectJob(
-        JobIdMatcher.expectJobId("testCancelledJob-jobId")));
-    createMockRunner(
-            createMockJob("testCancelledJob-projectId", "testCancelledJob-jobId", State.CANCELLED))
-        .run(TestPipeline.create());
-  }
-
-  /**
-   * Tests that the {@link BlockingDataflowPipelineRunner} throws the appropriate exception
-   * when a job terminates in the {@link State#UPDATED UPDATED} state.
-   */
-  @Test
-  public void testUpdatedJobThrowsException() throws Exception {
-    expectedThrown.expect(DataflowJobUpdatedException.class);
-    expectedThrown.expect(DataflowJobExceptionMatcher.expectJob(
-        JobIdMatcher.expectJobId("testUpdatedJob-jobId")));
-    expectedThrown.expect(ReplacedByJobMatcher.expectReplacedBy(
-        JobIdMatcher.expectJobId("testUpdatedJob-replacedByJobId")));
-    DataflowPipelineJob job =
-        createMockJob("testUpdatedJob-projectId", "testUpdatedJob-jobId", State.UPDATED);
-    DataflowPipelineJob replacedByJob =
-        createMockJob("testUpdatedJob-projectId", "testUpdatedJob-replacedByJobId", State.DONE);
-    when(job.getReplacedByJob()).thenReturn(replacedByJob);
-    createMockRunner(job).run(TestPipeline.create());
-  }
-
-  /**
-   * Tests that the {@link BlockingDataflowPipelineRunner} throws the appropriate exception
-   * when a job terminates in the {@link State#UNKNOWN UNKNOWN} state, indicating that the
-   * Dataflow service returned a state that the SDK does not recognize (possibly because
-   * the SDK is older than the service).
-   */
-  @Test
-  public void testUnknownJobThrowsException() throws Exception {
-    expectedThrown.expect(IllegalStateException.class);
-    createMockRunner(
-            createMockJob("testUnknownJob-projectId", "testUnknownJob-jobId", State.UNKNOWN))
-        .run(TestPipeline.create());
-  }
-
-  /**
-   * Tests that the {@link BlockingDataflowPipelineRunner} throws the appropriate exception
-   * when a job returns a {@code null} state, indicating that it failed to contact the service
-   * even after exhausting all of its built-in resilience logic.
-   */
-  @Test
-  public void testNullJobThrowsException() throws Exception {
-    expectedThrown.expect(DataflowServiceException.class);
-    expectedThrown.expect(DataflowJobExceptionMatcher.expectJob(
-        JobIdMatcher.expectJobId("testNullJob-jobId")));
-    createMockRunner(createMockJob("testNullJob-projectId", "testNullJob-jobId", null))
-        .run(TestPipeline.create());
-  }
-
-  @Test
-  public void testToString() {
-    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
-    options.setJobName("TestJobName");
-    options.setProject("test-project");
-    options.setTempLocation("gs://test/temp/location");
-    options.setGcpCredential(new TestCredential());
-    options.setPathValidatorClass(NoopPathValidator.class);
-    assertEquals("BlockingDataflowPipelineRunner#testjobname",
-        BlockingDataflowPipelineRunner.fromOptions(options).toString());
-  }
-}