You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/08/12 16:48:00 UTC
[1/2] incubator-beam git commit: Remove DataflowPipelineJob from examples
Repository: incubator-beam
Updated Branches:
refs/heads/master 2bcbf379f -> def2526e4
Remove DataflowPipelineJob from examples
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a83738ba
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a83738ba
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a83738ba
Branch: refs/heads/master
Commit: a83738ba5fc631cc9be8c5294963e2ac2e82429d
Parents: 2bcbf37
Author: Pei He <pe...@google.com>
Authored: Mon Aug 1 13:41:59 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Fri Aug 12 09:47:41 2016 -0700
----------------------------------------------------------------------
examples/java/pom.xml | 7 +-
.../beam/examples/common/ExampleUtils.java | 74 +++++---------------
.../runners/dataflow/DataflowPipelineJob.java | 14 +++-
3 files changed, 31 insertions(+), 64 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a83738ba/examples/java/pom.xml
----------------------------------------------------------------------
diff --git a/examples/java/pom.xml b/examples/java/pom.xml
index cdf80cb..6efbc54 100644
--- a/examples/java/pom.xml
+++ b/examples/java/pom.xml
@@ -221,11 +221,6 @@
<dependency>
<groupId>com.google.apis</groupId>
- <artifactId>google-api-services-dataflow</artifactId>
- </dependency>
-
- <dependency>
- <groupId>com.google.apis</groupId>
<artifactId>google-api-services-bigquery</artifactId>
</dependency>
@@ -286,6 +281,8 @@
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
<version>${project.version}</version>
+ <scope>runtime</scope>
+ <optional>true</optional>
</dependency>
<dependency>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a83738ba/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java b/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java
index 8b66861..7f03fc0 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java
@@ -17,17 +17,10 @@
*/
package org.apache.beam.examples.common;
-import org.apache.beam.runners.dataflow.BlockingDataflowRunner;
-import org.apache.beam.runners.dataflow.DataflowPipelineJob;
-import org.apache.beam.runners.dataflow.DataflowRunner;
-import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
-import org.apache.beam.runners.dataflow.util.MonitoringUtil;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.options.BigQueryOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PubsubOptions;
-import org.apache.beam.sdk.options.StreamingOptions;
-import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff;
import org.apache.beam.sdk.util.Transport;
@@ -44,7 +37,6 @@ import com.google.api.services.bigquery.model.DatasetReference;
import com.google.api.services.bigquery.model.Table;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableSchema;
-import com.google.api.services.dataflow.Dataflow;
import com.google.api.services.pubsub.Pubsub;
import com.google.api.services.pubsub.model.Subscription;
import com.google.api.services.pubsub.model.Topic;
@@ -71,8 +63,7 @@ public class ExampleUtils {
private final PipelineOptions options;
private Bigquery bigQueryClient = null;
private Pubsub pubsubClient = null;
- private Dataflow dataflowClient = null;
- private Set<DataflowPipelineJob> jobsToCancel = Sets.newHashSet();
+ private Set<PipelineResult> pipelinesToCancel = Sets.newHashSet();
private List<String> pendingMessages = Lists.newArrayList();
/**
@@ -80,7 +71,6 @@ public class ExampleUtils {
*/
public ExampleUtils(PipelineOptions options) {
this.options = options;
- setupRunner();
}
/**
@@ -281,71 +271,46 @@ public class ExampleUtils {
}
/**
- * Do some runner setup: check that the DirectRunner is not used in conjunction with
- * streaming, and if streaming is specified, use the DataflowRunner.
- */
- private void setupRunner() {
- Class<? extends PipelineRunner<?>> runner = options.getRunner();
- if (options.as(StreamingOptions.class).isStreaming()
- && runner.equals(BlockingDataflowRunner.class)) {
- // In order to cancel the pipelines automatically,
- // {@literal DataflowRunner} is forced to be used.
- options.setRunner(DataflowRunner.class);
- }
- }
-
- /**
* If {@literal DataflowRunner} or {@literal BlockingDataflowRunner} is used,
* waits for the pipeline to finish and cancels it (and the injector) before the program exists.
*/
public void waitToFinish(PipelineResult result) {
- if (result instanceof DataflowPipelineJob) {
- final DataflowPipelineJob job = (DataflowPipelineJob) result;
- jobsToCancel.add(job);
- if (!options.as(ExampleOptions.class).getKeepJobsRunning()) {
- addShutdownHook(jobsToCancel);
- }
- try {
- job.waitUntilFinish();
- } catch (Exception e) {
- throw new RuntimeException("Failed to wait for job to finish: " + job.getJobId());
- }
- } else {
+ pipelinesToCancel.add(result);
+ if (!options.as(ExampleOptions.class).getKeepJobsRunning()) {
+ addShutdownHook(pipelinesToCancel);
+ }
+ try {
+ result.waitUntilFinish();
+ } catch (UnsupportedOperationException e) {
// Do nothing if the given PipelineResult doesn't support waitUntilFinish(),
// such as EvaluationResults returned by DirectRunner.
tearDown();
printPendingMessages();
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to wait the pipeline until finish: " + result);
}
}
- private void addShutdownHook(final Collection<DataflowPipelineJob> jobs) {
- if (dataflowClient == null) {
- dataflowClient = options.as(DataflowPipelineOptions.class).getDataflowClient();
- }
-
+ private void addShutdownHook(final Collection<PipelineResult> pipelineResults) {
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
tearDown();
printPendingMessages();
- for (DataflowPipelineJob job : jobs) {
- System.out.println("Canceling example pipeline: " + job.getJobId());
+ for (PipelineResult pipelineResult : pipelineResults) {
try {
- job.cancel();
+ pipelineResult.cancel();
} catch (IOException e) {
- System.out.println("Failed to cancel the job,"
- + " please go to the Developers Console to cancel it manually");
- System.out.println(
- MonitoringUtil.getJobMonitoringPageURL(job.getProjectId(), job.getJobId()));
+ System.out.println("Failed to cancel the job.");
+ System.out.println(e.getMessage());
}
}
- for (DataflowPipelineJob job : jobs) {
+ for (PipelineResult pipelineResult : pipelineResults) {
boolean cancellationVerified = false;
for (int retryAttempts = 6; retryAttempts > 0; retryAttempts--) {
- if (job.getState().isTerminal()) {
+ if (pipelineResult.getState().isTerminal()) {
cancellationVerified = true;
- System.out.println("Canceled example pipeline: " + job.getJobId());
break;
} else {
System.out.println(
@@ -354,10 +319,7 @@ public class ExampleUtils {
Uninterruptibles.sleepUninterruptibly(10, TimeUnit.SECONDS);
}
if (!cancellationVerified) {
- System.out.println("Failed to verify the cancellation for job: " + job.getJobId());
- System.out.println("Please go to the Developers Console to verify manually:");
- System.out.println(
- MonitoringUtil.getJobMonitoringPageURL(job.getProjectId(), job.getJobId()));
+ System.out.println("Failed to verify the cancellation for job: " + pipelineResult);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a83738ba/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
index e043e23..3d0f145 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
@@ -271,9 +271,17 @@ public class DataflowPipelineJob implements PipelineResult {
content.setProjectId(projectId);
content.setId(jobId);
content.setRequestedState("JOB_STATE_CANCELLED");
- dataflowOptions.getDataflowClient().projects().jobs()
- .update(projectId, jobId, content)
- .execute();
+ try {
+ dataflowOptions.getDataflowClient().projects().jobs()
+ .update(projectId, jobId, content)
+ .execute();
+ } catch (IOException e) {
+ String errorMsg = String.format(
+ "Failed to cancel the job, please go to the Developers Console to cancel it manually: %s",
+ MonitoringUtil.getJobMonitoringPageURL(getProjectId(), getJobId()));
+ LOG.warn(errorMsg);
+ throw new IOException(errorMsg, e);
+ }
return State.CANCELLED;
}
[2/2] incubator-beam git commit: Closes #771
Posted by dh...@apache.org.
Closes #771
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/def2526e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/def2526e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/def2526e
Branch: refs/heads/master
Commit: def2526e4512ff31b727192007bf3410b69bcc5d
Parents: 2bcbf37 a83738b
Author: Dan Halperin <dh...@google.com>
Authored: Fri Aug 12 09:47:43 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Fri Aug 12 09:47:43 2016 -0700
----------------------------------------------------------------------
examples/java/pom.xml | 7 +-
.../beam/examples/common/ExampleUtils.java | 74 +++++---------------
.../runners/dataflow/DataflowPipelineJob.java | 14 +++-
3 files changed, 31 insertions(+), 64 deletions(-)
----------------------------------------------------------------------