Posted to commits@beam.apache.org by lc...@apache.org on 2016/04/12 01:42:39 UTC
[05/18] incubator-beam git commit: [BEAM-151] Move a large portion of the Dataflow runner to separate maven module
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b4857cc/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/testing/TestDataflowPipelineRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/testing/TestDataflowPipelineRunner.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/testing/TestDataflowPipelineRunner.java
deleted file mode 100644
index e961066..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/testing/TestDataflowPipelineRunner.java
+++ /dev/null
@@ -1,256 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.testing;
-
-import com.google.api.services.dataflow.model.JobMessage;
-import com.google.api.services.dataflow.model.JobMetrics;
-import com.google.api.services.dataflow.model.MetricUpdate;
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.PipelineResult.State;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.runners.DataflowJobExecutionException;
-import com.google.cloud.dataflow.sdk.runners.DataflowPipelineJob;
-import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
-import com.google.cloud.dataflow.sdk.runners.PipelineRunner;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.util.MonitoringUtil;
-import com.google.cloud.dataflow.sdk.util.MonitoringUtil.JobMessagesHandler;
-import com.google.cloud.dataflow.sdk.values.PInput;
-import com.google.cloud.dataflow.sdk.values.POutput;
-import com.google.common.base.Optional;
-import com.google.common.base.Throwables;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.math.BigDecimal;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-
-/**
- * {@link TestDataflowPipelineRunner} is a pipeline runner that wraps a
- * {@link DataflowPipelineRunner} when running tests against the {@link TestPipeline}.
- *
- * @see TestPipeline
- */
-public class TestDataflowPipelineRunner extends PipelineRunner<DataflowPipelineJob> {
- private static final String TENTATIVE_COUNTER = "tentative";
- private static final Logger LOG = LoggerFactory.getLogger(TestDataflowPipelineRunner.class);
-
- private final TestDataflowPipelineOptions options;
- private final DataflowPipelineRunner runner;
- private int expectedNumberOfAssertions = 0;
-
- TestDataflowPipelineRunner(TestDataflowPipelineOptions options) {
- this.options = options;
- this.runner = DataflowPipelineRunner.fromOptions(options);
- }
-
- /**
- * Constructs a runner from the provided options.
- */
- public static TestDataflowPipelineRunner fromOptions(
- PipelineOptions options) {
- TestDataflowPipelineOptions dataflowOptions = options.as(TestDataflowPipelineOptions.class);
-
- return new TestDataflowPipelineRunner(dataflowOptions);
- }
-
- @Override
- public DataflowPipelineJob run(Pipeline pipeline) {
- return run(pipeline, runner);
- }
-
- DataflowPipelineJob run(Pipeline pipeline, DataflowPipelineRunner runner) {
-
- final DataflowPipelineJob job;
- try {
- job = runner.run(pipeline);
- } catch (DataflowJobExecutionException ex) {
- throw new IllegalStateException("The dataflow failed.");
- }
-
- LOG.info("Running Dataflow job {} with {} expected assertions.",
- job.getJobId(), expectedNumberOfAssertions);
-
- CancelWorkflowOnError messageHandler = new CancelWorkflowOnError(
- job, new MonitoringUtil.PrintHandler(options.getJobMessageOutput()));
-
- try {
- final Optional<Boolean> result;
-
- if (options.isStreaming()) {
- Future<Optional<Boolean>> resultFuture = options.getExecutorService().submit(
- new Callable<Optional<Boolean>>() {
- @Override
- public Optional<Boolean> call() throws Exception {
- try {
- for (;;) {
- Optional<Boolean> result = checkForSuccess(job);
- if (result.isPresent()) {
- return result;
- }
- Thread.sleep(10000L);
- }
- } finally {
- LOG.info("Cancelling Dataflow job {}", job.getJobId());
- job.cancel();
- }
- }
- });
- State finalState = job.waitToFinish(10L, TimeUnit.MINUTES, messageHandler);
- if (finalState == null || finalState == State.RUNNING) {
- LOG.info("Dataflow job {} took longer than 10 minutes to complete, cancelling.",
- job.getJobId());
- job.cancel();
- }
- result = resultFuture.get();
- } else {
- job.waitToFinish(-1, TimeUnit.SECONDS, messageHandler);
- result = checkForSuccess(job);
- }
- if (!result.isPresent()) {
- throw new IllegalStateException(
- "The dataflow did not output a success or failure metric.");
- } else if (!result.get()) {
- throw new AssertionError(messageHandler.getErrorMessage() == null ?
- "The dataflow did not return a failure reason."
- : messageHandler.getErrorMessage());
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new RuntimeException(e);
- } catch (ExecutionException e) {
- Throwables.propagateIfPossible(e.getCause());
- throw new RuntimeException(e.getCause());
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- return job;
- }
-
- @Override
- public <OutputT extends POutput, InputT extends PInput> OutputT apply(
- PTransform<InputT, OutputT> transform, InputT input) {
- if (transform instanceof PAssert.OneSideInputAssert
- || transform instanceof PAssert.TwoSideInputAssert) {
- expectedNumberOfAssertions += 1;
- }
-
- return runner.apply(transform, input);
- }
-
- Optional<Boolean> checkForSuccess(DataflowPipelineJob job)
- throws IOException {
- State state = job.getState();
- if (state == State.FAILED || state == State.CANCELLED) {
- LOG.info("The pipeline failed");
- return Optional.of(false);
- }
-
- JobMetrics metrics = job.getDataflowClient().projects().jobs()
- .getMetrics(job.getProjectId(), job.getJobId()).execute();
-
- if (metrics == null || metrics.getMetrics() == null) {
- LOG.warn("Metrics not present for Dataflow job {}.", job.getJobId());
- } else {
- int successes = 0;
- int failures = 0;
- for (MetricUpdate metric : metrics.getMetrics()) {
- if (metric.getName() == null || metric.getName().getContext() == null
- || !metric.getName().getContext().containsKey(TENTATIVE_COUNTER)) {
- // Don't double count using the non-tentative version of the metric.
- continue;
- }
- if (PAssert.SUCCESS_COUNTER.equals(metric.getName().getName())) {
- successes += ((BigDecimal) metric.getScalar()).intValue();
- } else if (PAssert.FAILURE_COUNTER.equals(metric.getName().getName())) {
- failures += ((BigDecimal) metric.getScalar()).intValue();
- }
- }
-
- if (failures > 0) {
- LOG.info("Found result while running Dataflow job {}. Found {} success, {} failures out of "
- + "{} expected assertions.", job.getJobId(), successes, failures,
- expectedNumberOfAssertions);
- return Optional.of(false);
- } else if (successes >= expectedNumberOfAssertions) {
- LOG.info("Found result while running Dataflow job {}. Found {} success, {} failures out of "
- + "{} expected assertions.", job.getJobId(), successes, failures,
- expectedNumberOfAssertions);
- return Optional.of(true);
- }
-
- LOG.info("Running Dataflow job {}. Found {} success, {} failures out of {} expected "
- + "assertions.", job.getJobId(), successes, failures, expectedNumberOfAssertions);
- }
-
- return Optional.<Boolean>absent();
- }
-
- @Override
- public String toString() {
- return "TestDataflowPipelineRunner#" + options.getAppName();
- }
-
- /**
- * Cancels the workflow on the first error message it sees.
- *
- * <p>Creates an error message representing the concatenation of all error messages seen.
- */
- private static class CancelWorkflowOnError implements JobMessagesHandler {
- private final DataflowPipelineJob job;
- private final JobMessagesHandler messageHandler;
- private final StringBuffer errorMessage;
- private CancelWorkflowOnError(DataflowPipelineJob job, JobMessagesHandler messageHandler) {
- this.job = job;
- this.messageHandler = messageHandler;
- this.errorMessage = new StringBuffer();
- }
-
- @Override
- public void process(List<JobMessage> messages) {
- messageHandler.process(messages);
- for (JobMessage message : messages) {
- if (message.getMessageImportance() != null
- && message.getMessageImportance().equals("JOB_MESSAGE_ERROR")) {
- LOG.info("Dataflow job {} threw exception. Failure message was: {}",
- job.getJobId(), message.getMessageText());
- errorMessage.append(message.getMessageText());
- }
- }
- if (errorMessage.length() > 0) {
- LOG.info("Cancelling Dataflow job {}", job.getJobId());
- try {
- job.cancel();
- } catch (Exception ignore) {
- // The TestDataflowPipelineRunner will throw an AssertionError with the job failure
- // messages.
- }
- }
- }
-
- private String getErrorMessage() {
- return errorMessage.toString();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b4857cc/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/DataflowPathValidator.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/DataflowPathValidator.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/DataflowPathValidator.java
deleted file mode 100644
index aeb864a..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/DataflowPathValidator.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.util.gcsfs.GcsPath;
-import com.google.common.base.Preconditions;
-
-import java.io.IOException;
-
-/**
- * GCP implementation of {@link PathValidator}. Only GCS paths are allowed.
- */
-public class DataflowPathValidator implements PathValidator {
-
- private DataflowPipelineOptions dataflowOptions;
-
- DataflowPathValidator(DataflowPipelineOptions options) {
- this.dataflowOptions = options;
- }
-
- public static DataflowPathValidator fromOptions(PipelineOptions options) {
- return new DataflowPathValidator(options.as(DataflowPipelineOptions.class));
- }
-
- /**
- * Validates that the input GCS path is accessible and that the path
- * is well formed.
- */
- @Override
- public String validateInputFilePatternSupported(String filepattern) {
- GcsPath gcsPath = getGcsPath(filepattern);
- Preconditions.checkArgument(
- dataflowOptions.getGcsUtil().isGcsPatternSupported(gcsPath.getObject()));
- String returnValue = verifyPath(filepattern);
- verifyPathIsAccessible(filepattern, "Could not find file %s");
- return returnValue;
- }
-
- /**
- * Validates that the output GCS path is accessible and that the path
- * is well formed.
- */
- @Override
- public String validateOutputFilePrefixSupported(String filePrefix) {
- String returnValue = verifyPath(filePrefix);
- verifyPathIsAccessible(filePrefix, "Output path does not exist or is not writeable: %s");
- return returnValue;
- }
-
- @Override
- public String verifyPath(String path) {
- GcsPath gcsPath = getGcsPath(path);
- Preconditions.checkArgument(gcsPath.isAbsolute(),
- "Must provide absolute paths for Dataflow");
- Preconditions.checkArgument(!gcsPath.getObject().contains("//"),
- "Dataflow Service does not allow objects with consecutive slashes");
- return gcsPath.toResourceName();
- }
-
- private void verifyPathIsAccessible(String path, String errorMessage) {
- GcsPath gcsPath = getGcsPath(path);
- try {
- Preconditions.checkArgument(dataflowOptions.getGcsUtil().bucketExists(gcsPath),
- errorMessage, path);
- } catch (IOException e) {
- throw new RuntimeException(
- String.format("Unable to verify that GCS bucket gs://%s exists.", gcsPath.getBucket()),
- e);
- }
- }
-
- private GcsPath getGcsPath(String path) {
- try {
- return GcsPath.fromUri(path);
- } catch (IllegalArgumentException e) {
- throw new IllegalArgumentException(String.format(
- "%s expected a valid 'gs://' path but was given '%s'",
- dataflowOptions.getRunner().getSimpleName(), path), e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b4857cc/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/DataflowTransport.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/DataflowTransport.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/DataflowTransport.java
deleted file mode 100644
index 18e6654..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/DataflowTransport.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.util;
-
-import static com.google.cloud.dataflow.sdk.util.Transport.getJsonFactory;
-import static com.google.cloud.dataflow.sdk.util.Transport.getTransport;
-
-import com.google.api.client.auth.oauth2.Credential;
-import com.google.api.client.http.HttpRequestInitializer;
-import com.google.api.services.clouddebugger.v2.Clouddebugger;
-import com.google.api.services.dataflow.Dataflow;
-import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
-import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer;
-import com.google.common.collect.ImmutableList;
-
-import java.net.MalformedURLException;
-import java.net.URL;
-
-/**
- * Helpers for cloud communication.
- */
-public class DataflowTransport {
-
-
- private static class ApiComponents {
- public String rootUrl;
- public String servicePath;
-
- public ApiComponents(String root, String path) {
- this.rootUrl = root;
- this.servicePath = path;
- }
- }
-
- private static ApiComponents apiComponentsFromUrl(String urlString) {
- try {
- URL url = new URL(urlString);
- String rootUrl = url.getProtocol() + "://" + url.getHost() +
- (url.getPort() > 0 ? ":" + url.getPort() : "");
- return new ApiComponents(rootUrl, url.getPath());
- } catch (MalformedURLException e) {
- throw new RuntimeException("Invalid URL: " + urlString);
- }
- }
-
- /**
- * Returns a Google Cloud Dataflow client builder.
- */
- public static Dataflow.Builder newDataflowClient(DataflowPipelineOptions options) {
- String servicePath = options.getDataflowEndpoint();
- ApiComponents components;
- if (servicePath.contains("://")) {
- components = apiComponentsFromUrl(servicePath);
- } else {
- components = new ApiComponents(options.getApiRootUrl(), servicePath);
- }
-
- return new Dataflow.Builder(getTransport(),
- getJsonFactory(),
- chainHttpRequestInitializer(
- options.getGcpCredential(),
- // Do not log 404. It clutters the output and is possibly even required by the caller.
- new RetryHttpRequestInitializer(ImmutableList.of(404))))
- .setApplicationName(options.getAppName())
- .setRootUrl(components.rootUrl)
- .setServicePath(components.servicePath)
- .setGoogleClientRequestInitializer(options.getGoogleApiTrace());
- }
-
- public static Clouddebugger.Builder newClouddebuggerClient(DataflowPipelineOptions options) {
- return new Clouddebugger.Builder(getTransport(),
- getJsonFactory(),
- chainHttpRequestInitializer(options.getGcpCredential(), new RetryHttpRequestInitializer()))
- .setApplicationName(options.getAppName())
- .setGoogleClientRequestInitializer(options.getGoogleApiTrace());
- }
-
- /**
- * Returns a Dataflow client that does not automatically retry failed
- * requests.
- */
- public static Dataflow.Builder
- newRawDataflowClient(DataflowPipelineOptions options) {
- return newDataflowClient(options)
- .setHttpRequestInitializer(options.getGcpCredential())
- .setGoogleClientRequestInitializer(options.getGoogleApiTrace());
- }
-
- private static HttpRequestInitializer chainHttpRequestInitializer(
- Credential credential, HttpRequestInitializer httpRequestInitializer) {
- if (credential == null) {
- return httpRequestInitializer;
- } else {
- return new ChainingHttpRequestInitializer(credential, httpRequestInitializer);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b4857cc/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/GcsStager.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/GcsStager.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/GcsStager.java
deleted file mode 100644
index 7307e83..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/GcsStager.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.api.services.dataflow.model.DataflowPackage;
-import com.google.cloud.dataflow.sdk.options.DataflowPipelineDebugOptions;
-import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.common.base.Preconditions;
-
-import java.util.List;
-
-/**
- * Utility class for staging files to GCS.
- */
-public class GcsStager implements Stager {
- private DataflowPipelineOptions options;
-
- private GcsStager(DataflowPipelineOptions options) {
- this.options = options;
- }
-
- public static GcsStager fromOptions(PipelineOptions options) {
- return new GcsStager(options.as(DataflowPipelineOptions.class));
- }
-
- @Override
- public List<DataflowPackage> stageFiles() {
- Preconditions.checkNotNull(options.getStagingLocation());
- List<String> filesToStage = options.getFilesToStage();
- String windmillBinary =
- options.as(DataflowPipelineDebugOptions.class).getOverrideWindmillBinary();
- if (windmillBinary != null) {
- filesToStage.add("windmill_main=" + windmillBinary);
- }
- return PackageUtil.stageClasspathElements(
- options.getFilesToStage(), options.getStagingLocation());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b4857cc/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/MonitoringUtil.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/MonitoringUtil.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/MonitoringUtil.java
deleted file mode 100644
index 2c06a92..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/MonitoringUtil.java
+++ /dev/null
@@ -1,235 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.util;
-
-import static com.google.cloud.dataflow.sdk.util.TimeUtil.fromCloudTime;
-
-import com.google.api.services.dataflow.Dataflow;
-import com.google.api.services.dataflow.Dataflow.Projects.Jobs.Messages;
-import com.google.api.services.dataflow.model.JobMessage;
-import com.google.api.services.dataflow.model.ListJobMessagesResponse;
-import com.google.cloud.dataflow.sdk.PipelineResult.State;
-import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
-import com.google.common.base.MoreObjects;
-import com.google.common.collect.ImmutableMap;
-
-import org.joda.time.Instant;
-
-import java.io.IOException;
-import java.io.PrintStream;
-import java.io.UnsupportedEncodingException;
-import java.net.URLEncoder;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
-
-import javax.annotation.Nullable;
-
-/**
- * A helper class for monitoring jobs submitted to the service.
- */
-public final class MonitoringUtil {
-
- private static final String GCLOUD_DATAFLOW_PREFIX = "gcloud alpha dataflow";
- private static final String ENDPOINT_OVERRIDE_ENV_VAR =
- "CLOUDSDK_API_ENDPOINT_OVERRIDES_DATAFLOW";
-
- private static final Map<String, State> DATAFLOW_STATE_TO_JOB_STATE =
- ImmutableMap
- .<String, State>builder()
- .put("JOB_STATE_UNKNOWN", State.UNKNOWN)
- .put("JOB_STATE_STOPPED", State.STOPPED)
- .put("JOB_STATE_RUNNING", State.RUNNING)
- .put("JOB_STATE_DONE", State.DONE)
- .put("JOB_STATE_FAILED", State.FAILED)
- .put("JOB_STATE_CANCELLED", State.CANCELLED)
- .put("JOB_STATE_UPDATED", State.UPDATED)
- .build();
-
- private String projectId;
- private Messages messagesClient;
-
- /**
- * An interface that can be used for defining callbacks to receive a list
- * of JobMessages containing monitoring information.
- */
- public interface JobMessagesHandler {
- /** Process the messages. */
- void process(List<JobMessage> messages);
- }
-
- /** A handler that prints monitoring messages to a stream. */
- public static class PrintHandler implements JobMessagesHandler {
- private PrintStream out;
-
- /**
- * Construct the handler.
- *
- * @param stream The stream to write the messages to.
- */
- public PrintHandler(PrintStream stream) {
- out = stream;
- }
-
- @Override
- public void process(List<JobMessage> messages) {
- for (JobMessage message : messages) {
- if (message.getMessageText() == null || message.getMessageText().isEmpty()) {
- continue;
- }
- String importanceString = null;
- if (message.getMessageImportance() == null) {
- continue;
- } else if (message.getMessageImportance().equals("JOB_MESSAGE_ERROR")) {
- importanceString = "Error: ";
- } else if (message.getMessageImportance().equals("JOB_MESSAGE_WARNING")) {
- importanceString = "Warning: ";
- } else if (message.getMessageImportance().equals("JOB_MESSAGE_BASIC")) {
- importanceString = "Basic: ";
- } else if (message.getMessageImportance().equals("JOB_MESSAGE_DETAILED")) {
- importanceString = "Detail: ";
- } else {
- // TODO: Remove filtering here once getJobMessages supports minimum
- // importance.
- continue;
- }
- @Nullable Instant time = TimeUtil.fromCloudTime(message.getTime());
- if (time == null) {
- out.print("UNKNOWN TIMESTAMP: ");
- } else {
- out.print(time + ": ");
- }
- if (importanceString != null) {
- out.print(importanceString);
- }
- out.println(message.getMessageText());
- }
- out.flush();
- }
- }
-
- /** Construct a helper for monitoring. */
- public MonitoringUtil(String projectId, Dataflow dataflow) {
- this(projectId, dataflow.projects().jobs().messages());
- }
-
- // @VisibleForTesting
- MonitoringUtil(String projectId, Messages messagesClient) {
- this.projectId = projectId;
- this.messagesClient = messagesClient;
- }
-
- /**
- * Comparator for sorting rows in increasing order based on timestamp.
- */
- public static class TimeStampComparator implements Comparator<JobMessage> {
- @Override
- public int compare(JobMessage o1, JobMessage o2) {
- @Nullable Instant t1 = fromCloudTime(o1.getTime());
- if (t1 == null) {
- return -1;
- }
- @Nullable Instant t2 = fromCloudTime(o2.getTime());
- if (t2 == null) {
- return 1;
- }
- return t1.compareTo(t2);
- }
- }
-
- /**
- * Return job messages sorted in ascending order by timestamp.
- * @param jobId The id of the job to get the messages for.
- * @param startTimestampMs Return only those messages with a
- * timestamp greater than this value.
- * @return collection of messages
- * @throws IOException
- */
- public ArrayList<JobMessage> getJobMessages(
- String jobId, long startTimestampMs) throws IOException {
- // TODO: Allow filtering messages by importance
- Instant startTimestamp = new Instant(startTimestampMs);
- ArrayList<JobMessage> allMessages = new ArrayList<>();
- String pageToken = null;
- while (true) {
- Messages.List listRequest = messagesClient.list(projectId, jobId);
- if (pageToken != null) {
- listRequest.setPageToken(pageToken);
- }
- ListJobMessagesResponse response = listRequest.execute();
-
- if (response == null || response.getJobMessages() == null) {
- return allMessages;
- }
-
- for (JobMessage m : response.getJobMessages()) {
- @Nullable Instant timestamp = fromCloudTime(m.getTime());
- if (timestamp == null) {
- continue;
- }
- if (timestamp.isAfter(startTimestamp)) {
- allMessages.add(m);
- }
- }
-
- if (response.getNextPageToken() == null) {
- break;
- } else {
- pageToken = response.getNextPageToken();
- }
- }
-
- Collections.sort(allMessages, new TimeStampComparator());
- return allMessages;
- }
-
- public static String getJobMonitoringPageURL(String projectName, String jobId) {
- try {
- // Project name is allowed in place of the project id: the user will be redirected to a URL
- // that has the project name replaced with project id.
- return String.format(
- "https://console.developers.google.com/project/%s/dataflow/job/%s",
- URLEncoder.encode(projectName, "UTF-8"),
- URLEncoder.encode(jobId, "UTF-8"));
- } catch (UnsupportedEncodingException e) {
- // Should never happen.
- throw new AssertionError("UTF-8 encoding is not supported by the environment", e);
- }
- }
-
- public static String getGcloudCancelCommand(DataflowPipelineOptions options, String jobId) {
-
- // If using a different Dataflow API than default, prefix command with an API override.
- String dataflowApiOverridePrefix = "";
- String apiUrl = options.getDataflowClient().getBaseUrl();
- if (!apiUrl.equals(Dataflow.DEFAULT_BASE_URL)) {
- dataflowApiOverridePrefix = String.format("%s=%s ", ENDPOINT_OVERRIDE_ENV_VAR, apiUrl);
- }
-
- // Assemble cancel command from optional prefix and project/job parameters.
- return String.format("%s%s jobs --project=%s cancel %s",
- dataflowApiOverridePrefix, GCLOUD_DATAFLOW_PREFIX, options.getProject(), jobId);
- }
-
- public static State toState(String stateName) {
- return MoreObjects.firstNonNull(DATAFLOW_STATE_TO_JOB_STATE.get(stateName),
- State.UNKNOWN);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b4857cc/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/PackageUtil.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/PackageUtil.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/PackageUtil.java
deleted file mode 100644
index 0e234a8..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/PackageUtil.java
+++ /dev/null
@@ -1,328 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.api.client.util.BackOffUtils;
-import com.google.api.client.util.Sleeper;
-import com.google.api.services.dataflow.model.DataflowPackage;
-import com.google.cloud.hadoop.util.ApiErrorExtractor;
-import com.google.common.hash.Funnels;
-import com.google.common.hash.Hasher;
-import com.google.common.hash.Hashing;
-import com.google.common.io.CountingOutputStream;
-import com.google.common.io.Files;
-
-import com.fasterxml.jackson.core.Base64Variants;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.channels.Channels;
-import java.nio.channels.WritableByteChannel;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Objects;
-
-/** Helper routines for packages. */
-public class PackageUtil {
- private static final Logger LOG = LoggerFactory.getLogger(PackageUtil.class);
- /**
- * A reasonable upper bound on the number of jars required to launch a Dataflow job.
- */
- public static final int SANE_CLASSPATH_SIZE = 1000;
- /**
- * The initial interval to use between package staging attempts.
- */
- private static final long INITIAL_BACKOFF_INTERVAL_MS = 5000L;
- /**
- * The maximum number of attempts when staging a file.
- */
- private static final int MAX_ATTEMPTS = 5;
-
- /**
- * Translates exceptions from API calls.
- */
- private static final ApiErrorExtractor ERROR_EXTRACTOR = new ApiErrorExtractor();
-
- /**
- * Creates a DataflowPackage containing information about how a classpath element should be
- * staged, including the staging destination as well as its size and hash.
- *
- * @param classpathElement The local path for the classpath element.
- * @param stagingPath The base location for staged classpath elements.
- * @param overridePackageName If non-null, use the given value as the package name
- * instead of generating one automatically.
- * @return The package.
- */
- @Deprecated
- public static DataflowPackage createPackage(File classpathElement,
- String stagingPath, String overridePackageName) {
- return createPackageAttributes(classpathElement, stagingPath, overridePackageName)
- .getDataflowPackage();
- }
-
- /**
- * Compute and cache the attributes of a classpath element that we will need to stage it.
- *
- * @param classpathElement the file or directory to be staged.
- * @param stagingPath The base location for staged classpath elements.
- * @param overridePackageName If non-null, use the given value as the package name
- * instead of generating one automatically.
- * @return a {@link PackageAttributes} containing metadata about the object to be staged.
- */
- static PackageAttributes createPackageAttributes(File classpathElement,
- String stagingPath, String overridePackageName) {
- try {
- boolean directory = classpathElement.isDirectory();
-
- // Compute size and hash in one pass over file or directory.
- Hasher hasher = Hashing.md5().newHasher();
- OutputStream hashStream = Funnels.asOutputStream(hasher);
- CountingOutputStream countingOutputStream = new CountingOutputStream(hashStream);
-
- if (!directory) {
- // Files are staged as-is.
- Files.asByteSource(classpathElement).copyTo(countingOutputStream);
- } else {
- // Directories are recursively zipped.
- ZipFiles.zipDirectory(classpathElement, countingOutputStream);
- }
-
- long size = countingOutputStream.getCount();
- String hash = Base64Variants.MODIFIED_FOR_URL.encode(hasher.hash().asBytes());
-
- // Create the DataflowPackage with staging name and location.
- String uniqueName = getUniqueContentName(classpathElement, hash);
- String resourcePath = IOChannelUtils.resolve(stagingPath, uniqueName);
- DataflowPackage target = new DataflowPackage();
- target.setName(overridePackageName != null ? overridePackageName : uniqueName);
- target.setLocation(resourcePath);
-
- return new PackageAttributes(size, hash, directory, target);
- } catch (IOException e) {
- throw new RuntimeException("Package setup failure for " + classpathElement, e);
- }
- }
-
- /**
- * Transfers the classpath elements to the staging location.
- *
- * @param classpathElements The elements to stage.
- * @param stagingPath The base location to stage the elements to.
- * @return A list of cloud workflow packages, each representing a classpath element.
- */
- public static List<DataflowPackage> stageClasspathElements(
- Collection<String> classpathElements, String stagingPath) {
- return stageClasspathElements(classpathElements, stagingPath, Sleeper.DEFAULT);
- }
-
- // Visible for testing.
- static List<DataflowPackage> stageClasspathElements(
- Collection<String> classpathElements, String stagingPath,
- Sleeper retrySleeper) {
- LOG.info("Uploading {} files from PipelineOptions.filesToStage to staging location to "
- + "prepare for execution.", classpathElements.size());
-
- if (classpathElements.size() > SANE_CLASSPATH_SIZE) {
- LOG.warn("Your classpath contains {} elements, which Google Cloud Dataflow automatically "
- + "copies to all workers. Having this many entries on your classpath may be indicative "
- + "of an issue in your pipeline. You may want to consider trimming the classpath to "
- + "necessary dependencies only, using --filesToStage pipeline option to override "
- + "what files are being staged, or bundling several dependencies into one.",
- classpathElements.size());
- }
-
- ArrayList<DataflowPackage> packages = new ArrayList<>();
-
- if (stagingPath == null) {
- throw new IllegalArgumentException(
- "Can't stage classpath elements because no staging location has been provided");
- }
-
- int numUploaded = 0;
- int numCached = 0;
- for (String classpathElement : classpathElements) {
- String packageName = null;
- if (classpathElement.contains("=")) {
- String[] components = classpathElement.split("=", 2);
- packageName = components[0];
- classpathElement = components[1];
- }
-
- File file = new File(classpathElement);
- if (!file.exists()) {
- LOG.warn("Skipping non-existent classpath element {} that was specified.",
- classpathElement);
- continue;
- }
-
- PackageAttributes attributes = createPackageAttributes(file, stagingPath, packageName);
-
- DataflowPackage workflowPackage = attributes.getDataflowPackage();
- packages.add(workflowPackage);
- String target = workflowPackage.getLocation();
-
- // TODO: Should we attempt to detect the Mime type rather than
- // always using MimeTypes.BINARY?
- try {
- try {
- long remoteLength = IOChannelUtils.getSizeBytes(target);
- if (remoteLength == attributes.getSize()) {
- LOG.debug("Skipping classpath element already staged: {} at {}",
- classpathElement, target);
- numCached++;
- continue;
- }
- } catch (FileNotFoundException expected) {
- // If the file doesn't exist, it means we need to upload it.
- }
-
- // Upload file, retrying on failure.
- AttemptBoundedExponentialBackOff backoff = new AttemptBoundedExponentialBackOff(
- MAX_ATTEMPTS,
- INITIAL_BACKOFF_INTERVAL_MS);
- while (true) {
- try {
- LOG.debug("Uploading classpath element {} to {}", classpathElement, target);
- try (WritableByteChannel writer = IOChannelUtils.create(target, MimeTypes.BINARY)) {
- copyContent(classpathElement, writer);
- }
- numUploaded++;
- break;
- } catch (IOException e) {
- if (ERROR_EXTRACTOR.accessDenied(e)) {
- String errorMessage = String.format(
- "Upload failed due to permissions error, will NOT retry staging "
- + "of classpath %s. Please verify credentials are valid and that you have "
- + "write access to %s. Stale credentials can be resolved by executing "
- + "'gcloud auth login'.", classpathElement, target);
- LOG.error(errorMessage);
- throw new IOException(errorMessage, e);
- } else if (!backoff.atMaxAttempts()) {
- LOG.warn("Upload attempt failed, sleeping before retrying staging of classpath: {}",
- classpathElement, e);
- BackOffUtils.next(retrySleeper, backoff);
- } else {
- // Rethrow last error, to be included as a cause in the catch below.
- LOG.error("Upload failed, will NOT retry staging of classpath: {}",
- classpathElement, e);
- throw e;
- }
- }
- }
- } catch (Exception e) {
- throw new RuntimeException("Could not stage classpath element: " + classpathElement, e);
- }
- }
-
- LOG.info("Uploading PipelineOptions.filesToStage complete: {} files newly uploaded, "
- + "{} files cached",
- numUploaded, numCached);
-
- return packages;
- }
-
- /**
- * Returns a unique name for a file with a given content hash.
- *
- * <p>Directory paths are removed. Example:
- * <pre>
- * dir="a/b/c/d", contentHash="f000" => d-f000.jar
- * file="a/b/c/d.txt", contentHash="f000" => d-f000.txt
- * file="a/b/c/d", contentHash="f000" => d-f000
- * </pre>
- */
- static String getUniqueContentName(File classpathElement, String contentHash) {
- String fileName = Files.getNameWithoutExtension(classpathElement.getAbsolutePath());
- String fileExtension = Files.getFileExtension(classpathElement.getAbsolutePath());
- if (classpathElement.isDirectory()) {
- return fileName + "-" + contentHash + ".jar";
- } else if (fileExtension.isEmpty()) {
- return fileName + "-" + contentHash;
- }
- return fileName + "-" + contentHash + "." + fileExtension;
- }
-
- /**
- * Copies the contents of the classpathElement to the output channel.
- *
- * <p>If the classpathElement is a directory, a Zip stream is constructed on the fly,
- * otherwise the file contents are copied as-is.
- *
- * <p>The output channel is not closed.
- */
- private static void copyContent(String classpathElement, WritableByteChannel outputChannel)
- throws IOException {
- final File classpathElementFile = new File(classpathElement);
- if (classpathElementFile.isDirectory()) {
- ZipFiles.zipDirectory(classpathElementFile, Channels.newOutputStream(outputChannel));
- } else {
- Files.asByteSource(classpathElementFile).copyTo(Channels.newOutputStream(outputChannel));
- }
- }
- /**
- * Holds the metadata necessary to stage a file or confirm that a staged file has not changed.
- */
- static class PackageAttributes {
- private final boolean directory;
- private final long size;
- private final String hash;
- private DataflowPackage dataflowPackage;
-
- public PackageAttributes(long size, String hash, boolean directory,
- DataflowPackage dataflowPackage) {
- this.size = size;
- this.hash = Objects.requireNonNull(hash, "hash");
- this.directory = directory;
- this.dataflowPackage = Objects.requireNonNull(dataflowPackage, "dataflowPackage");
- }
-
- /**
- * @return the dataflowPackage
- */
- public DataflowPackage getDataflowPackage() {
- return dataflowPackage;
- }
-
- /**
- * @return the directory
- */
- public boolean isDirectory() {
- return directory;
- }
-
- /**
- * @return the size
- */
- public long getSize() {
- return size;
- }
-
- /**
- * @return the hash
- */
- public String getHash() {
- return hash;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b4857cc/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/Stager.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/Stager.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/Stager.java
deleted file mode 100644
index f6c6a71..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/Stager.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.api.services.dataflow.model.DataflowPackage;
-
-import java.util.List;
-
-/**
- * Interface for staging files needed for running a Dataflow pipeline.
- */
-public interface Stager {
- /* Stage files and return a list of packages. */
- public List<DataflowPackage> stageFiles();
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b4857cc/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/DataflowTextIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/DataflowTextIOTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/DataflowTextIOTest.java
deleted file mode 100644
index 1bd8a85..0000000
--- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/DataflowTextIOTest.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.io;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
-import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
-import com.google.cloud.dataflow.sdk.testing.TestDataflowPipelineOptions;
-import com.google.cloud.dataflow.sdk.util.GcsUtil;
-import com.google.cloud.dataflow.sdk.util.TestCredential;
-import com.google.cloud.dataflow.sdk.util.gcsfs.GcsPath;
-import com.google.common.collect.ImmutableList;
-
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-import java.io.IOException;
-import java.nio.channels.FileChannel;
-import java.nio.channels.SeekableByteChannel;
-import java.nio.file.Files;
-import java.nio.file.StandardOpenOption;
-import java.util.List;
-
-/**
- * {@link DataflowPipelineRunner} specific tests for TextIO Read and Write transforms.
- */
-@RunWith(JUnit4.class)
-public class DataflowTextIOTest {
-
- private TestDataflowPipelineOptions buildTestPipelineOptions() {
- TestDataflowPipelineOptions options =
- PipelineOptionsFactory.as(TestDataflowPipelineOptions.class);
- options.setGcpCredential(new TestCredential());
- return options;
- }
-
- private GcsUtil buildMockGcsUtil() throws IOException {
- GcsUtil mockGcsUtil = Mockito.mock(GcsUtil.class);
-
- // Any request to open gets a new bogus channel
- Mockito
- .when(mockGcsUtil.open(Mockito.any(GcsPath.class)))
- .then(new Answer<SeekableByteChannel>() {
- @Override
- public SeekableByteChannel answer(InvocationOnMock invocation) throws Throwable {
- return FileChannel.open(
- Files.createTempFile("channel-", ".tmp"),
- StandardOpenOption.CREATE, StandardOpenOption.DELETE_ON_CLOSE);
- }
- });
-
- // Any request for expansion returns a list containing the original GcsPath
- // This is required to pass validation that occurs in TextIO during apply()
- Mockito
- .when(mockGcsUtil.expand(Mockito.any(GcsPath.class)))
- .then(new Answer<List<GcsPath>>() {
- @Override
- public List<GcsPath> answer(InvocationOnMock invocation) throws Throwable {
- return ImmutableList.of((GcsPath) invocation.getArguments()[0]);
- }
- });
-
- return mockGcsUtil;
- }
-
- /**
- * This tests a few corner cases that should not crash.
- */
- @Test
- public void testGoodWildcards() throws Exception {
- TestDataflowPipelineOptions options = buildTestPipelineOptions();
- options.setGcsUtil(buildMockGcsUtil());
-
- Pipeline pipeline = Pipeline.create(options);
-
- applyRead(pipeline, "gs://bucket/foo");
- applyRead(pipeline, "gs://bucket/foo/");
- applyRead(pipeline, "gs://bucket/foo/*");
- applyRead(pipeline, "gs://bucket/foo/?");
- applyRead(pipeline, "gs://bucket/foo/[0-9]");
- applyRead(pipeline, "gs://bucket/foo/*baz*");
- applyRead(pipeline, "gs://bucket/foo/*baz?");
- applyRead(pipeline, "gs://bucket/foo/[0-9]baz?");
- applyRead(pipeline, "gs://bucket/foo/baz/*");
- applyRead(pipeline, "gs://bucket/foo/baz/*wonka*");
- applyRead(pipeline, "gs://bucket/foo/*baz/wonka*");
- applyRead(pipeline, "gs://bucket/foo*/baz");
- applyRead(pipeline, "gs://bucket/foo?/baz");
- applyRead(pipeline, "gs://bucket/foo[0-9]/baz");
-
- // Check that running doesn't fail.
- pipeline.run();
- }
-
- private void applyRead(Pipeline pipeline, String path) {
- pipeline.apply("Read(" + path + ")", TextIO.Read.from(path));
- }
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b4857cc/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineDebugOptionsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineDebugOptionsTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineDebugOptionsTest.java
deleted file mode 100644
index 1b5a3c7..0000000
--- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineDebugOptionsTest.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.options;
-
-import static org.hamcrest.Matchers.hasEntry;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
-
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/** Tests for {@link DataflowPipelineDebugOptions}. */
-@RunWith(JUnit4.class)
-public class DataflowPipelineDebugOptionsTest {
- @Test
- public void testTransformNameMapping() throws Exception {
- DataflowPipelineDebugOptions options = PipelineOptionsFactory
- .fromArgs(new String[]{"--transformNameMapping={\"a\":\"b\",\"foo\":\"\",\"bar\":\"baz\"}"})
- .as(DataflowPipelineDebugOptions.class);
- assertEquals(3, options.getTransformNameMapping().size());
- assertThat(options.getTransformNameMapping(), hasEntry("a", "b"));
- assertThat(options.getTransformNameMapping(), hasEntry("foo", ""));
- assertThat(options.getTransformNameMapping(), hasEntry("bar", "baz"));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b4857cc/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineOptionsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineOptionsTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineOptionsTest.java
deleted file mode 100644
index eff79bb..0000000
--- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineOptionsTest.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.options;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import com.google.cloud.dataflow.sdk.testing.ResetDateTimeProvider;
-import com.google.cloud.dataflow.sdk.testing.RestoreSystemProperties;
-
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestRule;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/** Tests for {@link DataflowPipelineOptions}. */
-@RunWith(JUnit4.class)
-public class DataflowPipelineOptionsTest {
- @Rule public TestRule restoreSystemProperties = new RestoreSystemProperties();
- @Rule public ResetDateTimeProvider resetDateTimeProviderRule = new ResetDateTimeProvider();
-
- @Test
- public void testJobNameIsSet() {
- DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
- options.setJobName("TestJobName");
- assertEquals("TestJobName", options.getJobName());
- }
-
- @Test
- public void testUserNameIsNotSet() {
- resetDateTimeProviderRule.setDateTimeFixed("2014-12-08T19:07:06.698Z");
- System.getProperties().remove("user.name");
- DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
- options.setAppName("TestApplication");
- assertEquals("testapplication--1208190706", options.getJobName());
- assertTrue(options.getJobName().length() <= 40);
- }
-
- @Test
- public void testAppNameAndUserNameAreLong() {
- resetDateTimeProviderRule.setDateTimeFixed("2014-12-08T19:07:06.698Z");
- System.getProperties().put("user.name", "abcdeabcdeabcdeabcdeabcdeabcde");
- DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
- options.setAppName("1234567890123456789012345678901234567890");
- assertEquals(
- "a234567890123456789012345678901234567890-abcdeabcdeabcdeabcdeabcdeabcde-1208190706",
- options.getJobName());
- }
-
- @Test
- public void testAppNameIsLong() {
- resetDateTimeProviderRule.setDateTimeFixed("2014-12-08T19:07:06.698Z");
- System.getProperties().put("user.name", "abcde");
- DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
- options.setAppName("1234567890123456789012345678901234567890");
- assertEquals("a234567890123456789012345678901234567890-abcde-1208190706", options.getJobName());
- }
-
- @Test
- public void testUserNameIsLong() {
- resetDateTimeProviderRule.setDateTimeFixed("2014-12-08T19:07:06.698Z");
- System.getProperties().put("user.name", "abcdeabcdeabcdeabcdeabcdeabcde");
- DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
- options.setAppName("1234567890");
- assertEquals("a234567890-abcdeabcdeabcdeabcdeabcdeabcde-1208190706", options.getJobName());
- }
-
- @Test
- public void testUtf8UserNameAndApplicationNameIsNormalized() {
- resetDateTimeProviderRule.setDateTimeFixed("2014-12-08T19:07:06.698Z");
- System.getProperties().put("user.name", "ði ıntəˈnæʃənəl ");
- DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
- options.setAppName("fəˈnɛtık əsoʊsiˈeıʃn");
- assertEquals("f00n0t0k00so0si0e00n-0i00nt00n000n0l0-1208190706", options.getJobName());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b4857cc/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/options/DataflowProfilingOptionsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/options/DataflowProfilingOptionsTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/options/DataflowProfilingOptionsTest.java
deleted file mode 100644
index 1420273..0000000
--- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/options/DataflowProfilingOptionsTest.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.options;
-
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-import org.hamcrest.Matchers;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for {@link DataflowProfilingOptions}.
- */
-@RunWith(JUnit4.class)
-public class DataflowProfilingOptionsTest {
-
- private static final ObjectMapper MAPPER = new ObjectMapper();
-
- @Test
- public void testOptionsObject() throws Exception {
- DataflowPipelineOptions options = PipelineOptionsFactory.fromArgs(new String[] {
- "--enableProfilingAgent", "--profilingAgentConfiguration={\"interval\": 21}"})
- .as(DataflowPipelineOptions.class);
- assertTrue(options.getEnableProfilingAgent());
-
- String json = MAPPER.writeValueAsString(options);
- assertThat(json, Matchers.containsString(
- "\"profilingAgentConfiguration\":{\"interval\":21}"));
- }
-}
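
The deleted DataflowProfilingOptionsTest above asserts on the exact JSON emitted for the
profilingAgentConfiguration value. That expectation follows from standard Jackson behavior
(compact output, no spaces around colons); a minimal stand-alone round-trip, independent of
the SDK, shows the same thing:

    import com.fasterxml.jackson.databind.ObjectMapper;
    import java.util.Map;

    public class ProfilingConfigJsonSketch {
      public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        // Parse the same literal passed on the command line in the test above...
        Map<?, ?> config = mapper.readValue("{\"interval\": 21}", Map.class);
        // ...and serialize it back: Jackson emits compact JSON with no spaces,
        // which is why the assertion looks for {"interval":21}.
        System.out.println(mapper.writeValueAsString(config));
      }
    }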
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b4857cc/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/options/DataflowWorkerLoggingOptionsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/options/DataflowWorkerLoggingOptionsTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/options/DataflowWorkerLoggingOptionsTest.java
deleted file mode 100644
index b752f3d..0000000
--- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/options/DataflowWorkerLoggingOptionsTest.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.options;
-
-import static com.google.cloud.dataflow.sdk.options.DataflowWorkerLoggingOptions.Level.WARN;
-import static org.junit.Assert.assertEquals;
-
-import com.google.cloud.dataflow.sdk.options.DataflowWorkerLoggingOptions.WorkerLogLevelOverrides;
-import com.google.common.collect.ImmutableMap;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/** Tests for {@link DataflowWorkerLoggingOptions}. */
-@RunWith(JUnit4.class)
-public class DataflowWorkerLoggingOptionsTest {
- private static final ObjectMapper MAPPER = new ObjectMapper();
- @Rule public ExpectedException expectedException = ExpectedException.none();
-
- @Test
- public void testWorkerLogLevelOverrideWithInvalidLogLevel() {
- expectedException.expect(IllegalArgumentException.class);
- expectedException.expectMessage("Unsupported log level");
- WorkerLogLevelOverrides.from(ImmutableMap.of("Name", "FakeLevel"));
- }
-
- @Test
- public void testWorkerLogLevelOverrideForClass() throws Exception {
- assertEquals("{\"org.junit.Test\":\"WARN\"}",
- MAPPER.writeValueAsString(
- new WorkerLogLevelOverrides().addOverrideForClass(Test.class, WARN)));
- }
-
- @Test
- public void testWorkerLogLevelOverrideForPackage() throws Exception {
- assertEquals("{\"org.junit\":\"WARN\"}",
- MAPPER.writeValueAsString(
- new WorkerLogLevelOverrides().addOverrideForPackage(Test.class.getPackage(), WARN)));
- }
-
- @Test
- public void testWorkerLogLevelOverrideForName() throws Exception {
- assertEquals("{\"A\":\"WARN\"}",
- MAPPER.writeValueAsString(
- new WorkerLogLevelOverrides().addOverrideForName("A", WARN)));
- }
-
- @Test
- public void testSerializationAndDeserializationOf() throws Exception {
- String testValue = "{\"A\":\"WARN\"}";
- assertEquals(testValue,
- MAPPER.writeValueAsString(
- MAPPER.readValue(testValue, WorkerLogLevelOverrides.class)));
- }
-}
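
The deleted DataflowWorkerLoggingOptionsTest above expects keys like "org.junit.Test" and
"org.junit" in the serialized overrides. Those keys come from Class#getName() and
Package#getName(); the hypothetical map below (not the SDK's WorkerLogLevelOverrides)
illustrates where they come from and why Jackson serializes the result as a flat JSON object:

    import com.fasterxml.jackson.databind.ObjectMapper;
    import java.util.HashMap;

    /** Hypothetical stand-in for the overrides map; not the SDK class. */
    class LogLevelOverridesSketch extends HashMap<String, String> {
      LogLevelOverridesSketch forClass(Class<?> clazz, String level) {
        put(clazz.getName(), level);      // e.g. "org.junit.Test"
        return this;
      }
      LogLevelOverridesSketch forPackage(Package pkg, String level) {
        put(pkg.getName(), level);        // e.g. "org.junit"
        return this;
      }
    }

    public class LoggingOverridesDemo {
      public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        // A Map subclass serializes as a flat JSON object, e.g. {"java.lang":"WARN"}.
        System.out.println(mapper.writeValueAsString(
            new LogLevelOverridesSketch().forPackage(Object.class.getPackage(), "WARN")));
      }
    }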
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b4857cc/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/options/PipelineOptionsFactoryTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/options/PipelineOptionsFactoryTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/options/PipelineOptionsFactoryTest.java
index 6ba1e00..2672a39 100644
--- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/options/PipelineOptionsFactoryTest.java
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/options/PipelineOptionsFactoryTest.java
@@ -792,11 +792,11 @@ public class PipelineOptionsFactoryTest {
@Test
public void testSetASingularAttributeUsingAListThrowsAnError() {
String[] args = new String[] {
- "--diskSizeGb=100",
- "--diskSizeGb=200"};
+ "--string=100",
+ "--string=200"};
expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage("expected one element but was");
- PipelineOptionsFactory.fromArgs(args).create();
+ PipelineOptionsFactory.fromArgs(args).as(Objects.class);
}
@Test
@@ -923,18 +923,18 @@ public class PipelineOptionsFactoryTest {
public void testEmptyArgumentIsIgnored() {
String[] args =
new String[] {
- "", "--diskSizeGb=100", "", "", "--runner=" + DEFAULT_RUNNER_CLASS.getSimpleName()
+ "", "--string=100", "", "", "--runner=" + DEFAULT_RUNNER_CLASS.getSimpleName()
};
- PipelineOptionsFactory.fromArgs(args).create();
+ PipelineOptionsFactory.fromArgs(args).as(Objects.class);
}
@Test
public void testNullArgumentIsIgnored() {
String[] args =
new String[] {
- "--diskSizeGb=100", null, null, "--runner=" + DEFAULT_RUNNER_CLASS.getSimpleName()
+ "--string=100", null, null, "--runner=" + DEFAULT_RUNNER_CLASS.getSimpleName()
};
- PipelineOptionsFactory.fromArgs(args).create();
+ PipelineOptionsFactory.fromArgs(args).as(Objects.class);
}
@Test
@@ -1020,22 +1020,35 @@ public class PipelineOptionsFactoryTest {
containsString("The pipeline runner that will be used to execute the pipeline."));
}
- /** Used for a name collision test with the other DataflowPipelineOptions. */
- private interface DataflowPipelineOptions extends PipelineOptions {
+ /** Used for a name collision test with the other NameConflict interfaces. */
+ private static class NameConflictClassA {
+ /** Used for a name collision test with the other NameConflict interfaces. */
+ private interface NameConflict extends PipelineOptions {
+ }
+ }
+
+ /** Used for a name collision test with the other NameConflict interfaces. */
+ private static class NameConflictClassB {
+ /** Used for a name collision test with the other NameConflict interfaces. */
+ private interface NameConflict extends PipelineOptions {
+ }
}
@Test
public void testShortnameSpecificHelpHasMultipleMatches() {
- PipelineOptionsFactory.register(DataflowPipelineOptions.class);
+ PipelineOptionsFactory.register(NameConflictClassA.NameConflict.class);
+ PipelineOptionsFactory.register(NameConflictClassB.NameConflict.class);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
ListMultimap<String, String> arguments = ArrayListMultimap.create();
- arguments.put("help", "DataflowPipelineOptions");
+ arguments.put("help", "NameConflict");
assertTrue(PipelineOptionsFactory.printHelpUsageAndExitIfNeeded(
arguments, new PrintStream(baos), false /* exit */));
String output = new String(baos.toByteArray());
- assertThat(output, containsString("Multiple matches found for DataflowPipelineOptions"));
+ assertThat(output, containsString("Multiple matches found for NameConflict"));
+ assertThat(output, containsString("com.google.cloud.dataflow.sdk.options."
+ + "PipelineOptionsFactoryTest$NameConflictClassA$NameConflict"));
assertThat(output, containsString("com.google.cloud.dataflow.sdk.options."
- + "PipelineOptionsFactoryTest$DataflowPipelineOptions"));
+ + "PipelineOptionsFactoryTest$NameConflictClassB$NameConflict"));
assertThat(output, containsString("The set of registered options are:"));
assertThat(output, containsString("com.google.cloud.dataflow.sdk.options.PipelineOptions"));
}
@@ -1044,11 +1057,11 @@ public class PipelineOptionsFactoryTest {
public void testHelpWithOptionThatOutputsValidEnumTypes() {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
ListMultimap<String, String> arguments = ArrayListMultimap.create();
- arguments.put("help", "com.google.cloud.dataflow.sdk.options.DataflowWorkerLoggingOptions");
+ arguments.put("help", Objects.class.getName());
assertTrue(PipelineOptionsFactory.printHelpUsageAndExitIfNeeded(
arguments, new PrintStream(baos), false /* exit */));
String output = new String(baos.toByteArray());
- assertThat(output, containsString("<DEBUG | ERROR | INFO | TRACE | WARN>"));
+ assertThat(output, containsString("<Value | Value2>"));
}
@Test
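
The updated expectations in PipelineOptionsFactoryTest above rely on the '$'-delimited names
that Class#getName() produces for nested types, which is how the two NameConflict interfaces
stay distinguishable in the help output. A tiny stand-alone demonstration (names here are
illustrative):

    /** Names are illustrative; any nested type shows the same '$' pattern. */
    public class NestedNameDemo {
      static class OuterA {
        interface NameConflict {}
      }
      static class OuterB {
        interface NameConflict {}
      }
      public static void main(String[] args) {
        // Prints "NestedNameDemo$OuterA$NameConflict" and "NestedNameDemo$OuterB$NameConflict"
        // (with a package prefix if the class is not in the default package).
        System.out.println(OuterA.NameConflict.class.getName());
        System.out.println(OuterB.NameConflict.class.getName());
      }
    }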
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b4857cc/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/BlockingDataflowPipelineRunnerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/BlockingDataflowPipelineRunnerTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/BlockingDataflowPipelineRunnerTest.java
deleted file mode 100644
index 0322426..0000000
--- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/BlockingDataflowPipelineRunnerTest.java
+++ /dev/null
@@ -1,302 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.runners;
-
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Matchers.isA;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.PipelineResult.State;
-import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
-import com.google.cloud.dataflow.sdk.testing.ExpectedLogs;
-import com.google.cloud.dataflow.sdk.testing.TestDataflowPipelineOptions;
-import com.google.cloud.dataflow.sdk.util.MonitoringUtil;
-import com.google.cloud.dataflow.sdk.util.NoopPathValidator;
-import com.google.cloud.dataflow.sdk.util.TestCredential;
-
-import org.hamcrest.Description;
-import org.hamcrest.Factory;
-import org.hamcrest.Matcher;
-import org.hamcrest.TypeSafeMatcher;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-import java.util.concurrent.TimeUnit;
-
-/**
- * Tests for BlockingDataflowPipelineRunner.
- */
-@RunWith(JUnit4.class)
-public class BlockingDataflowPipelineRunnerTest {
-
- @Rule
- public ExpectedLogs expectedLogs = ExpectedLogs.none(BlockingDataflowPipelineRunner.class);
-
- @Rule
- public ExpectedException expectedThrown = ExpectedException.none();
-
- /**
- * A {@link Matcher} for a {@link DataflowJobException} that applies an underlying {@link Matcher}
- * to the {@link DataflowPipelineJob} returned by {@link DataflowJobException#getJob()}.
- */
- private static class DataflowJobExceptionMatcher<T extends DataflowJobException>
- extends TypeSafeMatcher<T> {
-
- private final Matcher<DataflowPipelineJob> matcher;
-
- public DataflowJobExceptionMatcher(Matcher<DataflowPipelineJob> matcher) {
- this.matcher = matcher;
- }
-
- @Override
- public boolean matchesSafely(T ex) {
- return matcher.matches(ex.getJob());
- }
-
- @Override
- protected void describeMismatchSafely(T item, Description description) {
- description.appendText("job ");
- matcher.describeMismatch(item.getJob(), description);
- }
-
- @Override
- public void describeTo(Description description) {
- description.appendText("exception with job matching ");
- description.appendDescriptionOf(matcher);
- }
-
- @Factory
- public static <T extends DataflowJobException> Matcher<T> expectJob(
- Matcher<DataflowPipelineJob> matcher) {
- return new DataflowJobExceptionMatcher<T>(matcher);
- }
- }
-
- /**
- * A {@link Matcher} for a {@link DataflowPipelineJob} that applies an underlying {@link Matcher}
- * to the return value of {@link DataflowPipelineJob#getJobId()}.
- */
- private static class JobIdMatcher<T extends DataflowPipelineJob> extends TypeSafeMatcher<T> {
-
- private final Matcher<String> matcher;
-
- public JobIdMatcher(Matcher<String> matcher) {
- this.matcher = matcher;
- }
-
- @Override
- public boolean matchesSafely(T job) {
- return matcher.matches(job.getJobId());
- }
-
- @Override
- protected void describeMismatchSafely(T item, Description description) {
- description.appendText("jobId ");
- matcher.describeMismatch(item.getJobId(), description);
- }
-
- @Override
- public void describeTo(Description description) {
- description.appendText("job with jobId ");
- description.appendDescriptionOf(matcher);
- }
-
- @Factory
- public static <T extends DataflowPipelineJob> Matcher<T> expectJobId(final String jobId) {
- return new JobIdMatcher<T>(equalTo(jobId));
- }
- }
-
- /**
- * A {@link Matcher} for a {@link DataflowJobUpdatedException} that applies an underlying
- * {@link Matcher} to the {@link DataflowPipelineJob} returned by
- * {@link DataflowJobUpdatedException#getReplacedByJob()}.
- */
- private static class ReplacedByJobMatcher<T extends DataflowJobUpdatedException>
- extends TypeSafeMatcher<T> {
-
- private final Matcher<DataflowPipelineJob> matcher;
-
- public ReplacedByJobMatcher(Matcher<DataflowPipelineJob> matcher) {
- this.matcher = matcher;
- }
-
- @Override
- public boolean matchesSafely(T ex) {
- return matcher.matches(ex.getReplacedByJob());
- }
-
- @Override
- protected void describeMismatchSafely(T item, Description description) {
- description.appendText("job ");
- matcher.describeMismatch(item.getReplacedByJob(), description);
- }
-
- @Override
- public void describeTo(Description description) {
- description.appendText("exception with replacedByJob() ");
- description.appendDescriptionOf(matcher);
- }
-
- @Factory
- public static <T extends DataflowJobUpdatedException> Matcher<T> expectReplacedBy(
- Matcher<DataflowPipelineJob> matcher) {
- return new ReplacedByJobMatcher<T>(matcher);
- }
- }
-
- /**
- * Creates a mocked {@link DataflowPipelineJob} with the given {@code projectId} and {@code jobId}
- * that will immediately terminate in the provided {@code terminalState}.
- *
- * <p>The return value may be further mocked.
- */
- private DataflowPipelineJob createMockJob(
- String projectId, String jobId, State terminalState) throws Exception {
- DataflowPipelineJob mockJob = mock(DataflowPipelineJob.class);
- when(mockJob.getProjectId()).thenReturn(projectId);
- when(mockJob.getJobId()).thenReturn(jobId);
- when(mockJob.waitToFinish(
- anyLong(), isA(TimeUnit.class), isA(MonitoringUtil.JobMessagesHandler.class)))
- .thenReturn(terminalState);
- return mockJob;
- }
-
- /**
- * Returns a {@link BlockingDataflowPipelineRunner} that will return the provided job when run.
- * Some {@link PipelineOptions} will be extracted from the job, such as the project ID.
- */
- private BlockingDataflowPipelineRunner createMockRunner(DataflowPipelineJob job)
- throws Exception {
- DataflowPipelineRunner mockRunner = mock(DataflowPipelineRunner.class);
- TestDataflowPipelineOptions options =
- PipelineOptionsFactory.as(TestDataflowPipelineOptions.class);
- options.setProject(job.getProjectId());
-
- when(mockRunner.run(isA(Pipeline.class))).thenReturn(job);
-
- return new BlockingDataflowPipelineRunner(mockRunner, options);
- }
-
- /**
- * Tests that the {@link BlockingDataflowPipelineRunner} returns normally when a job terminates in
- * the {@link State#DONE DONE} state.
- */
- @Test
- public void testJobDoneComplete() throws Exception {
- createMockRunner(createMockJob("testJobDone-projectId", "testJobDone-jobId", State.DONE))
- .run(DirectPipeline.createForTest());
- expectedLogs.verifyInfo("Job finished with status DONE");
- }
-
- /**
- * Tests that the {@link BlockingDataflowPipelineRunner} throws the appropriate exception
- * when a job terminates in the {@link State#FAILED FAILED} state.
- */
- @Test
- public void testFailedJobThrowsException() throws Exception {
- expectedThrown.expect(DataflowJobExecutionException.class);
- expectedThrown.expect(DataflowJobExceptionMatcher.expectJob(
- JobIdMatcher.expectJobId("testFailedJob-jobId")));
- createMockRunner(createMockJob("testFailedJob-projectId", "testFailedJob-jobId", State.FAILED))
- .run(DirectPipeline.createForTest());
- }
-
- /**
- * Tests that the {@link BlockingDataflowPipelineRunner} throws the appropriate exception
- * when a job terminates in the {@link State#CANCELLED CANCELLED} state.
- */
- @Test
- public void testCancelledJobThrowsException() throws Exception {
- expectedThrown.expect(DataflowJobCancelledException.class);
- expectedThrown.expect(DataflowJobExceptionMatcher.expectJob(
- JobIdMatcher.expectJobId("testCancelledJob-jobId")));
- createMockRunner(
- createMockJob("testCancelledJob-projectId", "testCancelledJob-jobId", State.CANCELLED))
- .run(DirectPipeline.createForTest());
- }
-
- /**
- * Tests that the {@link BlockingDataflowPipelineRunner} throws the appropriate exception
- * when a job terminates in the {@link State#UPDATED UPDATED} state.
- */
- @Test
- public void testUpdatedJobThrowsException() throws Exception {
- expectedThrown.expect(DataflowJobUpdatedException.class);
- expectedThrown.expect(DataflowJobExceptionMatcher.expectJob(
- JobIdMatcher.expectJobId("testUpdatedJob-jobId")));
- expectedThrown.expect(ReplacedByJobMatcher.expectReplacedBy(
- JobIdMatcher.expectJobId("testUpdatedJob-replacedByJobId")));
- DataflowPipelineJob job =
- createMockJob("testUpdatedJob-projectId", "testUpdatedJob-jobId", State.UPDATED);
- DataflowPipelineJob replacedByJob =
- createMockJob("testUpdatedJob-projectId", "testUpdatedJob-replacedByJobId", State.DONE);
- when(job.getReplacedByJob()).thenReturn(replacedByJob);
- createMockRunner(job).run(DirectPipeline.createForTest());
- }
-
- /**
- * Tests that the {@link BlockingDataflowPipelineRunner} throws the appropriate exception
- * when a job terminates in the {@link State#UNKNOWN UNKNOWN} state, indicating that the
- * Dataflow service returned a state that the SDK is unfamiliar with (possibly because it
- * is an old SDK relative to the service).
- */
- @Test
- public void testUnknownJobThrowsException() throws Exception {
- expectedThrown.expect(IllegalStateException.class);
- createMockRunner(
- createMockJob("testUnknownJob-projectId", "testUnknownJob-jobId", State.UNKNOWN))
- .run(DirectPipeline.createForTest());
- }
-
- /**
- * Tests that the {@link BlockingDataflowPipelineRunner} throws the appropriate exception
- * when a job returns a {@code null} state, indicating that it failed to contact the service
- * even with all of its built-in resilience logic.
- */
- @Test
- public void testNullJobThrowsException() throws Exception {
- expectedThrown.expect(DataflowServiceException.class);
- expectedThrown.expect(DataflowJobExceptionMatcher.expectJob(
- JobIdMatcher.expectJobId("testNullJob-jobId")));
- createMockRunner(
- createMockJob("testNullJob-projectId", "testNullJob-jobId", null))
- .run(DirectPipeline.createForTest());
- }
-
- @Test
- public void testToString() {
- DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
- options.setJobName("TestJobName");
- options.setProject("test-project");
- options.setTempLocation("gs://test/temp/location");
- options.setGcpCredential(new TestCredential());
- options.setPathValidatorClass(NoopPathValidator.class);
- assertEquals("BlockingDataflowPipelineRunner#TestJobName",
- BlockingDataflowPipelineRunner.fromOptions(options).toString());
- }
-}
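
The custom matchers in the deleted BlockingDataflowPipelineRunnerTest all follow the same
Hamcrest pattern: a TypeSafeMatcher that delegates to an inner Matcher applied to one
property of the matched object. A minimal, self-contained sketch of that pattern (using a
generic exception rather than the Dataflow types) for reference:

    import static org.hamcrest.CoreMatchers.equalTo;
    import static org.hamcrest.MatcherAssert.assertThat;

    import org.hamcrest.Description;
    import org.hamcrest.Matcher;
    import org.hamcrest.TypeSafeMatcher;

    /** Minimal sketch of the matcher pattern above, applied to a generic exception. */
    public class HasMessageMatcher extends TypeSafeMatcher<RuntimeException> {
      private final Matcher<String> messageMatcher;

      public HasMessageMatcher(Matcher<String> messageMatcher) {
        this.messageMatcher = messageMatcher;
      }

      @Override
      protected boolean matchesSafely(RuntimeException ex) {
        // Delegate to the inner matcher on a single property, as the deleted matchers do.
        return messageMatcher.matches(ex.getMessage());
      }

      @Override
      protected void describeMismatchSafely(RuntimeException item, Description description) {
        description.appendText("message ");
        messageMatcher.describeMismatch(item.getMessage(), description);
      }

      @Override
      public void describeTo(Description description) {
        description.appendText("exception with message ");
        description.appendDescriptionOf(messageMatcher);
      }

      public static Matcher<RuntimeException> hasMessage(String expected) {
        return new HasMessageMatcher(equalTo(expected));
      }

      public static void main(String[] args) {
        assertThat(new RuntimeException("boom"), hasMessage("boom"));
        System.out.println("matched");
      }
    }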