You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/07/07 00:25:28 UTC

[1/2] incubator-beam git commit: remove runInjectorPipeline from DataflowExampleUtils

Repository: incubator-beam
Updated Branches:
  refs/heads/master 5eb7d95f3 -> 1cb898f58


remove runInjectorPipeline from DataflowExampleUtils


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/51427f71
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/51427f71
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/51427f71

Branch: refs/heads/master
Commit: 51427f712d58a10f1de5a3db03030a6f920510cb
Parents: 5eb7d95
Author: Pei He <pe...@google.com>
Authored: Wed Jul 6 13:24:45 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Wed Jul 6 17:25:24 2016 -0700

----------------------------------------------------------------------
 .../examples/common/DataflowExampleUtils.java   | 87 --------------------
 1 file changed, 87 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51427f71/examples/java/src/main/java/org/apache/beam/examples/common/DataflowExampleUtils.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/DataflowExampleUtils.java b/examples/java/src/main/java/org/apache/beam/examples/common/DataflowExampleUtils.java
index b436ed1..a90968a 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/common/DataflowExampleUtils.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/common/DataflowExampleUtils.java
@@ -22,17 +22,11 @@ import org.apache.beam.runners.dataflow.DataflowPipelineJob;
 import org.apache.beam.runners.dataflow.DataflowRunner;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
 import org.apache.beam.runners.dataflow.util.MonitoringUtil;
-import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
-import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.options.BigQueryOptions;
 import org.apache.beam.sdk.runners.PipelineRunner;
-import org.apache.beam.sdk.transforms.IntraBundleParallelization;
-import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff;
 import org.apache.beam.sdk.util.Transport;
-import org.apache.beam.sdk.values.PBegin;
-import org.apache.beam.sdk.values.PCollection;
 
 import com.google.api.client.googleapis.json.GoogleJsonResponseException;
 import com.google.api.client.googleapis.services.AbstractGoogleClientRequest;
@@ -51,7 +45,6 @@ import com.google.api.services.dataflow.Dataflow;
 import com.google.api.services.pubsub.Pubsub;
 import com.google.api.services.pubsub.model.Subscription;
 import com.google.api.services.pubsub.model.Topic;
-import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.Uninterruptibles;
@@ -301,20 +294,6 @@ public class DataflowExampleUtils {
   }
 
   /**
-   * If this is an unbounded (streaming) pipeline, and both inputFile and pubsub topic are defined,
-   * start an 'injector' pipeline that publishes the contents of the file to the given topic, first
-   * creating the topic if necessary.
-   */
-  public void startInjectorIfNeeded(String inputFile) {
-    ExamplePubsubTopicOptions pubsubTopicOptions = options.as(ExamplePubsubTopicOptions.class);
-    if (pubsubTopicOptions.isStreaming()
-        && !Strings.isNullOrEmpty(inputFile)
-        && !Strings.isNullOrEmpty(pubsubTopicOptions.getPubsubTopic())) {
-      runInjectorPipeline(inputFile, pubsubTopicOptions.getPubsubTopic());
-    }
-  }
-
-  /**
    * Do some runner setup: check that the DirectRunner is not used in conjunction with
    * streaming, and if streaming is specified, use the DataflowRunner.
    */
@@ -330,72 +309,6 @@ public class DataflowExampleUtils {
   }
 
   /**
-   * Runs a batch pipeline to inject data into the PubSubIO input topic.
-   *
-   * <p>The injector pipeline will read from the given text file, and inject data
-   * into the Google Cloud Pub/Sub topic.
-   */
-  public void runInjectorPipeline(String inputFile, String topic) {
-    runInjectorPipeline(TextIO.Read.from(inputFile), topic, null);
-  }
-
-  /**
-   * Runs a batch pipeline to inject data into the PubSubIO input topic.
-   *
-   * <p>The injector pipeline will read from the given source, and inject data
-   * into the Google Cloud Pub/Sub topic.
-   */
-  public void runInjectorPipeline(PTransform<? super PBegin, PCollection<String>> readSource,
-                                  String topic,
-                                  String pubsubTimestampTabelKey) {
-    PubsubFileInjector.Bound injector;
-    if (Strings.isNullOrEmpty(pubsubTimestampTabelKey)) {
-      injector = PubsubFileInjector.publish(topic);
-    } else {
-      injector = PubsubFileInjector.withTimestampLabelKey(pubsubTimestampTabelKey).publish(topic);
-    }
-    DataflowPipelineOptions copiedOptions = options.cloneAs(DataflowPipelineOptions.class);
-    if (options.getServiceAccountName() != null) {
-      copiedOptions.setServiceAccountName(options.getServiceAccountName());
-    }
-    if (options.getServiceAccountKeyfile() != null) {
-      copiedOptions.setServiceAccountKeyfile(options.getServiceAccountKeyfile());
-    }
-    copiedOptions.setStreaming(false);
-    copiedOptions.setWorkerHarnessContainerImage(
-        DataflowRunner.BATCH_WORKER_HARNESS_CONTAINER_IMAGE);
-    copiedOptions.setNumWorkers(options.as(DataflowExampleOptions.class).getInjectorNumWorkers());
-    copiedOptions.setJobName(options.getJobName() + "-injector");
-    Pipeline injectorPipeline = Pipeline.create(copiedOptions);
-    injectorPipeline.apply(readSource)
-                    .apply(IntraBundleParallelization
-                        .of(injector)
-                        .withMaxParallelism(20));
-    PipelineResult result = injectorPipeline.run();
-    if (result instanceof DataflowPipelineJob) {
-      jobsToCancel.add(((DataflowPipelineJob) result));
-    }
-  }
-
-  /**
-   * Runs the provided pipeline to inject data into the PubSubIO input topic.
-   */
-  public void runInjectorPipeline(Pipeline injectorPipeline) {
-    PipelineResult result = injectorPipeline.run();
-    if (result instanceof DataflowPipelineJob) {
-      jobsToCancel.add(((DataflowPipelineJob) result));
-    }
-  }
-
-  /**
-   * Start the auxiliary injector pipeline, then wait for this pipeline to finish.
-   */
-  public void mockUnboundedSource(String inputFile, PipelineResult result) {
-    startInjectorIfNeeded(inputFile);
-    waitToFinish(result);
-  }
-
-  /**
    * If {@literal DataflowRunner} or {@literal BlockingDataflowRunner} is used,
    * waits for the pipeline to finish and cancels it (and the injector) before the program exists.
    */


[2/2] incubator-beam git commit: Closes #594

Posted by dh...@apache.org.
Closes #594


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1cb898f5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1cb898f5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1cb898f5

Branch: refs/heads/master
Commit: 1cb898f58dc77a98ef73210e00f8aa71d65303ab
Parents: 5eb7d95 51427f7
Author: Dan Halperin <dh...@google.com>
Authored: Wed Jul 6 17:25:25 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Wed Jul 6 17:25:25 2016 -0700

----------------------------------------------------------------------
 .../examples/common/DataflowExampleUtils.java   | 87 --------------------
 1 file changed, 87 deletions(-)
----------------------------------------------------------------------