You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/04/27 03:08:38 UTC
[06/21] incubator-beam git commit: Reorganize Java packages in the
sources of the Google Cloud Dataflow runner
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowGroupByKeyTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowGroupByKeyTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowGroupByKeyTest.java
new file mode 100644
index 0000000..f0e677e
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowGroupByKeyTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.transforms;
+
+import org.apache.beam.runners.dataflow.DataflowPipelineRunner;
+import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.windowing.Sessions;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.util.NoopPathValidator;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+import org.joda.time.Duration;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.util.Arrays;
+import java.util.List;
+
+/** Tests {@link GroupByKey} error reporting under the {@link DataflowPipelineRunner}. */
+@RunWith(JUnit4.class)
+public class DataflowGroupByKeyTest {
+ @Rule // lets each test declare the exception type and message it expects
+ public ExpectedException thrown = ExpectedException.none();
+
+ /**
+ * Creates a test pipeline that uses the {@link DataflowPipelineRunner} so that {@link GroupByKey}
+ * is not expanded. The tests below use it to verify that the proper errors show up even without
+ * that expansion.
+ */
+ private Pipeline createTestServiceRunner() {
+ DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+ options.setRunner(DataflowPipelineRunner.class);
+ options.setProject("someproject");
+ options.setStagingLocation("gs://staging");
+ options.setPathValidatorClass(NoopPathValidator.class); // NOTE(review): presumably skips path checks
+ options.setDataflowClient(null);
+ return Pipeline.create(options);
+ }
+
+ @Test // expects IllegalStateException when GroupByKey is applied twice after session windowing
+ public void testInvalidWindowsService() {
+ Pipeline p = createTestServiceRunner();
+
+ List<KV<String, Integer>> ungroupedPairs = Arrays.asList(); // empty: this test never runs p
+
+ PCollection<KV<String, Integer>> input =
+ p.apply(Create.of(ungroupedPairs)
+ .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of())))
+ .apply(Window.<KV<String, Integer>>into(
+ Sessions.withGapDuration(Duration.standardMinutes(1))));
+
+ thrown.expect(IllegalStateException.class); // registered before the applies expected to throw
+ thrown.expectMessage("GroupByKey must have a valid Window merge function");
+ input
+ .apply("GroupByKey", GroupByKey.<String, Integer>create())
+ .apply("GroupByKeyAgain", GroupByKey.<String, Iterable<Integer>>create());
+ }
+
+ @Test // expects GroupByKey to reject an UNBOUNDED input that uses the default global windowing
+ public void testGroupByKeyServiceUnbounded() {
+ Pipeline p = createTestServiceRunner();
+
+ PCollection<KV<String, Integer>> input =
+ p.apply(
+ new PTransform<PBegin, PCollection<KV<String, Integer>>>() { // root; output is UNBOUNDED
+ @Override
+ public PCollection<KV<String, Integer>> apply(PBegin input) {
+ return PCollection.<KV<String, Integer>>createPrimitiveOutputInternal(
+ input.getPipeline(),
+ WindowingStrategy.globalDefault(),
+ PCollection.IsBounded.UNBOUNDED)
+ .setTypeDescriptorInternal(new TypeDescriptor<KV<String, Integer>>() {});
+ }
+ });
+
+ thrown.expect(IllegalStateException.class);
+ thrown.expectMessage(
+ "GroupByKey cannot be applied to non-bounded PCollection in the GlobalWindow without "
+ + "a trigger. Use a Window.into or Window.triggering transform prior to GroupByKey.");
+
+ input.apply("GroupByKey", GroupByKey.<String, Integer>create());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowViewTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowViewTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowViewTest.java
new file mode 100644
index 0000000..d787500
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowViewTest.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.transforms;
+
+import org.apache.beam.runners.dataflow.DataflowPipelineRunner;
+import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.InvalidWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.util.NoopPathValidator;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+import org.hamcrest.Matchers;
+import org.joda.time.Duration;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.internal.matchers.ThrowableMessageMatcher;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests that {@link View}s reject unsupported inputs with the {@link DataflowPipelineRunner}. */
+@RunWith(JUnit4.class)
+public class DataflowViewTest {
+ @Rule // lets each helper declare the exception, message and cause it expects
+ public transient ExpectedException thrown = ExpectedException.none();
+
+ private Pipeline createTestBatchRunner() { // Dataflow-runner pipeline; streaming is not set
+ DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+ options.setRunner(DataflowPipelineRunner.class);
+ options.setProject("someproject");
+ options.setStagingLocation("gs://staging");
+ options.setPathValidatorClass(NoopPathValidator.class);
+ options.setDataflowClient(null);
+ return Pipeline.create(options);
+ }
+
+ private Pipeline createTestStreamingRunner() { // same options as batch, plus setStreaming(true)
+ DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+ options.setRunner(DataflowPipelineRunner.class);
+ options.setStreaming(true);
+ options.setProject("someproject");
+ options.setStagingLocation("gs://staging");
+ options.setPathValidatorClass(NoopPathValidator.class);
+ options.setDataflowClient(null);
+ return Pipeline.create(options);
+ }
+
+ private void testViewUnbounded( // applies view to an UNBOUNDED input and expects it to fail
+ Pipeline pipeline,
+ PTransform<PCollection<KV<String, Integer>>, ? extends PCollectionView<?>> view) {
+ thrown.expect(IllegalStateException.class);
+ thrown.expectMessage("Unable to create a side-input view from input");
+ thrown.expectCause( // the cause must mention the non-bounded input
+ ThrowableMessageMatcher.hasMessage(Matchers.containsString("non-bounded PCollection")));
+ pipeline
+ .apply(
+ new PTransform<PBegin, PCollection<KV<String, Integer>>>() { // root; output is UNBOUNDED
+ @Override
+ public PCollection<KV<String, Integer>> apply(PBegin input) {
+ return PCollection.<KV<String, Integer>>createPrimitiveOutputInternal(
+ input.getPipeline(),
+ WindowingStrategy.globalDefault(),
+ PCollection.IsBounded.UNBOUNDED)
+ .setTypeDescriptorInternal(new TypeDescriptor<KV<String, Integer>>() {});
+ }
+ })
+ .apply(view);
+ }
+
+ private void testViewNonmerging( // applies view to an InvalidWindows input and expects failure
+ Pipeline pipeline,
+ PTransform<PCollection<KV<String, Integer>>, ? extends PCollectionView<?>> view) {
+ thrown.expect(IllegalStateException.class);
+ thrown.expectMessage("Unable to create a side-input view from input");
+ thrown.expectCause( // the cause must contain the text passed to InvalidWindows below
+ ThrowableMessageMatcher.hasMessage(Matchers.containsString("Consumed by GroupByKey")));
+ pipeline.apply(Create.<KV<String, Integer>>of(KV.of("hello", 5)))
+ .apply(Window.<KV<String, Integer>>into(new InvalidWindows<>(
+ "Consumed by GroupByKey", FixedWindows.of(Duration.standardHours(1)))))
+ .apply(view);
+ }
+
+ @Test // each test below pairs one View flavor with the batch or the streaming pipeline
+ public void testViewUnboundedAsSingletonBatch() {
+ testViewUnbounded(createTestBatchRunner(), View.<KV<String, Integer>>asSingleton());
+ }
+
+ @Test
+ public void testViewUnboundedAsSingletonStreaming() {
+ testViewUnbounded(createTestStreamingRunner(), View.<KV<String, Integer>>asSingleton());
+ }
+
+ @Test
+ public void testViewUnboundedAsIterableBatch() {
+ testViewUnbounded(createTestBatchRunner(), View.<KV<String, Integer>>asIterable());
+ }
+
+ @Test
+ public void testViewUnboundedAsIterableStreaming() {
+ testViewUnbounded(createTestStreamingRunner(), View.<KV<String, Integer>>asIterable());
+ }
+
+ @Test
+ public void testViewUnboundedAsListBatch() {
+ testViewUnbounded(createTestBatchRunner(), View.<KV<String, Integer>>asList());
+ }
+
+ @Test
+ public void testViewUnboundedAsListStreaming() {
+ testViewUnbounded(createTestStreamingRunner(), View.<KV<String, Integer>>asList());
+ }
+
+ @Test
+ public void testViewUnboundedAsMapBatch() {
+ testViewUnbounded(createTestBatchRunner(), View.<String, Integer>asMap());
+ }
+
+ @Test
+ public void testViewUnboundedAsMapStreaming() {
+ testViewUnbounded(createTestStreamingRunner(), View.<String, Integer>asMap());
+ }
+
+ @Test
+ public void testViewUnboundedAsMultimapBatch() {
+ testViewUnbounded(createTestBatchRunner(), View.<String, Integer>asMultimap());
+ }
+
+ @Test
+ public void testViewUnboundedAsMultimapStreaming() {
+ testViewUnbounded(createTestStreamingRunner(), View.<String, Integer>asMultimap());
+ }
+
+ @Test
+ public void testViewNonmergingAsSingletonBatch() {
+ testViewNonmerging(createTestBatchRunner(), View.<KV<String, Integer>>asSingleton());
+ }
+
+ @Test
+ public void testViewNonmergingAsSingletonStreaming() {
+ testViewNonmerging(createTestStreamingRunner(), View.<KV<String, Integer>>asSingleton());
+ }
+
+ @Test
+ public void testViewNonmergingAsIterableBatch() {
+ testViewNonmerging(createTestBatchRunner(), View.<KV<String, Integer>>asIterable());
+ }
+
+ @Test
+ public void testViewNonmergingAsIterableStreaming() {
+ testViewNonmerging(createTestStreamingRunner(), View.<KV<String, Integer>>asIterable());
+ }
+
+ @Test
+ public void testViewNonmergingAsListBatch() {
+ testViewNonmerging(createTestBatchRunner(), View.<KV<String, Integer>>asList());
+ }
+
+ @Test
+ public void testViewNonmergingAsListStreaming() {
+ testViewNonmerging(createTestStreamingRunner(), View.<KV<String, Integer>>asList());
+ }
+
+ @Test
+ public void testViewNonmergingAsMapBatch() {
+ testViewNonmerging(createTestBatchRunner(), View.<String, Integer>asMap());
+ }
+
+ @Test
+ public void testViewNonmergingAsMapStreaming() {
+ testViewNonmerging(createTestStreamingRunner(), View.<String, Integer>asMap());
+ }
+
+ @Test
+ public void testViewNonmergingAsMultimapBatch() {
+ testViewNonmerging(createTestBatchRunner(), View.<String, Integer>asMultimap());
+ }
+
+ @Test
+ public void testViewNonmergingAsMultimapStreaming() {
+ testViewNonmerging(createTestStreamingRunner(), View.<String, Integer>asMultimap());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/DataflowPathValidatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/DataflowPathValidatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/DataflowPathValidatorTest.java
new file mode 100644
index 0000000..5587986
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/DataflowPathValidatorTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.util;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.when;
+
+import org.apache.beam.runners.dataflow.DataflowPipelineRunner;
+import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.util.GcsUtil;
+import org.apache.beam.sdk.util.TestCredential;
+import org.apache.beam.sdk.util.gcsfs.GcsPath;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+/** Tests for {@link DataflowPathValidator} using a mocked {@link GcsUtil}. */
+@RunWith(JUnit4.class)
+public class DataflowPathValidatorTest {
+ @Rule public ExpectedException expectedException = ExpectedException.none();
+
+ @Mock private GcsUtil mockGcsUtil; // populated by MockitoAnnotations.initMocks in setUp
+ private DataflowPathValidator validator;
+
+ @Before
+ public void setUp() throws Exception {
+ MockitoAnnotations.initMocks(this);
+ when(mockGcsUtil.bucketExists(any(GcsPath.class))).thenReturn(true); // default: bucket exists
+ when(mockGcsUtil.isGcsPatternSupported(anyString())).thenCallRealMethod(); // not stubbed out
+ DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+ options.setGcpCredential(new TestCredential());
+ options.setRunner(DataflowPipelineRunner.class);
+ options.setGcsUtil(mockGcsUtil);
+ validator = new DataflowPathValidator(options);
+ }
+
+ @Test // passes as long as validation does not throw
+ public void testValidFilePattern() {
+ validator.validateInputFilePatternSupported("gs://bucket/path");
+ }
+
+ @Test // a path without the gs:// scheme must be rejected as an input pattern
+ public void testInvalidFilePattern() {
+ expectedException.expect(IllegalArgumentException.class);
+ expectedException.expectMessage(
+ "DataflowPipelineRunner expected a valid 'gs://' path but was given '/local/path'");
+ validator.validateInputFilePatternSupported("/local/path");
+ }
+
+ @Test
+ public void testWhenBucketDoesNotExist() throws Exception {
+ when(mockGcsUtil.bucketExists(any(GcsPath.class))).thenReturn(false); // overrides setUp default
+ expectedException.expect(IllegalArgumentException.class);
+ expectedException.expectMessage(
+ "Could not find file gs://non-existent-bucket/location");
+ validator.validateInputFilePatternSupported("gs://non-existent-bucket/location");
+ }
+
+ @Test // passes as long as validation does not throw
+ public void testValidOutputPrefix() {
+ validator.validateOutputFilePrefixSupported("gs://bucket/path");
+ }
+
+ @Test // a path without the gs:// scheme must be rejected as an output prefix
+ public void testInvalidOutputPrefix() {
+ expectedException.expect(IllegalArgumentException.class);
+ expectedException.expectMessage(
+ "DataflowPipelineRunner expected a valid 'gs://' path but was given '/local/path'");
+ validator.validateOutputFilePrefixSupported("/local/path");
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/MonitoringUtilTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/MonitoringUtilTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/MonitoringUtilTest.java
new file mode 100644
index 0000000..ee1532d
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/MonitoringUtilTest.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.util;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
+import org.apache.beam.sdk.PipelineResult.State;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.util.TestCredential;
+import org.apache.beam.sdk.util.TimeUtil;
+
+import com.google.api.services.dataflow.Dataflow;
+import com.google.api.services.dataflow.model.JobMessage;
+import com.google.api.services.dataflow.model.ListJobMessagesResponse;
+
+import org.joda.time.Instant;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Tests for {@link MonitoringUtil}.
+ */
+@RunWith(JUnit4.class)
+public class MonitoringUtilTest {
+ private static final String PROJECT_ID = "someProject";
+ private static final String JOB_ID = "1234";
+
+ @Rule public ExpectedException thrown = ExpectedException.none(); // NOTE(review): unused here
+
+ @Test // checks that getJobMessages follows the next-page token and returns both pages
+ public void testGetJobMessages() throws IOException {
+ Dataflow.Projects.Jobs.Messages mockMessages = mock(Dataflow.Projects.Jobs.Messages.class);
+
+ // Two requests are needed to get all the messages.
+ Dataflow.Projects.Jobs.Messages.List firstRequest =
+ mock(Dataflow.Projects.Jobs.Messages.List.class);
+ Dataflow.Projects.Jobs.Messages.List secondRequest =
+ mock(Dataflow.Projects.Jobs.Messages.List.class);
+
+ when(mockMessages.list(PROJECT_ID, JOB_ID)).thenReturn(firstRequest).thenReturn(secondRequest);
+
+ ListJobMessagesResponse firstResponse = new ListJobMessagesResponse();
+ firstResponse.setJobMessages(new ArrayList<JobMessage>());
+ for (int i = 0; i < 100; ++i) { // first page: messages 0..99
+ JobMessage message = new JobMessage();
+ message.setId("message_" + i);
+ message.setTime(TimeUtil.toCloudTime(new Instant(i)));
+ firstResponse.getJobMessages().add(message);
+ }
+ String pageToken = "page_token";
+ firstResponse.setNextPageToken(pageToken); // only the first page carries a next-page token
+
+ ListJobMessagesResponse secondResponse = new ListJobMessagesResponse();
+ secondResponse.setJobMessages(new ArrayList<JobMessage>());
+ for (int i = 100; i < 150; ++i) { // second page: messages 100..149
+ JobMessage message = new JobMessage();
+ message.setId("message_" + i);
+ message.setTime(TimeUtil.toCloudTime(new Instant(i)));
+ secondResponse.getJobMessages().add(message);
+ }
+
+ when(firstRequest.execute()).thenReturn(firstResponse);
+ when(secondRequest.execute()).thenReturn(secondResponse);
+
+ MonitoringUtil util = new MonitoringUtil(PROJECT_ID, mockMessages);
+
+ List<JobMessage> messages = util.getJobMessages(JOB_ID, -1); // NOTE(review): -1 meaning unverified
+
+ verify(secondRequest).setPageToken(pageToken); // second request must reuse the first page's token
+
+ assertEquals(150, messages.size()); // 100 + 50 messages from the two pages
+ }
+
+ @Test // a known job-state name maps to the matching State constant
+ public void testToStateCreatesState() {
+ String stateName = "JOB_STATE_DONE";
+
+ State result = MonitoringUtil.toState(stateName);
+
+ assertEquals(State.DONE, result);
+ }
+
+ @Test
+ public void testToStateWithNullReturnsUnknown() {
+ String stateName = null;
+
+ State result = MonitoringUtil.toState(stateName);
+
+ assertEquals(State.UNKNOWN, result);
+ }
+
+ @Test
+ public void testToStateWithOtherValueReturnsUnknown() {
+ String stateName = "FOO_BAR_BAZ";
+
+ State result = MonitoringUtil.toState(stateName);
+
+ assertEquals(State.UNKNOWN, result);
+ }
+
+ @Test // with no endpoint set on the options, the command has no endpoint-override prefix
+ public void testDontOverrideEndpointWithDefaultApi() {
+ DataflowPipelineOptions options =
+ PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
+ options.setProject(PROJECT_ID);
+ options.setGcpCredential(new TestCredential());
+ String cancelCommand = MonitoringUtil.getGcloudCancelCommand(options, JOB_ID);
+ assertEquals("gcloud alpha dataflow jobs --project=someProject cancel 1234", cancelCommand);
+ }
+
+ @Test // a custom endpoint must show up as a CLOUDSDK_API_ENDPOINT_OVERRIDES_DATAFLOW prefix
+ public void testOverridesEndpointWithStagedDataflowEndpoint() {
+ DataflowPipelineOptions options =
+ PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
+ options.setProject(PROJECT_ID);
+ options.setGcpCredential(new TestCredential());
+ String stagingDataflowEndpoint = "v0neverExisted";
+ options.setDataflowEndpoint(stagingDataflowEndpoint);
+ String cancelCommand = MonitoringUtil.getGcloudCancelCommand(options, JOB_ID);
+ assertEquals(
+ "CLOUDSDK_API_ENDPOINT_OVERRIDES_DATAFLOW=https://dataflow.googleapis.com/v0neverExisted/ "
+ + "gcloud alpha dataflow jobs --project=someProject cancel 1234",
+ cancelCommand);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
new file mode 100644
index 0000000..41ad05d
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
@@ -0,0 +1,486 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.util;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import org.apache.beam.runners.dataflow.util.PackageUtil.PackageAttributes;
+import org.apache.beam.sdk.options.GcsOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.ExpectedLogs;
+import org.apache.beam.sdk.testing.FastNanoClockAndSleeper;
+import org.apache.beam.sdk.util.GcsUtil;
+import org.apache.beam.sdk.util.IOChannelUtils;
+import org.apache.beam.sdk.util.gcsfs.GcsPath;
+
+import com.google.api.client.googleapis.json.GoogleJsonError.ErrorInfo;
+import com.google.api.client.googleapis.json.GoogleJsonResponseException;
+import com.google.api.client.http.HttpRequest;
+import com.google.api.client.http.HttpResponse;
+import com.google.api.client.http.HttpStatusCodes;
+import com.google.api.client.http.HttpTransport;
+import com.google.api.client.http.LowLevelHttpRequest;
+import com.google.api.client.json.GenericJson;
+import com.google.api.client.json.Json;
+import com.google.api.client.json.JsonFactory;
+import com.google.api.client.json.jackson2.JacksonFactory;
+import com.google.api.client.testing.http.HttpTesting;
+import com.google.api.client.testing.http.MockHttpTransport;
+import com.google.api.client.testing.http.MockLowLevelHttpRequest;
+import com.google.api.client.testing.http.MockLowLevelHttpResponse;
+import com.google.api.services.dataflow.model.DataflowPackage;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.io.Files;
+import com.google.common.io.LineReader;
+
+import org.hamcrest.BaseMatcher;
+import org.hamcrest.Description;
+import org.hamcrest.Matchers;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.channels.Channels;
+import java.nio.channels.Pipe;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.regex.Pattern;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipInputStream;
+
+/** Tests for PackageUtil. */
+@RunWith(JUnit4.class)
+public class PackageUtilTest {
+ @Rule public ExpectedLogs logged = ExpectedLogs.none(PackageUtil.class);
+ @Rule // scratch directory for the files and folders the tests create
+ public TemporaryFolder tmpFolder = new TemporaryFolder();
+
+ @Rule
+ public FastNanoClockAndSleeper fastNanoClockAndSleeper = new FastNanoClockAndSleeper();
+
+ @Mock // initialized by MockitoAnnotations.initMocks in setUp
+ GcsUtil mockGcsUtil;
+
+ // A 128-bit hash needs 128 / 6 = 21.3 base64 characters, which rounds up to 22.
+ private static final String HASH_PATTERN = "[a-zA-Z0-9+-]{22}"; // NOTE(review): no '/' or '_'
+
+ // Hamcrest matcher that accepts a String only if the whole value matches the given regex.
+ private static class RegexMatcher extends BaseMatcher<String> {
+ private final Pattern pattern;
+
+ public RegexMatcher(String regex) {
+ this.pattern = Pattern.compile(regex); // compiled once per matcher instance
+ }
+
+ @Override
+ public boolean matches(Object o) {
+ if (!(o instanceof String)) { // null and non-String values never match
+ return false;
+ }
+ return pattern.matcher((String) o).matches(); // full match, not a substring search
+ }
+
+ @Override
+ public void describeTo(Description description) {
+ description.appendText(String.format("matches regular expression %s", pattern));
+ }
+
+ public static RegexMatcher matches(String regex) { // factory for readable assertThat calls
+ return new RegexMatcher(regex);
+ }
+ }
+
+ @Before
+ public void setUp() {
+ MockitoAnnotations.initMocks(this); // populates the @Mock fields of this test instance
+
+ GcsOptions pipelineOptions = PipelineOptionsFactory.as(GcsOptions.class);
+ pipelineOptions.setGcsUtil(mockGcsUtil); // the options carry the mock GcsUtil
+
+ IOChannelUtils.registerStandardIOFactories(pipelineOptions); // NOTE(review): global state?
+ }
+
+ private File makeFileWithContents(String name, String contents) throws Exception {
+ File tmpFile = tmpFolder.newFile(name); // created under the per-test temporary folder
+ Files.write(contents, tmpFile, StandardCharsets.UTF_8);
+ tmpFile.setLastModified(0); // required for determinism with directories
+ return tmpFile;
+ }
+
+ static final String STAGING_PATH = GcsPath.fromComponents("somebucket", "base/path").toString();
+ private static PackageAttributes makePackageAttributes(File file, String overridePackageName) {
+ return PackageUtil.createPackageAttributes(file, STAGING_PATH, overridePackageName); // fixed path
+ }
+
+ @Test // checks the package name (file-<hash>.txt), its location under STAGING_PATH, and its size
+ public void testFileWithExtensionPackageNamingAndSize() throws Exception {
+ String contents = "This is a test!";
+ File tmpFile = makeFileWithContents("file.txt", contents);
+ PackageAttributes attr = makePackageAttributes(tmpFile, null);
+ DataflowPackage target = attr.getDataflowPackage();
+
+ assertThat(target.getName(), RegexMatcher.matches("file-" + HASH_PATTERN + "\\.txt")); // literal '.'
+ assertThat(target.getLocation(), equalTo(STAGING_PATH + '/' + target.getName()));
+ assertThat(attr.getSize(), equalTo((long) contents.length())); // ASCII text: chars == bytes
+ }
+
+ @Test // a file without an extension is named file-<hash>, with no suffix appended
+ public void testPackageNamingWithFileNoExtension() throws Exception {
+ File tmpFile = makeFileWithContents("file", "This is a test!");
+ DataflowPackage target = makePackageAttributes(tmpFile, null).getDataflowPackage();
+
+ assertThat(target.getName(), RegexMatcher.matches("file-" + HASH_PATTERN));
+ assertThat(target.getLocation(), equalTo(STAGING_PATH + '/' + target.getName()));
+ }
+
+ @Test // a directory is expected to be named folder-<hash>.jar under STAGING_PATH
+ public void testPackageNamingWithDirectory() throws Exception {
+ File tmpDirectory = tmpFolder.newFolder("folder");
+ DataflowPackage target = makePackageAttributes(tmpDirectory, null).getDataflowPackage();
+
+ assertThat(target.getName(), RegexMatcher.matches("folder-" + HASH_PATTERN + "\\.jar")); // literal '.'
+ assertThat(target.getLocation(), equalTo(STAGING_PATH + '/' + target.getName()));
+ }
+
+ @Test // two folderA directories with the same file name and contents must be named identically
+ public void testPackageNamingWithFilesHavingSameContentsAndSameNames() throws Exception {
+ File tmpDirectory1 = tmpFolder.newFolder("folder1", "folderA");
+ makeFileWithContents("folder1/folderA/sameName", "This is a test!");
+ DataflowPackage target1 = makePackageAttributes(tmpDirectory1, null).getDataflowPackage();
+
+ File tmpDirectory2 = tmpFolder.newFolder("folder2", "folderA"); // differs only in the parent
+ makeFileWithContents("folder2/folderA/sameName", "This is a test!");
+ DataflowPackage target2 = makePackageAttributes(tmpDirectory2, null).getDataflowPackage();
+
+ assertEquals(target1.getName(), target2.getName());
+ assertEquals(target1.getLocation(), target2.getLocation());
+ }
+
+ /**
+  * Two {@code folderA} directories whose single files share contents but differ in name must
+  * produce different package names and locations.
+  */
+ @Test
+ public void testPackageNamingWithFilesHavingSameContentsButDifferentNames() throws Exception {
+   File firstDir = tmpFolder.newFolder("folder1", "folderA");
+   makeFileWithContents("folder1/folderA/uniqueName1", "This is a test!");
+   PackageAttributes firstAttributes = makePackageAttributes(firstDir, null);
+   DataflowPackage first = firstAttributes.getDataflowPackage();
+
+   File secondDir = tmpFolder.newFolder("folder2", "folderA");
+   makeFileWithContents("folder2/folderA/uniqueName2", "This is a test!");
+   PackageAttributes secondAttributes = makePackageAttributes(secondDir, null);
+   DataflowPackage second = secondAttributes.getDataflowPackage();
+
+   assertNotEquals(first.getName(), second.getName());
+   assertNotEquals(first.getLocation(), second.getLocation());
+ }
+
+ /**
+  * Two {@code folderA} directories that each hold one empty sub-directory, differing only in
+  * that sub-directory's name, must produce different package names and locations.
+  */
+ @Test
+ public void testPackageNamingWithDirectoriesHavingSameContentsButDifferentNames()
+     throws Exception {
+   File firstDir = tmpFolder.newFolder("folder1", "folderA");
+   tmpFolder.newFolder("folder1", "folderA", "uniqueName1");
+   PackageAttributes firstAttributes = makePackageAttributes(firstDir, null);
+   DataflowPackage first = firstAttributes.getDataflowPackage();
+
+   File secondDir = tmpFolder.newFolder("folder2", "folderA");
+   tmpFolder.newFolder("folder2", "folderA", "uniqueName2");
+   PackageAttributes secondAttributes = makePackageAttributes(secondDir, null);
+   DataflowPackage second = secondAttributes.getDataflowPackage();
+
+   assertNotEquals(first.getName(), second.getName());
+   assertNotEquals(first.getLocation(), second.getLocation());
+ }
+
+ @Test
+ public void testPackageUploadWithLargeClasspathLogsWarning() throws Exception {
+ File tmpFile = makeFileWithContents("file.txt", "This is a test!");
+ // all files will be present and cached so no upload needed.
+ when(mockGcsUtil.fileSize(any(GcsPath.class))).thenReturn(tmpFile.length());
+
+ List<String> classpathElements = Lists.newLinkedList();
+ for (int i = 0; i < 1005; ++i) {
+ String eltName = "element" + i;
+ classpathElements.add(eltName + '=' + tmpFile.getAbsolutePath());
+ }
+
+ PackageUtil.stageClasspathElements(classpathElements, STAGING_PATH);
+
+ logged.verifyWarn("Your classpath contains 1005 elements, which Google Cloud Dataflow");
+ }
+
+ /**
+  * Stages a single file that the mock reports as missing from GCS and checks that it is
+  * uploaded exactly once, under the expected name and location, with its original contents.
+  */
+ @Test
+ public void testPackageUploadWithFileSucceeds() throws Exception {
+   Pipe pipe = Pipe.open();
+   String contents = "This is a test!";
+   File tmpFile = makeFileWithContents("file.txt", contents);
+   // A FileNotFoundException from fileSize() stands in for "not staged yet".
+   when(mockGcsUtil.fileSize(any(GcsPath.class)))
+       .thenThrow(new FileNotFoundException("some/path"));
+   when(mockGcsUtil.create(any(GcsPath.class), anyString())).thenReturn(pipe.sink());
+
+   List<DataflowPackage> targets = PackageUtil.stageClasspathElements(
+       ImmutableList.of(tmpFile.getAbsolutePath()), STAGING_PATH);
+   DataflowPackage target = Iterables.getOnlyElement(targets);
+
+   verify(mockGcsUtil).fileSize(any(GcsPath.class));
+   verify(mockGcsUtil).create(any(GcsPath.class), anyString());
+   verifyNoMoreInteractions(mockGcsUtil);
+
+   // The dot is escaped so that only a literal ".txt" suffix satisfies the pattern.
+   assertThat(target.getName(), RegexMatcher.matches("file-" + HASH_PATTERN + "\\.txt"));
+   assertThat(target.getLocation(), equalTo(STAGING_PATH + '/' + target.getName()));
+   // Close the read end of the pipe once the uploaded bytes have been checked.
+   try (Pipe.SourceChannel source = pipe.source()) {
+     assertThat(new LineReader(Channels.newReader(source, "UTF-8")).readLine(),
+         equalTo(contents));
+   }
+ }
+
+ /**
+  * Stages a directory containing a file, a nested directory with a file, and an empty
+  * directory, then checks that the single uploaded archive holds exactly those entries.
+  */
+ @Test
+ public void testPackageUploadWithDirectorySucceeds() throws Exception {
+   Pipe pipe = Pipe.open();
+   File tmpDirectory = tmpFolder.newFolder("folder");
+   tmpFolder.newFolder("folder", "empty_directory");
+   tmpFolder.newFolder("folder", "directory");
+   makeFileWithContents("folder/file.txt", "This is a test!");
+   makeFileWithContents("folder/directory/file.txt", "This is also a test!");
+
+   // A FileNotFoundException from fileSize() stands in for "not staged yet".
+   when(mockGcsUtil.fileSize(any(GcsPath.class)))
+       .thenThrow(new FileNotFoundException("some/path"));
+   when(mockGcsUtil.create(any(GcsPath.class), anyString())).thenReturn(pipe.sink());
+
+   PackageUtil.stageClasspathElements(
+       ImmutableList.of(tmpDirectory.getAbsolutePath()), STAGING_PATH);
+
+   verify(mockGcsUtil).fileSize(any(GcsPath.class));
+   verify(mockGcsUtil).create(any(GcsPath.class), anyString());
+   verifyNoMoreInteractions(mockGcsUtil);
+
+   List<String> zipEntryNames = new ArrayList<>();
+   // try-with-resources closes the zip stream, and with it the pipe's source channel.
+   try (ZipInputStream inputStream =
+       new ZipInputStream(Channels.newInputStream(pipe.source()))) {
+     for (ZipEntry entry = inputStream.getNextEntry(); entry != null;
+         entry = inputStream.getNextEntry()) {
+       zipEntryNames.add(entry.getName());
+     }
+   }
+
+   assertThat(zipEntryNames,
+       containsInAnyOrder("directory/file.txt", "empty_directory/", "file.txt"));
+ }
+
+ /**
+  * Stages an empty directory and checks that it is uploaded once as a
+  * {@code folder-<hash>.jar} archive that contains no entries.
+  */
+ @Test
+ public void testPackageUploadWithEmptyDirectorySucceeds() throws Exception {
+   Pipe pipe = Pipe.open();
+   File tmpDirectory = tmpFolder.newFolder("folder");
+
+   // A FileNotFoundException from fileSize() stands in for "not staged yet".
+   when(mockGcsUtil.fileSize(any(GcsPath.class)))
+       .thenThrow(new FileNotFoundException("some/path"));
+   when(mockGcsUtil.create(any(GcsPath.class), anyString())).thenReturn(pipe.sink());
+
+   List<DataflowPackage> targets = PackageUtil.stageClasspathElements(
+       ImmutableList.of(tmpDirectory.getAbsolutePath()), STAGING_PATH);
+   DataflowPackage target = Iterables.getOnlyElement(targets);
+
+   verify(mockGcsUtil).fileSize(any(GcsPath.class));
+   verify(mockGcsUtil).create(any(GcsPath.class), anyString());
+   verifyNoMoreInteractions(mockGcsUtil);
+
+   // The dot is escaped so that only a literal ".jar" suffix satisfies the pattern.
+   assertThat(target.getName(), RegexMatcher.matches("folder-" + HASH_PATTERN + "\\.jar"));
+   assertThat(target.getLocation(), equalTo(STAGING_PATH + '/' + target.getName()));
+   // try-with-resources closes the zip stream, and with it the pipe's source channel.
+   try (ZipInputStream zipStream =
+       new ZipInputStream(Channels.newInputStream(pipe.source()))) {
+     assertNull(zipStream.getNextEntry());
+   }
+ }
+
+ /**
+  * When every {@code create} call throws an {@link IOException}, staging must end in a
+  * {@link RuntimeException} after {@code create} has been attempted five times.
+  */
+ @Test(expected = RuntimeException.class)
+ public void testPackageUploadFailsWhenIOExceptionThrown() throws Exception {
+   File localFile = makeFileWithContents("file.txt", "This is a test!");
+   when(mockGcsUtil.fileSize(any(GcsPath.class)))
+       .thenThrow(new FileNotFoundException("some/path"));
+   when(mockGcsUtil.create(any(GcsPath.class), anyString()))
+       .thenThrow(new IOException("Fake Exception: Upload error"));
+
+   List<String> classpath = ImmutableList.of(localFile.getAbsolutePath());
+   try {
+     PackageUtil.stageClasspathElements(classpath, STAGING_PATH, fastNanoClockAndSleeper);
+   } finally {
+     // Runs even though the call above throws, so the attempt count is always checked.
+     verify(mockGcsUtil).fileSize(any(GcsPath.class));
+     verify(mockGcsUtil, times(5)).create(any(GcsPath.class), anyString());
+     verifyNoMoreInteractions(mockGcsUtil);
+   }
+ }
+
+ @Test
+ public void testPackageUploadFailsWithPermissionsErrorGivesDetailedMessage() throws Exception {
+ File tmpFile = makeFileWithContents("file.txt", "This is a test!");
+ when(mockGcsUtil.fileSize(any(GcsPath.class)))
+ .thenThrow(new FileNotFoundException("some/path"));
+ when(mockGcsUtil.create(any(GcsPath.class), anyString()))
+ .thenThrow(new IOException("Failed to write to GCS path " + STAGING_PATH,
+ googleJsonResponseException(
+ HttpStatusCodes.STATUS_CODE_FORBIDDEN, "Permission denied", "Test message")));
+
+ try {
+ PackageUtil.stageClasspathElements(
+ ImmutableList.of(tmpFile.getAbsolutePath()),
+ STAGING_PATH, fastNanoClockAndSleeper);
+ fail("Expected RuntimeException");
+ } catch (RuntimeException e) {
+ assertTrue("Expected IOException containing detailed message.",
+ e.getCause() instanceof IOException);
+ assertThat(e.getCause().getMessage(),
+ Matchers.allOf(
+ Matchers.containsString("Uploaded failed due to permissions error"),
+ Matchers.containsString(
+ "Stale credentials can be resolved by executing 'gcloud auth login'")));
+ } finally {
+ verify(mockGcsUtil).fileSize(any(GcsPath.class));
+ verify(mockGcsUtil).create(any(GcsPath.class), anyString());
+ verifyNoMoreInteractions(mockGcsUtil);
+ }
+ }
+
+ /**
+  * When the first {@code create} call fails with an {@link IOException} and the second one
+  * succeeds, staging must complete after exactly two {@code create} calls.
+  */
+ @Test
+ public void testPackageUploadEventuallySucceeds() throws Exception {
+   Pipe pipe = Pipe.open();
+   File localFile = makeFileWithContents("file.txt", "This is a test!");
+   when(mockGcsUtil.fileSize(any(GcsPath.class)))
+       .thenThrow(new FileNotFoundException("some/path"));
+   // First attempt fails, second attempt succeeds.
+   when(mockGcsUtil.create(any(GcsPath.class), anyString()))
+       .thenThrow(new IOException("Fake Exception: 410 Gone"))
+       .thenReturn(pipe.sink());
+
+   List<String> classpath = ImmutableList.of(localFile.getAbsolutePath());
+   try {
+     PackageUtil.stageClasspathElements(classpath, STAGING_PATH, fastNanoClockAndSleeper);
+   } finally {
+     verify(mockGcsUtil).fileSize(any(GcsPath.class));
+     verify(mockGcsUtil, times(2)).create(any(GcsPath.class), anyString());
+     verifyNoMoreInteractions(mockGcsUtil);
+   }
+ }
+
+ @Test
+ public void testPackageUploadIsSkippedWhenFileAlreadyExists() throws Exception {
+ File tmpFile = makeFileWithContents("file.txt", "This is a test!");
+ when(mockGcsUtil.fileSize(any(GcsPath.class))).thenReturn(tmpFile.length());
+
+ PackageUtil.stageClasspathElements(
+ ImmutableList.of(tmpFile.getAbsolutePath()), STAGING_PATH);
+
+ verify(mockGcsUtil).fileSize(any(GcsPath.class));
+ verifyNoMoreInteractions(mockGcsUtil);
+ }
+
+ /**
+  * When the mock reports a remote size of {@link Long#MAX_VALUE} for a staged directory,
+  * staging must still call {@code create} exactly once.
+  */
+ @Test
+ public void testPackageUploadIsNotSkippedWhenSizesAreDifferent() throws Exception {
+   Pipe pipe = Pipe.open();
+   File sourceDir = tmpFolder.newFolder("folder");
+   tmpFolder.newFolder("folder", "empty_directory");
+   tmpFolder.newFolder("folder", "directory");
+   makeFileWithContents("folder/file.txt", "This is a test!");
+   makeFileWithContents("folder/directory/file.txt", "This is also a test!");
+
+   when(mockGcsUtil.fileSize(any(GcsPath.class))).thenReturn(Long.MAX_VALUE);
+   when(mockGcsUtil.create(any(GcsPath.class), anyString())).thenReturn(pipe.sink());
+
+   List<String> classpath = ImmutableList.of(sourceDir.getAbsolutePath());
+   PackageUtil.stageClasspathElements(classpath, STAGING_PATH);
+
+   verify(mockGcsUtil).fileSize(any(GcsPath.class));
+   verify(mockGcsUtil).create(any(GcsPath.class), anyString());
+   verifyNoMoreInteractions(mockGcsUtil);
+ }
+
+ /**
+  * Stages a classpath entry of the form {@code alias.txt=<path>} and checks that the package
+  * keeps the overridden name while its location still uses the hashed file name.
+  */
+ @Test
+ public void testPackageUploadWithExplicitPackageName() throws Exception {
+   Pipe pipe = Pipe.open();
+   File tmpFile = makeFileWithContents("file.txt", "This is a test!");
+   final String overriddenName = "alias.txt";
+
+   // A FileNotFoundException from fileSize() stands in for "not staged yet".
+   when(mockGcsUtil.fileSize(any(GcsPath.class)))
+       .thenThrow(new FileNotFoundException("some/path"));
+   when(mockGcsUtil.create(any(GcsPath.class), anyString())).thenReturn(pipe.sink());
+
+   List<DataflowPackage> targets = PackageUtil.stageClasspathElements(
+       ImmutableList.of(overriddenName + "=" + tmpFile.getAbsolutePath()), STAGING_PATH);
+   DataflowPackage target = Iterables.getOnlyElement(targets);
+
+   verify(mockGcsUtil).fileSize(any(GcsPath.class));
+   verify(mockGcsUtil).create(any(GcsPath.class), anyString());
+   verifyNoMoreInteractions(mockGcsUtil);
+
+   assertThat(target.getName(), equalTo(overriddenName));
+   // The dot is escaped so that only a literal ".txt" suffix satisfies the pattern.
+   assertThat(target.getLocation(),
+       RegexMatcher.matches(STAGING_PATH + "/file-" + HASH_PATTERN + "\\.txt"));
+ }
+
+ /**
+  * Staging a classpath entry that points at a path which does not exist must return an empty
+  * list of packages.
+  */
+ @Test
+ public void testPackageUploadIsSkippedWithNonExistentResource() throws Exception {
+   String nonExistentFile =
+       IOChannelUtils.resolve(tmpFolder.getRoot().getPath(), "non-existent-file");
+   // Typed empty list instead of the raw-typed Collections.EMPTY_LIST constant.
+   assertEquals(Collections.<DataflowPackage>emptyList(), PackageUtil.stageClasspathElements(
+       ImmutableList.of(nonExistentFile), STAGING_PATH));
+ }
+
+ /**
+ * Builds a fake GoogleJsonResponseException for testing API error handling.
+ */
+ private static GoogleJsonResponseException googleJsonResponseException(
+ final int status, final String reason, final String message) throws IOException {
+ final JsonFactory jsonFactory = new JacksonFactory();
+ HttpTransport transport = new MockHttpTransport() {
+ @Override
+ public LowLevelHttpRequest buildRequest(String method, String url) throws IOException {
+ ErrorInfo errorInfo = new ErrorInfo();
+ errorInfo.setReason(reason);
+ errorInfo.setMessage(message);
+ errorInfo.setFactory(jsonFactory);
+ GenericJson error = new GenericJson();
+ error.set("code", status);
+ error.set("errors", Arrays.asList(errorInfo));
+ error.setFactory(jsonFactory);
+ GenericJson errorResponse = new GenericJson();
+ errorResponse.set("error", error);
+ errorResponse.setFactory(jsonFactory);
+ return new MockLowLevelHttpRequest().setResponse(
+ new MockLowLevelHttpResponse().setContent(errorResponse.toPrettyString())
+ .setContentType(Json.MEDIA_TYPE).setStatusCode(status));
+ }
+ };
+ HttpRequest request =
+ transport.createRequestFactory().buildGetRequest(HttpTesting.SIMPLE_GENERIC_URL);
+ request.setThrowExceptionOnExecuteError(false);
+ HttpResponse response = request.execute();
+ return GoogleJsonResponseException.from(jsonFactory, response);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/io/DataflowTextIOTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/io/DataflowTextIOTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/io/DataflowTextIOTest.java
deleted file mode 100644
index 6b9fbb4..0000000
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/io/DataflowTextIOTest.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io;
-
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.runners.DataflowPipelineRunner;
-import org.apache.beam.sdk.testing.TestDataflowPipelineOptions;
-import org.apache.beam.sdk.util.GcsUtil;
-import org.apache.beam.sdk.util.TestCredential;
-import org.apache.beam.sdk.util.gcsfs.GcsPath;
-
-import com.google.common.collect.ImmutableList;
-
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-import java.io.IOException;
-import java.nio.channels.FileChannel;
-import java.nio.channels.SeekableByteChannel;
-import java.nio.file.Files;
-import java.nio.file.StandardOpenOption;
-import java.util.List;
-
-/**
- * {@link DataflowPipelineRunner} specific tests for TextIO Read and Write transforms.
- */
-@RunWith(JUnit4.class)
-public class DataflowTextIOTest {
-
- private TestDataflowPipelineOptions buildTestPipelineOptions() {
- TestDataflowPipelineOptions options =
- PipelineOptionsFactory.as(TestDataflowPipelineOptions.class);
- options.setGcpCredential(new TestCredential());
- return options;
- }
-
- private GcsUtil buildMockGcsUtil() throws IOException {
- GcsUtil mockGcsUtil = Mockito.mock(GcsUtil.class);
-
- // Any request to open gets a new bogus channel
- Mockito
- .when(mockGcsUtil.open(Mockito.any(GcsPath.class)))
- .then(new Answer<SeekableByteChannel>() {
- @Override
- public SeekableByteChannel answer(InvocationOnMock invocation) throws Throwable {
- return FileChannel.open(
- Files.createTempFile("channel-", ".tmp"),
- StandardOpenOption.CREATE, StandardOpenOption.DELETE_ON_CLOSE);
- }
- });
-
- // Any request for expansion returns a list containing the original GcsPath
- // This is required to pass validation that occurs in TextIO during apply()
- Mockito
- .when(mockGcsUtil.expand(Mockito.any(GcsPath.class)))
- .then(new Answer<List<GcsPath>>() {
- @Override
- public List<GcsPath> answer(InvocationOnMock invocation) throws Throwable {
- return ImmutableList.of((GcsPath) invocation.getArguments()[0]);
- }
- });
-
- return mockGcsUtil;
- }
-
- /**
- * This tests a few corner cases that should not crash.
- */
- @Test
- public void testGoodWildcards() throws Exception {
- TestDataflowPipelineOptions options = buildTestPipelineOptions();
- options.setGcsUtil(buildMockGcsUtil());
-
- Pipeline pipeline = Pipeline.create(options);
-
- applyRead(pipeline, "gs://bucket/foo");
- applyRead(pipeline, "gs://bucket/foo/");
- applyRead(pipeline, "gs://bucket/foo/*");
- applyRead(pipeline, "gs://bucket/foo/?");
- applyRead(pipeline, "gs://bucket/foo/[0-9]");
- applyRead(pipeline, "gs://bucket/foo/*baz*");
- applyRead(pipeline, "gs://bucket/foo/*baz?");
- applyRead(pipeline, "gs://bucket/foo/[0-9]baz?");
- applyRead(pipeline, "gs://bucket/foo/baz/*");
- applyRead(pipeline, "gs://bucket/foo/baz/*wonka*");
- applyRead(pipeline, "gs://bucket/foo/*baz/wonka*");
- applyRead(pipeline, "gs://bucket/foo*/baz");
- applyRead(pipeline, "gs://bucket/foo?/baz");
- applyRead(pipeline, "gs://bucket/foo[0-9]/baz");
-
- // Check that running doesn't fail.
- pipeline.run();
- }
-
- private void applyRead(Pipeline pipeline, String path) {
- pipeline.apply("Read(" + path + ")", TextIO.Read.from(path));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/options/DataflowPipelineDebugOptionsTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/options/DataflowPipelineDebugOptionsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/options/DataflowPipelineDebugOptionsTest.java
deleted file mode 100644
index c3f3a18..0000000
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/options/DataflowPipelineDebugOptionsTest.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.options;
-
-import static org.hamcrest.Matchers.hasEntry;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
-
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/** Tests for {@link DataflowPipelineDebugOptions}. */
-@RunWith(JUnit4.class)
-public class DataflowPipelineDebugOptionsTest {
- @Test
- public void testTransformNameMapping() throws Exception {
- DataflowPipelineDebugOptions options = PipelineOptionsFactory
- .fromArgs(new String[]{"--transformNameMapping={\"a\":\"b\",\"foo\":\"\",\"bar\":\"baz\"}"})
- .as(DataflowPipelineDebugOptions.class);
- assertEquals(3, options.getTransformNameMapping().size());
- assertThat(options.getTransformNameMapping(), hasEntry("a", "b"));
- assertThat(options.getTransformNameMapping(), hasEntry("foo", ""));
- assertThat(options.getTransformNameMapping(), hasEntry("bar", "baz"));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/options/DataflowPipelineOptionsTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/options/DataflowPipelineOptionsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/options/DataflowPipelineOptionsTest.java
deleted file mode 100644
index c9eac56..0000000
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/options/DataflowPipelineOptionsTest.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.options;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import org.apache.beam.sdk.testing.ResetDateTimeProvider;
-import org.apache.beam.sdk.testing.RestoreSystemProperties;
-
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestRule;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/** Tests for {@link DataflowPipelineOptions}. */
-@RunWith(JUnit4.class)
-public class DataflowPipelineOptionsTest {
- @Rule public TestRule restoreSystemProperties = new RestoreSystemProperties();
- @Rule public ResetDateTimeProvider resetDateTimeProviderRule = new ResetDateTimeProvider();
-
- @Test
- public void testJobNameIsSet() {
- DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
- options.setJobName("TestJobName");
- assertEquals("TestJobName", options.getJobName());
- }
-
- @Test
- public void testUserNameIsNotSet() {
- resetDateTimeProviderRule.setDateTimeFixed("2014-12-08T19:07:06.698Z");
- System.getProperties().remove("user.name");
- DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
- options.setAppName("TestApplication");
- assertEquals("testapplication--1208190706", options.getJobName());
- assertTrue(options.getJobName().length() <= 40);
- }
-
- @Test
- public void testAppNameAndUserNameAreLong() {
- resetDateTimeProviderRule.setDateTimeFixed("2014-12-08T19:07:06.698Z");
- System.getProperties().put("user.name", "abcdeabcdeabcdeabcdeabcdeabcde");
- DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
- options.setAppName("1234567890123456789012345678901234567890");
- assertEquals(
- "a234567890123456789012345678901234567890-abcdeabcdeabcdeabcdeabcdeabcde-1208190706",
- options.getJobName());
- }
-
- @Test
- public void testAppNameIsLong() {
- resetDateTimeProviderRule.setDateTimeFixed("2014-12-08T19:07:06.698Z");
- System.getProperties().put("user.name", "abcde");
- DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
- options.setAppName("1234567890123456789012345678901234567890");
- assertEquals("a234567890123456789012345678901234567890-abcde-1208190706", options.getJobName());
- }
-
- @Test
- public void testUserNameIsLong() {
- resetDateTimeProviderRule.setDateTimeFixed("2014-12-08T19:07:06.698Z");
- System.getProperties().put("user.name", "abcdeabcdeabcdeabcdeabcdeabcde");
- DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
- options.setAppName("1234567890");
- assertEquals("a234567890-abcdeabcdeabcdeabcdeabcdeabcde-1208190706", options.getJobName());
- }
-
- @Test
- public void testUtf8UserNameAndApplicationNameIsNormalized() {
- resetDateTimeProviderRule.setDateTimeFixed("2014-12-08T19:07:06.698Z");
- System.getProperties().put("user.name", "ði ıntəˈnæʃənəl ");
- DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
- options.setAppName("fəˈnɛtık əsoʊsiˈeıʃn");
- assertEquals("f00n0t0k00so0si0e00n-0i00nt00n000n0l0-1208190706", options.getJobName());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/options/DataflowProfilingOptionsTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/options/DataflowProfilingOptionsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/options/DataflowProfilingOptionsTest.java
deleted file mode 100644
index 18c8085..0000000
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/options/DataflowProfilingOptionsTest.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.options;
-
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-import org.hamcrest.Matchers;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for {@link DataflowProfilingOptions}.
- */
-@RunWith(JUnit4.class)
-public class DataflowProfilingOptionsTest {
-
- private static final ObjectMapper MAPPER = new ObjectMapper();
-
- @Test
- public void testOptionsObject() throws Exception {
- DataflowPipelineOptions options = PipelineOptionsFactory.fromArgs(new String[] {
- "--enableProfilingAgent", "--profilingAgentConfiguration={\"interval\": 21}"})
- .as(DataflowPipelineOptions.class);
- assertTrue(options.getEnableProfilingAgent());
-
- String json = MAPPER.writeValueAsString(options);
- assertThat(json, Matchers.containsString(
- "\"profilingAgentConfiguration\":{\"interval\":21}"));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/options/DataflowWorkerLoggingOptionsTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/options/DataflowWorkerLoggingOptionsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/options/DataflowWorkerLoggingOptionsTest.java
deleted file mode 100644
index 47d518d..0000000
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/options/DataflowWorkerLoggingOptionsTest.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.options;
-
-import static org.apache.beam.sdk.options.DataflowWorkerLoggingOptions.Level.WARN;
-
-import static org.junit.Assert.assertEquals;
-
-import org.apache.beam.sdk.options.DataflowWorkerLoggingOptions.WorkerLogLevelOverrides;
-
-import com.google.common.collect.ImmutableMap;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/** Tests for {@link DataflowWorkerLoggingOptions}. */
-@RunWith(JUnit4.class)
-public class DataflowWorkerLoggingOptionsTest {
- private static final ObjectMapper MAPPER = new ObjectMapper();
- @Rule public ExpectedException expectedException = ExpectedException.none();
-
- @Test
- public void testWorkerLogLevelOverrideWithInvalidLogLevel() {
- expectedException.expect(IllegalArgumentException.class);
- expectedException.expectMessage("Unsupported log level");
- WorkerLogLevelOverrides.from(ImmutableMap.of("Name", "FakeLevel"));
- }
-
- @Test
- public void testWorkerLogLevelOverrideForClass() throws Exception {
- assertEquals("{\"org.junit.Test\":\"WARN\"}",
- MAPPER.writeValueAsString(
- new WorkerLogLevelOverrides().addOverrideForClass(Test.class, WARN)));
- }
-
- @Test
- public void testWorkerLogLevelOverrideForPackage() throws Exception {
- assertEquals("{\"org.junit\":\"WARN\"}",
- MAPPER.writeValueAsString(
- new WorkerLogLevelOverrides().addOverrideForPackage(Test.class.getPackage(), WARN)));
- }
-
- @Test
- public void testWorkerLogLevelOverrideForName() throws Exception {
- assertEquals("{\"A\":\"WARN\"}",
- MAPPER.writeValueAsString(
- new WorkerLogLevelOverrides().addOverrideForName("A", WARN)));
- }
-
- @Test
- public void testSerializationAndDeserializationOf() throws Exception {
- String testValue = "{\"A\":\"WARN\"}";
- assertEquals(testValue,
- MAPPER.writeValueAsString(
- MAPPER.readValue(testValue, WorkerLogLevelOverrides.class)));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/BlockingDataflowPipelineRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/BlockingDataflowPipelineRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/BlockingDataflowPipelineRunnerTest.java
deleted file mode 100644
index 13e120b..0000000
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/BlockingDataflowPipelineRunnerTest.java
+++ /dev/null
@@ -1,302 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners;
-
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Matchers.isA;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.PipelineResult.State;
-import org.apache.beam.sdk.options.DataflowPipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.testing.ExpectedLogs;
-import org.apache.beam.sdk.testing.TestDataflowPipelineOptions;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.util.MonitoringUtil;
-import org.apache.beam.sdk.util.NoopPathValidator;
-import org.apache.beam.sdk.util.TestCredential;
-
-import org.hamcrest.Description;
-import org.hamcrest.Factory;
-import org.hamcrest.Matcher;
-import org.hamcrest.TypeSafeMatcher;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-import java.util.concurrent.TimeUnit;
-
-/**
- * Tests for BlockingDataflowPipelineRunner.
- */
-@RunWith(JUnit4.class)
-public class BlockingDataflowPipelineRunnerTest {
-
- @Rule
- public ExpectedLogs expectedLogs = ExpectedLogs.none(BlockingDataflowPipelineRunner.class);
-
- @Rule
- public ExpectedException expectedThrown = ExpectedException.none();
-
- /**
- * A {@link Matcher} for a {@link DataflowJobException} that applies an underlying {@link Matcher}
- * to the {@link DataflowPipelineJob} returned by {@link DataflowJobException#getJob()}.
- */
- private static class DataflowJobExceptionMatcher<T extends DataflowJobException>
- extends TypeSafeMatcher<T> {
-
- private final Matcher<DataflowPipelineJob> matcher;
-
- public DataflowJobExceptionMatcher(Matcher<DataflowPipelineJob> matcher) {
- this.matcher = matcher;
- }
-
- @Override
- public boolean matchesSafely(T ex) {
- return matcher.matches(ex.getJob());
- }
-
- @Override
- protected void describeMismatchSafely(T item, Description description) {
- description.appendText("job ");
- matcher.describeMismatch(item.getMessage(), description);
- }
-
- @Override
- public void describeTo(Description description) {
- description.appendText("exception with job matching ");
- description.appendDescriptionOf(matcher);
- }
-
- @Factory
- public static <T extends DataflowJobException> Matcher<T> expectJob(
- Matcher<DataflowPipelineJob> matcher) {
- return new DataflowJobExceptionMatcher<T>(matcher);
- }
- }
-
- /**
- * A {@link Matcher} for a {@link DataflowPipelineJob} that applies an underlying {@link Matcher}
- * to the return value of {@link DataflowPipelineJob#getJobId()}.
- */
- private static class JobIdMatcher<T extends DataflowPipelineJob> extends TypeSafeMatcher<T> {
-
- private final Matcher<String> matcher;
-
- public JobIdMatcher(Matcher<String> matcher) {
- this.matcher = matcher;
- }
-
- @Override
- public boolean matchesSafely(T job) {
- return matcher.matches(job.getJobId());
- }
-
- @Override
- protected void describeMismatchSafely(T item, Description description) {
- description.appendText("jobId ");
- matcher.describeMismatch(item.getJobId(), description);
- }
-
- @Override
- public void describeTo(Description description) {
- description.appendText("job with jobId ");
- description.appendDescriptionOf(matcher);
- }
-
- @Factory
- public static <T extends DataflowPipelineJob> Matcher<T> expectJobId(final String jobId) {
- return new JobIdMatcher<T>(equalTo(jobId));
- }
- }
-
- /**
- * A {@link Matcher} for a {@link DataflowJobUpdatedException} that applies an underlying
- * {@link Matcher} to the {@link DataflowPipelineJob} returned by
- * {@link DataflowJobUpdatedException#getReplacedByJob()}.
- */
- private static class ReplacedByJobMatcher<T extends DataflowJobUpdatedException>
- extends TypeSafeMatcher<T> {
-
- private final Matcher<DataflowPipelineJob> matcher;
-
- public ReplacedByJobMatcher(Matcher<DataflowPipelineJob> matcher) {
- this.matcher = matcher;
- }
-
- @Override
- public boolean matchesSafely(T ex) {
- return matcher.matches(ex.getReplacedByJob());
- }
-
- @Override
- protected void describeMismatchSafely(T item, Description description) {
- description.appendText("job ");
- matcher.describeMismatch(item.getMessage(), description);
- }
-
- @Override
- public void describeTo(Description description) {
- description.appendText("exception with replacedByJob() ");
- description.appendDescriptionOf(matcher);
- }
-
- @Factory
- public static <T extends DataflowJobUpdatedException> Matcher<T> expectReplacedBy(
- Matcher<DataflowPipelineJob> matcher) {
- return new ReplacedByJobMatcher<T>(matcher);
- }
- }
-
- /**
- * Creates a mocked {@link DataflowPipelineJob} with the given {@code projectId} and {@code jobId}
- * that will immediately terminate in the provided {@code terminalState}.
- *
- * <p>The return value may be further mocked.
- */
- private DataflowPipelineJob createMockJob(
- String projectId, String jobId, State terminalState) throws Exception {
- DataflowPipelineJob mockJob = mock(DataflowPipelineJob.class);
- when(mockJob.getProjectId()).thenReturn(projectId);
- when(mockJob.getJobId()).thenReturn(jobId);
- when(mockJob.waitToFinish(
- anyLong(), isA(TimeUnit.class), isA(MonitoringUtil.JobMessagesHandler.class)))
- .thenReturn(terminalState);
- return mockJob;
- }
-
- /**
- * Returns a {@link BlockingDataflowPipelineRunner} that will return the provided a job to return.
- * Some {@link PipelineOptions} will be extracted from the job, such as the project ID.
- */
- private BlockingDataflowPipelineRunner createMockRunner(DataflowPipelineJob job)
- throws Exception {
- DataflowPipelineRunner mockRunner = mock(DataflowPipelineRunner.class);
- TestDataflowPipelineOptions options =
- PipelineOptionsFactory.as(TestDataflowPipelineOptions.class);
- options.setProject(job.getProjectId());
-
- when(mockRunner.run(isA(Pipeline.class))).thenReturn(job);
-
- return new BlockingDataflowPipelineRunner(mockRunner, options);
- }
-
- /**
- * Tests that the {@link BlockingDataflowPipelineRunner} returns normally when a job terminates in
- * the {@link State#DONE DONE} state.
- */
- @Test
- public void testJobDoneComplete() throws Exception {
- createMockRunner(createMockJob("testJobDone-projectId", "testJobDone-jobId", State.DONE))
- .run(TestPipeline.create());
- expectedLogs.verifyInfo("Job finished with status DONE");
- }
-
- /**
- * Tests that the {@link BlockingDataflowPipelineRunner} throws the appropriate exception
- * when a job terminates in the {@link State#FAILED FAILED} state.
- */
- @Test
- public void testFailedJobThrowsException() throws Exception {
- expectedThrown.expect(DataflowJobExecutionException.class);
- expectedThrown.expect(DataflowJobExceptionMatcher.expectJob(
- JobIdMatcher.expectJobId("testFailedJob-jobId")));
- createMockRunner(createMockJob("testFailedJob-projectId", "testFailedJob-jobId", State.FAILED))
- .run(TestPipeline.create());
- }
-
- /**
- * Tests that the {@link BlockingDataflowPipelineRunner} throws the appropriate exception
- * when a job terminates in the {@link State#CANCELLED CANCELLED} state.
- */
- @Test
- public void testCancelledJobThrowsException() throws Exception {
- expectedThrown.expect(DataflowJobCancelledException.class);
- expectedThrown.expect(DataflowJobExceptionMatcher.expectJob(
- JobIdMatcher.expectJobId("testCancelledJob-jobId")));
- createMockRunner(
- createMockJob("testCancelledJob-projectId", "testCancelledJob-jobId", State.CANCELLED))
- .run(TestPipeline.create());
- }
-
- /**
- * Tests that the {@link BlockingDataflowPipelineRunner} throws the appropriate exception
- * when a job terminates in the {@link State#UPDATED UPDATED} state.
- */
- @Test
- public void testUpdatedJobThrowsException() throws Exception {
- expectedThrown.expect(DataflowJobUpdatedException.class);
- expectedThrown.expect(DataflowJobExceptionMatcher.expectJob(
- JobIdMatcher.expectJobId("testUpdatedJob-jobId")));
- expectedThrown.expect(ReplacedByJobMatcher.expectReplacedBy(
- JobIdMatcher.expectJobId("testUpdatedJob-replacedByJobId")));
- DataflowPipelineJob job =
- createMockJob("testUpdatedJob-projectId", "testUpdatedJob-jobId", State.UPDATED);
- DataflowPipelineJob replacedByJob =
- createMockJob("testUpdatedJob-projectId", "testUpdatedJob-replacedByJobId", State.DONE);
- when(job.getReplacedByJob()).thenReturn(replacedByJob);
- createMockRunner(job).run(TestPipeline.create());
- }
-
- /**
- * Tests that the {@link BlockingDataflowPipelineRunner} throws the appropriate exception
- * when a job terminates in the {@link State#UNKNOWN UNKNOWN} state, indicating that the
- * Dataflow service returned a state that the SDK is unfamiliar with (possibly because it
- * is an old SDK relative the service).
- */
- @Test
- public void testUnknownJobThrowsException() throws Exception {
- expectedThrown.expect(IllegalStateException.class);
- createMockRunner(
- createMockJob("testUnknownJob-projectId", "testUnknownJob-jobId", State.UNKNOWN))
- .run(TestPipeline.create());
- }
-
- /**
- * Tests that the {@link BlockingDataflowPipelineRunner} throws the appropriate exception
- * when a job returns a {@code null} state, indicating that it failed to contact the service,
- * including all of its built-in resilience logic.
- */
- @Test
- public void testNullJobThrowsException() throws Exception {
- expectedThrown.expect(DataflowServiceException.class);
- expectedThrown.expect(DataflowJobExceptionMatcher.expectJob(
- JobIdMatcher.expectJobId("testNullJob-jobId")));
- createMockRunner(createMockJob("testNullJob-projectId", "testNullJob-jobId", null))
- .run(TestPipeline.create());
- }
-
- @Test
- public void testToString() {
- DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
- options.setJobName("TestJobName");
- options.setProject("test-project");
- options.setTempLocation("gs://test/temp/location");
- options.setGcpCredential(new TestCredential());
- options.setPathValidatorClass(NoopPathValidator.class);
- assertEquals("BlockingDataflowPipelineRunner#testjobname",
- BlockingDataflowPipelineRunner.fromOptions(options).toString());
- }
-}