You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/07/07 00:25:28 UTC
[1/2] incubator-beam git commit: remove runInjectorPipeline from DataflowExampleUtils
Repository: incubator-beam
Updated Branches:
refs/heads/master 5eb7d95f3 -> 1cb898f58
remove runInjectorPipeline from DataflowExampleUtils
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/51427f71
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/51427f71
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/51427f71
Branch: refs/heads/master
Commit: 51427f712d58a10f1de5a3db03030a6f920510cb
Parents: 5eb7d95
Author: Pei He <pe...@google.com>
Authored: Wed Jul 6 13:24:45 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Wed Jul 6 17:25:24 2016 -0700
----------------------------------------------------------------------
.../examples/common/DataflowExampleUtils.java | 87 --------------------
1 file changed, 87 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51427f71/examples/java/src/main/java/org/apache/beam/examples/common/DataflowExampleUtils.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/DataflowExampleUtils.java b/examples/java/src/main/java/org/apache/beam/examples/common/DataflowExampleUtils.java
index b436ed1..a90968a 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/common/DataflowExampleUtils.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/common/DataflowExampleUtils.java
@@ -22,17 +22,11 @@ import org.apache.beam.runners.dataflow.DataflowPipelineJob;
import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.dataflow.util.MonitoringUtil;
-import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
-import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.BigQueryOptions;
import org.apache.beam.sdk.runners.PipelineRunner;
-import org.apache.beam.sdk.transforms.IntraBundleParallelization;
-import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff;
import org.apache.beam.sdk.util.Transport;
-import org.apache.beam.sdk.values.PBegin;
-import org.apache.beam.sdk.values.PCollection;
import com.google.api.client.googleapis.json.GoogleJsonResponseException;
import com.google.api.client.googleapis.services.AbstractGoogleClientRequest;
@@ -51,7 +45,6 @@ import com.google.api.services.dataflow.Dataflow;
import com.google.api.services.pubsub.Pubsub;
import com.google.api.services.pubsub.model.Subscription;
import com.google.api.services.pubsub.model.Topic;
-import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Uninterruptibles;
@@ -301,20 +294,6 @@ public class DataflowExampleUtils {
}
/**
- * If this is an unbounded (streaming) pipeline, and both inputFile and pubsub topic are defined,
- * start an 'injector' pipeline that publishes the contents of the file to the given topic, first
- * creating the topic if necessary.
- */
- public void startInjectorIfNeeded(String inputFile) {
- ExamplePubsubTopicOptions pubsubTopicOptions = options.as(ExamplePubsubTopicOptions.class);
- if (pubsubTopicOptions.isStreaming()
- && !Strings.isNullOrEmpty(inputFile)
- && !Strings.isNullOrEmpty(pubsubTopicOptions.getPubsubTopic())) {
- runInjectorPipeline(inputFile, pubsubTopicOptions.getPubsubTopic());
- }
- }
-
- /**
* Do some runner setup: check that the DirectRunner is not used in conjunction with
* streaming, and if streaming is specified, use the DataflowRunner.
*/
@@ -330,72 +309,6 @@ public class DataflowExampleUtils {
}
/**
- * Runs a batch pipeline to inject data into the PubSubIO input topic.
- *
- * <p>The injector pipeline will read from the given text file, and inject data
- * into the Google Cloud Pub/Sub topic.
- */
- public void runInjectorPipeline(String inputFile, String topic) {
- runInjectorPipeline(TextIO.Read.from(inputFile), topic, null);
- }
-
- /**
- * Runs a batch pipeline to inject data into the PubSubIO input topic.
- *
- * <p>The injector pipeline will read from the given source, and inject data
- * into the Google Cloud Pub/Sub topic.
- */
- public void runInjectorPipeline(PTransform<? super PBegin, PCollection<String>> readSource,
- String topic,
- String pubsubTimestampTabelKey) {
- PubsubFileInjector.Bound injector;
- if (Strings.isNullOrEmpty(pubsubTimestampTabelKey)) {
- injector = PubsubFileInjector.publish(topic);
- } else {
- injector = PubsubFileInjector.withTimestampLabelKey(pubsubTimestampTabelKey).publish(topic);
- }
- DataflowPipelineOptions copiedOptions = options.cloneAs(DataflowPipelineOptions.class);
- if (options.getServiceAccountName() != null) {
- copiedOptions.setServiceAccountName(options.getServiceAccountName());
- }
- if (options.getServiceAccountKeyfile() != null) {
- copiedOptions.setServiceAccountKeyfile(options.getServiceAccountKeyfile());
- }
- copiedOptions.setStreaming(false);
- copiedOptions.setWorkerHarnessContainerImage(
- DataflowRunner.BATCH_WORKER_HARNESS_CONTAINER_IMAGE);
- copiedOptions.setNumWorkers(options.as(DataflowExampleOptions.class).getInjectorNumWorkers());
- copiedOptions.setJobName(options.getJobName() + "-injector");
- Pipeline injectorPipeline = Pipeline.create(copiedOptions);
- injectorPipeline.apply(readSource)
- .apply(IntraBundleParallelization
- .of(injector)
- .withMaxParallelism(20));
- PipelineResult result = injectorPipeline.run();
- if (result instanceof DataflowPipelineJob) {
- jobsToCancel.add(((DataflowPipelineJob) result));
- }
- }
-
- /**
- * Runs the provided pipeline to inject data into the PubSubIO input topic.
- */
- public void runInjectorPipeline(Pipeline injectorPipeline) {
- PipelineResult result = injectorPipeline.run();
- if (result instanceof DataflowPipelineJob) {
- jobsToCancel.add(((DataflowPipelineJob) result));
- }
- }
-
- /**
- * Start the auxiliary injector pipeline, then wait for this pipeline to finish.
- */
- public void mockUnboundedSource(String inputFile, PipelineResult result) {
- startInjectorIfNeeded(inputFile);
- waitToFinish(result);
- }
-
- /**
* If {@literal DataflowRunner} or {@literal BlockingDataflowRunner} is used,
* waits for the pipeline to finish and cancels it (and the injector) before the program exits.
*/
[2/2] incubator-beam git commit: Closes #594
Posted by dh...@apache.org.
Closes #594
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1cb898f5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1cb898f5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1cb898f5
Branch: refs/heads/master
Commit: 1cb898f58dc77a98ef73210e00f8aa71d65303ab
Parents: 5eb7d95 51427f7
Author: Dan Halperin <dh...@google.com>
Authored: Wed Jul 6 17:25:25 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Wed Jul 6 17:25:25 2016 -0700
----------------------------------------------------------------------
.../examples/common/DataflowExampleUtils.java | 87 --------------------
1 file changed, 87 deletions(-)
----------------------------------------------------------------------