You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2016/04/12 01:42:50 UTC
[16/18] incubator-beam git commit: [BEAM-151] Move a large portion of
the Dataflow runner to separate maven module
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b4857cc/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobAlreadyExistsException.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobAlreadyExistsException.java b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobAlreadyExistsException.java
new file mode 100644
index 0000000..4a4f100
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobAlreadyExistsException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.sdk.runners;
+
+/**
+ * An exception that is thrown if the unique job name constraint of the Dataflow
+ * service is broken because an existing job with the same job name is currently active.
+ * The {@link DataflowPipelineJob} contained within this exception contains information
+ * about the pre-existing job.
+ */
+public class DataflowJobAlreadyExistsException extends DataflowJobException {
+ /**
+ * Create a new {@code DataflowJobAlreadyExistsException} with the specified {@link
+ * DataflowPipelineJob} and message. The cause of the exception is left unset.
+ */
+ public DataflowJobAlreadyExistsException(
+ DataflowPipelineJob job, String message) {
+ super(job, message, null);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b4857cc/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobAlreadyUpdatedException.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobAlreadyUpdatedException.java b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobAlreadyUpdatedException.java
new file mode 100644
index 0000000..1f52c6a
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobAlreadyUpdatedException.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.sdk.runners;
+
+/**
+ * An exception that is thrown if the existing job has already been updated within the Dataflow
+ * service and is no longer able to be updated. The {@link DataflowPipelineJob} contained within
+ * this exception contains information about the pre-existing updated job.
+ */
+public class DataflowJobAlreadyUpdatedException extends DataflowJobException {
+ /**
+ * Create a new {@code DataflowJobAlreadyUpdatedException} with the specified {@link
+ * DataflowPipelineJob} and message. The cause of the exception is left unset.
+ */
+ public DataflowJobAlreadyUpdatedException(
+ DataflowPipelineJob job, String message) {
+ super(job, message, null);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b4857cc/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobCancelledException.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobCancelledException.java b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobCancelledException.java
new file mode 100644
index 0000000..495ca5a
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobCancelledException.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.sdk.runners;
+
+/**
+ * Signals that a job run by a {@link BlockingDataflowPipelineRunner} was cancelled during
+ * execution.
+ */
+public class DataflowJobCancelledException extends DataflowJobException {
+ /**
+ * Create a new {@code DataflowJobCancelledException} with the specified {@link
+ * DataflowPipelineJob} and message.
+ */
+ public DataflowJobCancelledException(DataflowPipelineJob job, String message) {
+ super(job, message, null);
+ }
+
+ /**
+ * Create a new {@code DataflowJobCancelledException} with the specified {@link
+ * DataflowPipelineJob}, message, and cause.
+ */
+ public DataflowJobCancelledException(DataflowPipelineJob job, String message, Throwable cause) {
+ super(job, message, cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b4857cc/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobException.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobException.java b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobException.java
new file mode 100644
index 0000000..a22d13c
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobException.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.sdk.runners;
+
+import java.util.Objects;
+
+import javax.annotation.Nullable;
+
+/**
+ * A {@link RuntimeException} that contains information about a {@link DataflowPipelineJob}.
+ */
+public abstract class DataflowJobException extends RuntimeException {
+ private final DataflowPipelineJob job;
+
+ /**
+ * Constructs the exception with the associated job, a message, and an optional cause.
+ * Package-private: only the concrete subclasses in this package call this.
+ *
+ * @throws NullPointerException if {@code job} is null
+ */
+ DataflowJobException(DataflowPipelineJob job, String message, @Nullable Throwable cause) {
+ super(message, cause);
+ this.job = Objects.requireNonNull(job);
+ }
+
+ /**
+ * Returns the failed job.
+ */
+ public DataflowPipelineJob getJob() {
+ return job;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b4857cc/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobExecutionException.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobExecutionException.java b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobExecutionException.java
new file mode 100644
index 0000000..3dbb007
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobExecutionException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.sdk.runners;
+
+import javax.annotation.Nullable;
+
+/**
+ * Signals that a job run by a {@link BlockingDataflowPipelineRunner} fails during execution, and
+ * provides access to the failed job.
+ */
+public class DataflowJobExecutionException extends DataflowJobException {
+ /**
+ * Create a new {@code DataflowJobExecutionException} with the specified failed
+ * {@link DataflowPipelineJob} and message. The cause of the exception is left unset.
+ */
+ DataflowJobExecutionException(DataflowPipelineJob job, String message) {
+ this(job, message, null);
+ }
+
+ /**
+ * Create a new {@code DataflowJobExecutionException} with the specified failed
+ * {@link DataflowPipelineJob}, message, and optional cause.
+ */
+ DataflowJobExecutionException(
+ DataflowPipelineJob job, String message, @Nullable Throwable cause) {
+ super(job, message, cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b4857cc/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobUpdatedException.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobUpdatedException.java b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobUpdatedException.java
new file mode 100644
index 0000000..72deb45
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobUpdatedException.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.sdk.runners;
+
+/**
+ * Signals that a job run by a {@link BlockingDataflowPipelineRunner} was updated during execution.
+ */
+public class DataflowJobUpdatedException extends DataflowJobException {
+ // The replacement job; assigned only in the constructor and never modified afterwards.
+ private DataflowPipelineJob replacedByJob;
+
+ /**
+ * Create a new {@code DataflowJobUpdatedException} with the specified original {@link
+ * DataflowPipelineJob}, message, and replacement {@link DataflowPipelineJob}.
+ */
+ public DataflowJobUpdatedException(
+ DataflowPipelineJob job, String message, DataflowPipelineJob replacedByJob) {
+ this(job, message, replacedByJob, null);
+ }
+
+ /**
+ * Create a new {@code DataflowJobUpdatedException} with the specified original {@link
+ * DataflowPipelineJob}, message, replacement {@link DataflowPipelineJob}, and cause.
+ */
+ public DataflowJobUpdatedException(
+ DataflowPipelineJob job, String message, DataflowPipelineJob replacedByJob, Throwable cause) {
+ super(job, message, cause);
+ this.replacedByJob = replacedByJob;
+ }
+
+ /**
+ * The new job that replaces the job terminated with this exception.
+ */
+ public DataflowPipelineJob getReplacedByJob() {
+ return replacedByJob;
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b4857cc/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipeline.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipeline.java b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipeline.java
new file mode 100644
index 0000000..7b97c7d
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipeline.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.sdk.runners;
+
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+
+/**
+ * A {@link DataflowPipeline} is a {@link Pipeline} that returns a
+ * {@link DataflowPipelineJob} when
+ * {@link com.google.cloud.dataflow.sdk.Pipeline#run()} is called.
+ *
+ * <p>This is not intended for use by users of Cloud Dataflow.
+ * Instead, use {@link Pipeline#create(PipelineOptions)} to initialize a
+ * {@link Pipeline}.
+ */
+public class DataflowPipeline extends Pipeline {
+
+ /**
+ * Creates and returns a new {@link DataflowPipeline} instance for tests.
+ */
+ public static DataflowPipeline create(DataflowPipelineOptions options) {
+ return new DataflowPipeline(options);
+ }
+
+ // Always runs with a DataflowPipelineRunner built from the supplied options.
+ private DataflowPipeline(DataflowPipelineOptions options) {
+ super(DataflowPipelineRunner.fromOptions(options), options);
+ }
+
+ /** Runs the pipeline, narrowing the result type to {@link DataflowPipelineJob}. */
+ @Override
+ public DataflowPipelineJob run() {
+ return (DataflowPipelineJob) super.run();
+ }
+
+ /** Returns the runner, narrowed to {@link DataflowPipelineRunner}. */
+ @Override
+ public DataflowPipelineRunner getRunner() {
+ return (DataflowPipelineRunner) super.getRunner();
+ }
+
+ @Override
+ public String toString() {
+ return "DataflowPipeline#" + getOptions().as(DataflowPipelineOptions.class).getJobName();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b4857cc/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineJob.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineJob.java b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineJob.java
new file mode 100644
index 0000000..632be6d
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineJob.java
@@ -0,0 +1,394 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.sdk.runners;
+
+import static com.google.cloud.dataflow.sdk.util.TimeUtil.fromCloudTime;
+
+import com.google.api.client.googleapis.json.GoogleJsonResponseException;
+import com.google.api.client.util.BackOff;
+import com.google.api.client.util.BackOffUtils;
+import com.google.api.client.util.NanoClock;
+import com.google.api.client.util.Sleeper;
+import com.google.api.services.dataflow.Dataflow;
+import com.google.api.services.dataflow.model.Job;
+import com.google.api.services.dataflow.model.JobMessage;
+import com.google.api.services.dataflow.model.JobMetrics;
+import com.google.api.services.dataflow.model.MetricUpdate;
+import com.google.cloud.dataflow.sdk.PipelineResult;
+import com.google.cloud.dataflow.sdk.runners.dataflow.DataflowAggregatorTransforms;
+import com.google.cloud.dataflow.sdk.runners.dataflow.DataflowMetricUpdateExtractor;
+import com.google.cloud.dataflow.sdk.transforms.Aggregator;
+import com.google.cloud.dataflow.sdk.util.AttemptAndTimeBoundedExponentialBackOff;
+import com.google.cloud.dataflow.sdk.util.AttemptBoundedExponentialBackOff;
+import com.google.cloud.dataflow.sdk.util.MapAggregatorValues;
+import com.google.cloud.dataflow.sdk.util.MonitoringUtil;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Throwables;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.SocketTimeoutException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import javax.annotation.Nullable;
+
+/**
+ * A DataflowPipelineJob represents a job submitted to Dataflow using
+ * {@link DataflowPipelineRunner}.
+ */
+public class DataflowPipelineJob implements PipelineResult {
+ private static final Logger LOG = LoggerFactory.getLogger(DataflowPipelineJob.class);
+
+ /**
+ * The id for the job.
+ */
+ private String jobId;
+
+ /**
+ * Google cloud project to associate this pipeline with.
+ */
+ private String projectId;
+
+ /**
+ * Client for the Dataflow service. This can be used to query the service
+ * for information about the job.
+ */
+ private Dataflow dataflowClient;
+
+ /**
+ * The state the job terminated in or {@code null} if the job has not terminated.
+ */
+ @Nullable
+ private State terminalState = null;
+
+ /**
+ * The job that replaced this one or {@code null} if the job has not been replaced.
+ */
+ @Nullable
+ private DataflowPipelineJob replacedByJob = null;
+
+ // Used to look up Aggregator values from the job's metric updates; see fromMetricUpdates.
+ private DataflowAggregatorTransforms aggregatorTransforms;
+
+ /**
+ * The Metric Updates retrieved after the job was in a terminal state.
+ */
+ private List<MetricUpdate> terminalMetricUpdates;
+
+ /**
+ * The polling interval for job status and messages information.
+ */
+ static final long MESSAGES_POLLING_INTERVAL = TimeUnit.SECONDS.toMillis(2);
+ static final long STATUS_POLLING_INTERVAL = TimeUnit.SECONDS.toMillis(2);
+
+ /**
+ * The number of polling attempts for job status and messages information.
+ */
+ static final int MESSAGES_POLLING_ATTEMPTS = 10;
+ static final int STATUS_POLLING_ATTEMPTS = 5;
+
+ /**
+ * Constructs the job.
+ *
+ * @param projectId the project id
+ * @param jobId the job id
+ * @param dataflowClient the client for the Dataflow Service
+ * @param aggregatorTransforms the transforms from which {@link Aggregator} values are resolved
+ */
+ public DataflowPipelineJob(String projectId, String jobId, Dataflow dataflowClient,
+ DataflowAggregatorTransforms aggregatorTransforms) {
+ this.projectId = projectId;
+ this.jobId = jobId;
+ this.dataflowClient = dataflowClient;
+ this.aggregatorTransforms = aggregatorTransforms;
+ }
+
+ /**
+ * Get the id of this job.
+ */
+ public String getJobId() {
+ return jobId;
+ }
+
+ /**
+ * Get the project this job exists in.
+ */
+ public String getProjectId() {
+ return projectId;
+ }
+
+ /**
+ * Returns a new {@link DataflowPipelineJob} for the job that replaced this one, if applicable.
+ *
+ * @throws IllegalStateException if called before the job has terminated or if the job terminated
+ * but was not updated
+ */
+ public DataflowPipelineJob getReplacedByJob() {
+ if (terminalState == null) {
+ throw new IllegalStateException("getReplacedByJob() called before job terminated");
+ }
+ if (replacedByJob == null) {
+ throw new IllegalStateException("getReplacedByJob() called for job that was not replaced");
+ }
+ return replacedByJob;
+ }
+
+ /**
+ * Get the Cloud Dataflow API Client used by this job.
+ */
+ public Dataflow getDataflowClient() {
+ return dataflowClient;
+ }
+
+ /**
+ * Waits for the job to finish and return the final status.
+ *
+ * @param timeToWait The time to wait in units timeUnit for the job to finish.
+ * Provide a value less than 1 ms for an infinite wait.
+ * @param timeUnit The unit of time for timeToWait.
+ * @param messageHandler If non null this handler will be invoked for each
+ * batch of messages received.
+ * @return The final state of the job or null on timeout or if the
+ * thread is interrupted.
+ * @throws IOException If there is a persistent problem getting job
+ * information.
+ * @throws InterruptedException if interrupted while waiting between polling attempts.
+ */
+ @Nullable
+ public State waitToFinish(
+ long timeToWait,
+ TimeUnit timeUnit,
+ MonitoringUtil.JobMessagesHandler messageHandler)
+ throws IOException, InterruptedException {
+ return waitToFinish(timeToWait, timeUnit, messageHandler, Sleeper.DEFAULT, NanoClock.SYSTEM);
+ }
+
+ /**
+ * Wait for the job to finish and return the final status.
+ *
+ * @param timeToWait The time to wait in units timeUnit for the job to finish.
+ * Provide a value less than 1 ms for an infinite wait.
+ * @param timeUnit The unit of time for timeToWait.
+ * @param messageHandler If non null this handler will be invoked for each
+ * batch of messages received.
+ * @param sleeper A sleeper to use to sleep between attempts.
+ * @param nanoClock A nanoClock used to time the total time taken.
+ * @return The final state of the job or null on timeout or if the
+ * thread is interrupted.
+ * @throws IOException If there is a persistent problem getting job
+ * information.
+ * @throws InterruptedException if interrupted while waiting between polling attempts.
+ */
+ @Nullable
+ @VisibleForTesting
+ State waitToFinish(
+ long timeToWait,
+ TimeUnit timeUnit,
+ MonitoringUtil.JobMessagesHandler messageHandler,
+ Sleeper sleeper,
+ NanoClock nanoClock)
+ throws IOException, InterruptedException {
+ MonitoringUtil monitor = new MonitoringUtil(projectId, dataflowClient);
+
+ long lastTimestamp = 0;
+ // A positive timeToWait bounds the overall wait by both attempts and elapsed time;
+ // otherwise only the attempt count bounds it (effectively an indefinite wait).
+ BackOff backoff =
+ timeUnit.toMillis(timeToWait) > 0
+ ? new AttemptAndTimeBoundedExponentialBackOff(
+ MESSAGES_POLLING_ATTEMPTS,
+ MESSAGES_POLLING_INTERVAL,
+ timeUnit.toMillis(timeToWait),
+ AttemptAndTimeBoundedExponentialBackOff.ResetPolicy.ATTEMPTS,
+ nanoClock)
+ : new AttemptBoundedExponentialBackOff(
+ MESSAGES_POLLING_ATTEMPTS, MESSAGES_POLLING_INTERVAL);
+ State state;
+ do {
+ // Get the state of the job before listing messages. This ensures we always fetch job
+ // messages after the job finishes to ensure we have all of them.
+ state = getStateWithRetries(1, sleeper);
+ boolean hasError = state == State.UNKNOWN;
+
+ if (messageHandler != null && !hasError) {
+ // Process all the job messages that have accumulated so far.
+ try {
+ List<JobMessage> allMessages = monitor.getJobMessages(
+ jobId, lastTimestamp);
+
+ if (!allMessages.isEmpty()) {
+ lastTimestamp =
+ fromCloudTime(allMessages.get(allMessages.size() - 1).getTime()).getMillis();
+ messageHandler.process(allMessages);
+ }
+ } catch (GoogleJsonResponseException | SocketTimeoutException e) {
+ hasError = true;
+ LOG.warn("There were problems getting current job messages: {}.", e.getMessage());
+ LOG.debug("Exception information:", e);
+ }
+ }
+
+ if (!hasError) {
+ // A successful poll resets the backoff so transient failures don't accumulate.
+ backoff.reset();
+ // Check if the job is done.
+ if (state.isTerminal()) {
+ return state;
+ }
+ }
+ } while(BackOffUtils.next(sleeper, backoff));
+ LOG.warn("No terminal state was returned. State value {}", state);
+ return null; // Timed out.
+ }
+
+ /**
+ * Cancels the job.
+ * @throws IOException if there is a problem executing the cancel request.
+ */
+ public void cancel() throws IOException {
+ Job content = new Job();
+ content.setProjectId(projectId);
+ content.setId(jobId);
+ content.setRequestedState("JOB_STATE_CANCELLED");
+ dataflowClient.projects().jobs()
+ .update(projectId, jobId, content)
+ .execute();
+ }
+
+ @Override
+ public State getState() {
+ if (terminalState != null) {
+ return terminalState;
+ }
+
+ return getStateWithRetries(STATUS_POLLING_ATTEMPTS, Sleeper.DEFAULT);
+ }
+
+ /**
+ * Attempts to get the state. Uses exponential backoff on failure up to the maximum number
+ * of passed in attempts.
+ *
+ * @param attempts The number of attempts to make.
+ * @param sleeper Object used to do the sleeps between attempts.
+ * @return The state of the job or State.UNKNOWN in case of failure.
+ */
+ @VisibleForTesting
+ State getStateWithRetries(int attempts, Sleeper sleeper) {
+ if (terminalState != null) {
+ return terminalState;
+ }
+ try {
+ Job job = getJobWithRetries(attempts, sleeper);
+ return MonitoringUtil.toState(job.getCurrentState());
+ } catch (IOException exn) {
+ // The only IOException that getJobWithRetries is permitted to throw is the final IOException
+ // that caused the failure of retry. Other exceptions are wrapped in unchecked exceptions
+ // and will propagate.
+ return State.UNKNOWN;
+ }
+ }
+
+ /**
+ * Attempts to get the underlying {@link Job}. Uses exponential backoff on failure up to the
+ * maximum number of passed in attempts.
+ *
+ * @param attempts The number of attempts to make.
+ * @param sleeper Object used to do the sleeps between attempts.
+ * @return The underlying {@link Job} object.
+ * @throws IOException When the maximum number of retries is exhausted, the last exception is
+ * thrown.
+ */
+ @VisibleForTesting
+ Job getJobWithRetries(int attempts, Sleeper sleeper) throws IOException {
+ AttemptBoundedExponentialBackOff backoff =
+ new AttemptBoundedExponentialBackOff(attempts, STATUS_POLLING_INTERVAL);
+
+ // Retry loop ends in return or throw
+ while (true) {
+ try {
+ Job job = dataflowClient
+ .projects()
+ .jobs()
+ .get(projectId, jobId)
+ .execute();
+ State currentState = MonitoringUtil.toState(job.getCurrentState());
+ if (currentState.isTerminal()) {
+ terminalState = currentState;
+ // NOTE(review): replacedByJob is constructed unconditionally on any terminal state,
+ // even though job.getReplacedByJobId() is presumably null when the job was not
+ // updated — confirm a DataflowPipelineJob with a null jobId is intended here;
+ // getReplacedByJob() only guards on replacedByJob itself being null.
+ replacedByJob = new DataflowPipelineJob(
+ getProjectId(), job.getReplacedByJobId(), dataflowClient, aggregatorTransforms);
+ }
+ return job;
+ } catch (IOException exn) {
+ LOG.warn("There were problems getting current job status: {}.", exn.getMessage());
+ LOG.debug("Exception information:", exn);
+
+ if (!nextBackOff(sleeper, backoff)) {
+ throw exn;
+ }
+ }
+ }
+ }
+
+ /**
+ * Identical to {@link BackOffUtils#next} but without checked exceptions.
+ */
+ private boolean nextBackOff(Sleeper sleeper, BackOff backoff) {
+ try {
+ return BackOffUtils.next(sleeper, backoff);
+ } catch (InterruptedException | IOException e) {
+ if (e instanceof InterruptedException) {
+ // Restore the interrupt flag before rethrowing as unchecked.
+ Thread.currentThread().interrupt();
+ }
+ throw Throwables.propagate(e);
+ }
+ }
+
+ /**
+ * Returns the values of {@code aggregator}, wrapping any I/O failure in an
+ * {@link AggregatorRetrievalException}.
+ */
+ @Override
+ public <OutputT> AggregatorValues<OutputT> getAggregatorValues(Aggregator<?, OutputT> aggregator)
+ throws AggregatorRetrievalException {
+ try {
+ return new MapAggregatorValues<>(fromMetricUpdates(aggregator));
+ } catch (IOException e) {
+ throw new AggregatorRetrievalException(
+ "IOException when retrieving Aggregator values for Aggregator " + aggregator, e);
+ }
+ }
+
+ /**
+ * Fetches the metric updates for {@code aggregator} from the service, caching the list once
+ * the job has reached a terminal state (terminal metrics cannot change further).
+ *
+ * @throws IllegalArgumentException if the aggregator is not used in this pipeline
+ */
+ private <OutputT> Map<String, OutputT> fromMetricUpdates(Aggregator<?, OutputT> aggregator)
+ throws IOException {
+ if (aggregatorTransforms.contains(aggregator)) {
+ List<MetricUpdate> metricUpdates;
+ if (terminalMetricUpdates != null) {
+ metricUpdates = terminalMetricUpdates;
+ } else {
+ // Read the state BEFORE fetching metrics so we never cache pre-terminal metrics
+ // as if they were final.
+ boolean terminal = getState().isTerminal();
+ JobMetrics jobMetrics =
+ dataflowClient.projects().jobs().getMetrics(projectId, jobId).execute();
+ metricUpdates = jobMetrics.getMetrics();
+ if (terminal && jobMetrics.getMetrics() != null) {
+ terminalMetricUpdates = metricUpdates;
+ }
+ }
+
+ return DataflowMetricUpdateExtractor.fromMetricUpdates(
+ aggregator, aggregatorTransforms, metricUpdates);
+ } else {
+ throw new IllegalArgumentException(
+ "Aggregator " + aggregator + " is not used in this pipeline");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b4857cc/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRegistrar.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRegistrar.java b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRegistrar.java
new file mode 100644
index 0000000..8aaa7cc
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRegistrar.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.sdk.runners;
+
+import com.google.auto.service.AutoService;
+import com.google.cloud.dataflow.sdk.options.BlockingDataflowPipelineOptions;
+import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsRegistrar;
+import com.google.common.collect.ImmutableList;
+
+/**
+ * Contains the {@link PipelineOptionsRegistrar} and {@link PipelineRunnerRegistrar} for
+ * the {@link DataflowPipeline}.
+ */
+public class DataflowPipelineRegistrar {
+ // Utility holder for the nested registrars; never instantiated.
+ private DataflowPipelineRegistrar() { }
+
+ /**
+ * Register the {@link DataflowPipelineOptions} and {@link BlockingDataflowPipelineOptions}.
+ */
+ @AutoService(PipelineOptionsRegistrar.class)
+ public static class Options implements PipelineOptionsRegistrar {
+ @Override
+ public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
+ return ImmutableList.<Class<? extends PipelineOptions>>of(
+ DataflowPipelineOptions.class,
+ BlockingDataflowPipelineOptions.class);
+ }
+ }
+
+ /**
+ * Register the {@link DataflowPipelineRunner} and {@link BlockingDataflowPipelineRunner}.
+ */
+ @AutoService(PipelineRunnerRegistrar.class)
+ public static class Runner implements PipelineRunnerRegistrar {
+ @Override
+ public Iterable<Class<? extends PipelineRunner<?>>> getPipelineRunners() {
+ return ImmutableList.<Class<? extends PipelineRunner<?>>>of(
+ DataflowPipelineRunner.class,
+ BlockingDataflowPipelineRunner.class);
+ }
+ }
+}