Posted to commits@beam.apache.org by dh...@apache.org on 2016/03/24 03:48:23 UTC
[59/67] incubator-beam git commit: Directory reorganization
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2eaa709c/examples/java/src/main/java/com/google/cloud/dataflow/examples/common/DataflowExampleUtils.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/com/google/cloud/dataflow/examples/common/DataflowExampleUtils.java b/examples/java/src/main/java/com/google/cloud/dataflow/examples/common/DataflowExampleUtils.java
new file mode 100644
index 0000000..4dfdd85
--- /dev/null
+++ b/examples/java/src/main/java/com/google/cloud/dataflow/examples/common/DataflowExampleUtils.java
@@ -0,0 +1,485 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.google.cloud.dataflow.examples.common;
+
+import com.google.api.client.googleapis.json.GoogleJsonResponseException;
+import com.google.api.client.googleapis.services.AbstractGoogleClientRequest;
+import com.google.api.client.util.BackOff;
+import com.google.api.client.util.BackOffUtils;
+import com.google.api.client.util.Sleeper;
+import com.google.api.services.bigquery.Bigquery;
+import com.google.api.services.bigquery.Bigquery.Datasets;
+import com.google.api.services.bigquery.Bigquery.Tables;
+import com.google.api.services.bigquery.model.Dataset;
+import com.google.api.services.bigquery.model.DatasetReference;
+import com.google.api.services.bigquery.model.Table;
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableSchema;
+import com.google.api.services.dataflow.Dataflow;
+import com.google.api.services.pubsub.Pubsub;
+import com.google.api.services.pubsub.model.Subscription;
+import com.google.api.services.pubsub.model.Topic;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.PipelineResult;
+import com.google.cloud.dataflow.sdk.io.TextIO;
+import com.google.cloud.dataflow.sdk.options.BigQueryOptions;
+import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
+import com.google.cloud.dataflow.sdk.runners.DataflowPipelineJob;
+import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
+import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner;
+import com.google.cloud.dataflow.sdk.transforms.IntraBundleParallelization;
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.util.AttemptBoundedExponentialBackOff;
+import com.google.cloud.dataflow.sdk.util.MonitoringUtil;
+import com.google.cloud.dataflow.sdk.util.Transport;
+import com.google.cloud.dataflow.sdk.values.PBegin;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.common.base.Strings;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import javax.servlet.http.HttpServletResponse;
+
+/**
+ * The utility class that sets up and tears down external resources, starts the Google Cloud Pub/Sub
+ * injector, and cancels the streaming and the injector pipelines once the program terminates.
+ *
+ * <p>It is used to run Dataflow examples, such as TrafficMaxLaneFlow and TrafficRoutes.
+ */
+public class DataflowExampleUtils {
+
+ private final DataflowPipelineOptions options;
+ private Bigquery bigQueryClient = null;
+ private Pubsub pubsubClient = null;
+ private Dataflow dataflowClient = null;
+ private Set<DataflowPipelineJob> jobsToCancel = Sets.newHashSet();
+ private List<String> pendingMessages = Lists.newArrayList();
+
+ public DataflowExampleUtils(DataflowPipelineOptions options) {
+ this.options = options;
+ }
+
+ /**
+ * Sets up resources and configures the runner options.
+ */
+ public DataflowExampleUtils(DataflowPipelineOptions options, boolean isUnbounded)
+ throws IOException {
+ this.options = options;
+ setupResourcesAndRunner(isUnbounded);
+ }
+
+ /**
+ * Sets up external resources that are required by the example,
+ * such as Pub/Sub topics and BigQuery tables.
+ *
+ * @throws IOException if there is a problem setting up the resources
+ */
+ public void setup() throws IOException {
+ Sleeper sleeper = Sleeper.DEFAULT;
+ BackOff backOff = new AttemptBoundedExponentialBackOff(3, 200);
+ Throwable lastException = null;
+ try {
+ do {
+ try {
+ setupPubsub();
+ setupBigQueryTable();
+ return;
+ } catch (GoogleJsonResponseException e) {
+ lastException = e;
+ }
+ } while (BackOffUtils.next(sleeper, backOff));
+ } catch (InterruptedException e) {
+ // Ignore InterruptedException
+ }
+ Throwables.propagate(lastException);
+ }
+
+ /**
+ * Set up external resources, and configure the runner appropriately.
+ */
+ public void setupResourcesAndRunner(boolean isUnbounded) throws IOException {
+ if (isUnbounded) {
+ options.setStreaming(true);
+ }
+ setup();
+ setupRunner();
+ }
+
+ /**
+ * Sets up the Google Cloud Pub/Sub topic.
+ *
+ * <p>If the topic doesn't exist, a new topic with the given name will be created.
+ *
+ * @throws IOException if there is a problem setting up the Pub/Sub topic
+ */
+ public void setupPubsub() throws IOException {
+ ExamplePubsubTopicAndSubscriptionOptions pubsubOptions =
+ options.as(ExamplePubsubTopicAndSubscriptionOptions.class);
+ if (!pubsubOptions.getPubsubTopic().isEmpty()) {
+ pendingMessages.add("**********************Set Up Pubsub************************");
+ setupPubsubTopic(pubsubOptions.getPubsubTopic());
+ pendingMessages.add("The Pub/Sub topic has been set up for this example: "
+ + pubsubOptions.getPubsubTopic());
+
+ if (!pubsubOptions.getPubsubSubscription().isEmpty()) {
+ setupPubsubSubscription(
+ pubsubOptions.getPubsubTopic(), pubsubOptions.getPubsubSubscription());
+ pendingMessages.add("The Pub/Sub subscription has been set up for this example: "
+ + pubsubOptions.getPubsubSubscription());
+ }
+ }
+ }
+
+ /**
+ * Sets up the BigQuery table with the given schema.
+ *
+ * <p>If the table already exists, the schema has to match the given one. Otherwise, the example
+ * will throw a RuntimeException. If the table doesn't exist, a new table with the given schema
+ * will be created.
+ *
+ * @throws IOException if there is a problem setting up the BigQuery table
+ */
+ public void setupBigQueryTable() throws IOException {
+ ExampleBigQueryTableOptions bigQueryTableOptions =
+ options.as(ExampleBigQueryTableOptions.class);
+ if (bigQueryTableOptions.getBigQueryDataset() != null
+ && bigQueryTableOptions.getBigQueryTable() != null
+ && bigQueryTableOptions.getBigQuerySchema() != null) {
+ pendingMessages.add("******************Set Up Big Query Table*******************");
+ setupBigQueryTable(bigQueryTableOptions.getProject(),
+ bigQueryTableOptions.getBigQueryDataset(),
+ bigQueryTableOptions.getBigQueryTable(),
+ bigQueryTableOptions.getBigQuerySchema());
+ pendingMessages.add("The BigQuery table has been set up for this example: "
+ + bigQueryTableOptions.getProject()
+ + ":" + bigQueryTableOptions.getBigQueryDataset()
+ + "." + bigQueryTableOptions.getBigQueryTable());
+ }
+ }
+
+ /**
+ * Tears down external resources that can be deleted upon the example's completion.
+ */
+ private void tearDown() {
+ pendingMessages.add("*************************Tear Down*************************");
+ ExamplePubsubTopicAndSubscriptionOptions pubsubOptions =
+ options.as(ExamplePubsubTopicAndSubscriptionOptions.class);
+ if (!pubsubOptions.getPubsubTopic().isEmpty()) {
+ try {
+ deletePubsubTopic(pubsubOptions.getPubsubTopic());
+ pendingMessages.add("The Pub/Sub topic has been deleted: "
+ + pubsubOptions.getPubsubTopic());
+ } catch (IOException e) {
+ pendingMessages.add("Failed to delete the Pub/Sub topic : "
+ + pubsubOptions.getPubsubTopic());
+ }
+ if (!pubsubOptions.getPubsubSubscription().isEmpty()) {
+ try {
+ deletePubsubSubscription(pubsubOptions.getPubsubSubscription());
+ pendingMessages.add("The Pub/Sub subscription has been deleted: "
+ + pubsubOptions.getPubsubSubscription());
+ } catch (IOException e) {
+ pendingMessages.add("Failed to delete the Pub/Sub subscription : "
+ + pubsubOptions.getPubsubSubscription());
+ }
+ }
+ }
+
+ ExampleBigQueryTableOptions bigQueryTableOptions =
+ options.as(ExampleBigQueryTableOptions.class);
+ if (bigQueryTableOptions.getBigQueryDataset() != null
+ && bigQueryTableOptions.getBigQueryTable() != null
+ && bigQueryTableOptions.getBigQuerySchema() != null) {
+ pendingMessages.add("The BigQuery table might contain the example's output, "
+ + "and it is not deleted automatically: "
+ + bigQueryTableOptions.getProject()
+ + ":" + bigQueryTableOptions.getBigQueryDataset()
+ + "." + bigQueryTableOptions.getBigQueryTable());
+ pendingMessages.add("Please go to the Developers Console to delete it manually."
+ + " Otherwise, you may be charged for its usage.");
+ }
+ }
+
+ private void setupBigQueryTable(String projectId, String datasetId, String tableId,
+ TableSchema schema) throws IOException {
+ if (bigQueryClient == null) {
+ bigQueryClient = Transport.newBigQueryClient(options.as(BigQueryOptions.class)).build();
+ }
+
+ Datasets datasetService = bigQueryClient.datasets();
+ if (executeNullIfNotFound(datasetService.get(projectId, datasetId)) == null) {
+ Dataset newDataset = new Dataset().setDatasetReference(
+ new DatasetReference().setProjectId(projectId).setDatasetId(datasetId));
+ datasetService.insert(projectId, newDataset).execute();
+ }
+
+ Tables tableService = bigQueryClient.tables();
+ Table table = executeNullIfNotFound(tableService.get(projectId, datasetId, tableId));
+ if (table == null) {
+ Table newTable = new Table().setSchema(schema).setTableReference(
+ new TableReference().setProjectId(projectId).setDatasetId(datasetId).setTableId(tableId));
+ tableService.insert(projectId, datasetId, newTable).execute();
+ } else if (!table.getSchema().equals(schema)) {
+ throw new RuntimeException(
+ "Table exists and schemas do not match, expecting: " + schema.toPrettyString()
+ + ", actual: " + table.getSchema().toPrettyString());
+ }
+ }
+
+ private void setupPubsubTopic(String topic) throws IOException {
+ if (pubsubClient == null) {
+ pubsubClient = Transport.newPubsubClient(options).build();
+ }
+ if (executeNullIfNotFound(pubsubClient.projects().topics().get(topic)) == null) {
+ pubsubClient.projects().topics().create(topic, new Topic().setName(topic)).execute();
+ }
+ }
+
+ private void setupPubsubSubscription(String topic, String subscription) throws IOException {
+ if (pubsubClient == null) {
+ pubsubClient = Transport.newPubsubClient(options).build();
+ }
+ if (executeNullIfNotFound(pubsubClient.projects().subscriptions().get(subscription)) == null) {
+ Subscription subInfo = new Subscription()
+ .setAckDeadlineSeconds(60)
+ .setTopic(topic);
+ pubsubClient.projects().subscriptions().create(subscription, subInfo).execute();
+ }
+ }
+
+ /**
+ * Deletes the Google Cloud Pub/Sub topic.
+ *
+ * @throws IOException if there is a problem deleting the Pub/Sub topic
+ */
+ private void deletePubsubTopic(String topic) throws IOException {
+ if (pubsubClient == null) {
+ pubsubClient = Transport.newPubsubClient(options).build();
+ }
+ if (executeNullIfNotFound(pubsubClient.projects().topics().get(topic)) != null) {
+ pubsubClient.projects().topics().delete(topic).execute();
+ }
+ }
+
+ /**
+ * Deletes the Google Cloud Pub/Sub subscription.
+ *
+ * @throws IOException if there is a problem deleting the Pub/Sub subscription
+ */
+ private void deletePubsubSubscription(String subscription) throws IOException {
+ if (pubsubClient == null) {
+ pubsubClient = Transport.newPubsubClient(options).build();
+ }
+ if (executeNullIfNotFound(pubsubClient.projects().subscriptions().get(subscription)) != null) {
+ pubsubClient.projects().subscriptions().delete(subscription).execute();
+ }
+ }
+
+ /**
+ * If this is an unbounded (streaming) pipeline, and both inputFile and pubsub topic are defined,
+ * start an 'injector' pipeline that publishes the contents of the file to the given topic, first
+ * creating the topic if necessary.
+ */
+ public void startInjectorIfNeeded(String inputFile) {
+ ExamplePubsubTopicOptions pubsubTopicOptions = options.as(ExamplePubsubTopicOptions.class);
+ if (pubsubTopicOptions.isStreaming()
+ && !Strings.isNullOrEmpty(inputFile)
+ && !Strings.isNullOrEmpty(pubsubTopicOptions.getPubsubTopic())) {
+ runInjectorPipeline(inputFile, pubsubTopicOptions.getPubsubTopic());
+ }
+ }
+
+ /**
+ * Performs runner setup: if streaming is specified and the runner is not the
+ * DirectPipelineRunner, forces the DataflowPipelineRunner so that the example's pipelines
+ * can be cancelled automatically.
+ */
+ public void setupRunner() {
+ if (options.isStreaming() && options.getRunner() != DirectPipelineRunner.class) {
+ // In order to cancel the pipelines automatically,
+ // {@literal DataflowPipelineRunner} is forced to be used.
+ options.setRunner(DataflowPipelineRunner.class);
+ }
+ }
+
+ /**
+ * Runs a batch pipeline to inject data into the PubSubIO input topic.
+ *
+ * <p>The injector pipeline will read from the given text file, and inject data
+ * into the Google Cloud Pub/Sub topic.
+ */
+ public void runInjectorPipeline(String inputFile, String topic) {
+ runInjectorPipeline(TextIO.Read.from(inputFile), topic, null);
+ }
+
+ /**
+ * Runs a batch pipeline to inject data into the PubSubIO input topic.
+ *
+ * <p>The injector pipeline will read from the given source, and inject data
+ * into the Google Cloud Pub/Sub topic.
+ */
+ public void runInjectorPipeline(PTransform<? super PBegin, PCollection<String>> readSource,
+ String topic,
+ String pubsubTimestampTableKey) {
+ PubsubFileInjector.Bound injector;
+ if (Strings.isNullOrEmpty(pubsubTimestampTableKey)) {
+ injector = PubsubFileInjector.publish(topic);
+ } else {
+ injector = PubsubFileInjector.withTimestampLabelKey(pubsubTimestampTableKey).publish(topic);
+ }
+ DataflowPipelineOptions copiedOptions = options.cloneAs(DataflowPipelineOptions.class);
+ if (options.getServiceAccountName() != null) {
+ copiedOptions.setServiceAccountName(options.getServiceAccountName());
+ }
+ if (options.getServiceAccountKeyfile() != null) {
+ copiedOptions.setServiceAccountKeyfile(options.getServiceAccountKeyfile());
+ }
+ copiedOptions.setStreaming(false);
+ copiedOptions.setNumWorkers(options.as(DataflowExampleOptions.class).getInjectorNumWorkers());
+ copiedOptions.setJobName(options.getJobName() + "-injector");
+ Pipeline injectorPipeline = Pipeline.create(copiedOptions);
+ injectorPipeline.apply(readSource)
+ .apply(IntraBundleParallelization
+ .of(injector)
+ .withMaxParallelism(20));
+ PipelineResult result = injectorPipeline.run();
+ if (result instanceof DataflowPipelineJob) {
+ jobsToCancel.add(((DataflowPipelineJob) result));
+ }
+ }
+
+ /**
+ * Runs the provided pipeline to inject data into the PubSubIO input topic.
+ */
+ public void runInjectorPipeline(Pipeline injectorPipeline) {
+ PipelineResult result = injectorPipeline.run();
+ if (result instanceof DataflowPipelineJob) {
+ jobsToCancel.add(((DataflowPipelineJob) result));
+ }
+ }
+
+ /**
+ * Start the auxiliary injector pipeline, then wait for this pipeline to finish.
+ */
+ public void mockUnboundedSource(String inputFile, PipelineResult result) {
+ startInjectorIfNeeded(inputFile);
+ waitToFinish(result);
+ }
+
+ /**
+ * If {@literal DataflowPipelineRunner} or {@literal BlockingDataflowPipelineRunner} is used,
+ * waits for the pipeline to finish and cancels it (and the injector) before the program exits.
+ */
+ public void waitToFinish(PipelineResult result) {
+ if (result instanceof DataflowPipelineJob) {
+ final DataflowPipelineJob job = (DataflowPipelineJob) result;
+ jobsToCancel.add(job);
+ if (!options.as(DataflowExampleOptions.class).getKeepJobsRunning()) {
+ addShutdownHook(jobsToCancel);
+ }
+ try {
+ job.waitToFinish(-1, TimeUnit.SECONDS, new MonitoringUtil.PrintHandler(System.out));
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to wait for job to finish: " + job.getJobId());
+ }
+ } else {
+ // Do nothing if the given PipelineResult doesn't support waitToFinish(),
+ // such as EvaluationResults returned by DirectPipelineRunner.
+ tearDown();
+ printPendingMessages();
+ }
+ }
+
+ private void addShutdownHook(final Collection<DataflowPipelineJob> jobs) {
+ if (dataflowClient == null) {
+ dataflowClient = options.getDataflowClient();
+ }
+
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ @Override
+ public void run() {
+ tearDown();
+ printPendingMessages();
+ for (DataflowPipelineJob job : jobs) {
+ System.out.println("Canceling example pipeline: " + job.getJobId());
+ try {
+ job.cancel();
+ } catch (IOException e) {
+ System.out.println("Failed to cancel the job,"
+ + " please go to the Developers Console to cancel it manually");
+ System.out.println(
+ MonitoringUtil.getJobMonitoringPageURL(job.getProjectId(), job.getJobId()));
+ }
+ }
+
+ for (DataflowPipelineJob job : jobs) {
+ boolean cancellationVerified = false;
+ for (int retryAttempts = 6; retryAttempts > 0; retryAttempts--) {
+ if (job.getState().isTerminal()) {
+ cancellationVerified = true;
+ System.out.println("Canceled example pipeline: " + job.getJobId());
+ break;
+ } else {
+ System.out.println(
+ "The example pipeline is still running. Verifying the cancellation.");
+ }
+ try {
+ Thread.sleep(10000);
+ } catch (InterruptedException e) {
+ // Ignore
+ }
+ }
+ if (!cancellationVerified) {
+ System.out.println("Failed to verify the cancellation for job: " + job.getJobId());
+ System.out.println("Please go to the Developers Console to verify manually:");
+ System.out.println(
+ MonitoringUtil.getJobMonitoringPageURL(job.getProjectId(), job.getJobId()));
+ }
+ }
+ }
+ });
+ }
+
+ private void printPendingMessages() {
+ System.out.println();
+ System.out.println("***********************************************************");
+ System.out.println("***********************************************************");
+ for (String message : pendingMessages) {
+ System.out.println(message);
+ }
+ System.out.println("***********************************************************");
+ System.out.println("***********************************************************");
+ }
+
+ private static <T> T executeNullIfNotFound(
+ AbstractGoogleClientRequest<T> request) throws IOException {
+ try {
+ return request.execute();
+ } catch (GoogleJsonResponseException e) {
+ if (e.getStatusCode() == HttpServletResponse.SC_NOT_FOUND) {
+ return null;
+ } else {
+ throw e;
+ }
+ }
+ }
+}
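
For orientation, here is a minimal sketch (not part of this commit) of how an example's main() might drive DataflowExampleUtils. The class and option types come from the files in this change; the input path is a placeholder and the pipeline body is elided.

    import com.google.cloud.dataflow.examples.common.DataflowExampleUtils;
    import com.google.cloud.dataflow.sdk.Pipeline;
    import com.google.cloud.dataflow.sdk.PipelineResult;
    import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
    import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;

    public class ExampleDriverSketch {
      public static void main(String[] args) throws Exception {
        DataflowPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
            .withValidation().as(DataflowPipelineOptions.class);

        // Creates the Pub/Sub topic (and BigQuery table, if configured) and, because
        // isUnbounded is true, switches the pipeline into streaming mode.
        DataflowExampleUtils utils = new DataflowExampleUtils(options, true /* isUnbounded */);

        Pipeline p = Pipeline.create(options);
        // ... build the example's transforms on p, as AutoComplete does below ...
        PipelineResult result = p.run();

        // Publish a text file into the topic (streaming mode only), then block until the job
        // finishes; a shutdown hook cancels the main job and the injector on CTRL-C.
        utils.startInjectorIfNeeded("gs://your-bucket/input.txt");  // placeholder path
        utils.waitToFinish(result);
      }
    }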
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2eaa709c/examples/java/src/main/java/com/google/cloud/dataflow/examples/common/ExampleBigQueryTableOptions.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/com/google/cloud/dataflow/examples/common/ExampleBigQueryTableOptions.java b/examples/java/src/main/java/com/google/cloud/dataflow/examples/common/ExampleBigQueryTableOptions.java
new file mode 100644
index 0000000..7c213b5
--- /dev/null
+++ b/examples/java/src/main/java/com/google/cloud/dataflow/examples/common/ExampleBigQueryTableOptions.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.google.cloud.dataflow.examples.common;
+
+import com.google.api.services.bigquery.model.TableSchema;
+import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
+import com.google.cloud.dataflow.sdk.options.Default;
+import com.google.cloud.dataflow.sdk.options.DefaultValueFactory;
+import com.google.cloud.dataflow.sdk.options.Description;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+
+/**
+ * Options that can be used to configure BigQuery tables in Dataflow examples.
+ * The project defaults to the project being used to run the example.
+ */
+public interface ExampleBigQueryTableOptions extends DataflowPipelineOptions {
+ @Description("BigQuery dataset name")
+ @Default.String("dataflow_examples")
+ String getBigQueryDataset();
+ void setBigQueryDataset(String dataset);
+
+ @Description("BigQuery table name")
+ @Default.InstanceFactory(BigQueryTableFactory.class)
+ String getBigQueryTable();
+ void setBigQueryTable(String table);
+
+ @Description("BigQuery table schema")
+ TableSchema getBigQuerySchema();
+ void setBigQuerySchema(TableSchema schema);
+
+ /**
+ * Returns the job name as the default BigQuery table name.
+ */
+ static class BigQueryTableFactory implements DefaultValueFactory<String> {
+ @Override
+ public String create(PipelineOptions options) {
+ return options.as(DataflowPipelineOptions.class).getJobName()
+ .replace('-', '_');
+ }
+ }
+}
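
As a rough illustration (not part of this commit), the sketch below shows how an example might fill in the one value these options do not default, the schema, before letting DataflowExampleUtils create the table. It assumes the class lives in the same examples common package; the single-field schema is an arbitrary placeholder.

    import com.google.api.services.bigquery.model.TableFieldSchema;
    import com.google.api.services.bigquery.model.TableSchema;
    import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;

    import java.util.Arrays;

    public class BigQueryTableOptionsSketch {
      public static void main(String[] args) throws Exception {
        ExampleBigQueryTableOptions options = PipelineOptionsFactory.fromArgs(args)
            .withValidation().as(ExampleBigQueryTableOptions.class);

        // The dataset defaults to "dataflow_examples" and the table name defaults to the job
        // name with '-' replaced by '_'; only the schema has no default.
        options.setBigQuerySchema(new TableSchema().setFields(Arrays.asList(
            new TableFieldSchema().setName("string_field").setType("STRING"))));

        // With dataset, table, and schema all present, setup() will create the table if needed.
        new DataflowExampleUtils(options).setup();
      }
    }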
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2eaa709c/examples/java/src/main/java/com/google/cloud/dataflow/examples/common/ExamplePubsubTopicAndSubscriptionOptions.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/com/google/cloud/dataflow/examples/common/ExamplePubsubTopicAndSubscriptionOptions.java b/examples/java/src/main/java/com/google/cloud/dataflow/examples/common/ExamplePubsubTopicAndSubscriptionOptions.java
new file mode 100644
index 0000000..d7bd4b8
--- /dev/null
+++ b/examples/java/src/main/java/com/google/cloud/dataflow/examples/common/ExamplePubsubTopicAndSubscriptionOptions.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.google.cloud.dataflow.examples.common;
+
+import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
+import com.google.cloud.dataflow.sdk.options.Default;
+import com.google.cloud.dataflow.sdk.options.DefaultValueFactory;
+import com.google.cloud.dataflow.sdk.options.Description;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+
+/**
+ * Options that can be used to configure the Pub/Sub topic and subscription in Dataflow examples.
+ */
+public interface ExamplePubsubTopicAndSubscriptionOptions extends ExamplePubsubTopicOptions {
+ @Description("Pub/Sub subscription")
+ @Default.InstanceFactory(PubsubSubscriptionFactory.class)
+ String getPubsubSubscription();
+ void setPubsubSubscription(String subscription);
+
+ /**
+ * Returns a default Pub/Sub subscription based on the project and the job names.
+ */
+ static class PubsubSubscriptionFactory implements DefaultValueFactory<String> {
+ @Override
+ public String create(PipelineOptions options) {
+ DataflowPipelineOptions dataflowPipelineOptions =
+ options.as(DataflowPipelineOptions.class);
+ return "projects/" + dataflowPipelineOptions.getProject()
+ + "/subscriptions/" + dataflowPipelineOptions.getJobName();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2eaa709c/examples/java/src/main/java/com/google/cloud/dataflow/examples/common/ExamplePubsubTopicOptions.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/com/google/cloud/dataflow/examples/common/ExamplePubsubTopicOptions.java b/examples/java/src/main/java/com/google/cloud/dataflow/examples/common/ExamplePubsubTopicOptions.java
new file mode 100644
index 0000000..4bedf31
--- /dev/null
+++ b/examples/java/src/main/java/com/google/cloud/dataflow/examples/common/ExamplePubsubTopicOptions.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.google.cloud.dataflow.examples.common;
+
+import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
+import com.google.cloud.dataflow.sdk.options.Default;
+import com.google.cloud.dataflow.sdk.options.DefaultValueFactory;
+import com.google.cloud.dataflow.sdk.options.Description;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+
+/**
+ * Options that can be used to configure the Pub/Sub topic in Dataflow examples.
+ */
+public interface ExamplePubsubTopicOptions extends DataflowPipelineOptions {
+ @Description("Pub/Sub topic")
+ @Default.InstanceFactory(PubsubTopicFactory.class)
+ String getPubsubTopic();
+ void setPubsubTopic(String topic);
+
+ /**
+ * Returns a default Pub/Sub topic based on the project and the job names.
+ */
+ static class PubsubTopicFactory implements DefaultValueFactory<String> {
+ @Override
+ public String create(PipelineOptions options) {
+ DataflowPipelineOptions dataflowPipelineOptions =
+ options.as(DataflowPipelineOptions.class);
+ return "projects/" + dataflowPipelineOptions.getProject()
+ + "/topics/" + dataflowPipelineOptions.getJobName();
+ }
+ }
+}
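
For concreteness, a small sketch (not part of this commit) of how the two default-value factories above resolve when nothing is passed on the command line; it assumes it sits in the same package, and the project and job names in the comment are hypothetical.

    import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;

    public class PubsubOptionsSketch {
      public static void main(String[] args) {
        ExamplePubsubTopicAndSubscriptionOptions options = PipelineOptionsFactory.fromArgs(args)
            .withValidation().as(ExamplePubsubTopicAndSubscriptionOptions.class);

        // With, say, --project=my-project and --jobName=autocomplete, the defaults resolve to
        //   topic:        projects/my-project/topics/autocomplete
        //   subscription: projects/my-project/subscriptions/autocomplete
        System.out.println(options.getPubsubTopic());
        System.out.println(options.getPubsubSubscription());
      }
    }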
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2eaa709c/examples/java/src/main/java/com/google/cloud/dataflow/examples/common/PubsubFileInjector.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/com/google/cloud/dataflow/examples/common/PubsubFileInjector.java b/examples/java/src/main/java/com/google/cloud/dataflow/examples/common/PubsubFileInjector.java
new file mode 100644
index 0000000..4a82ae6
--- /dev/null
+++ b/examples/java/src/main/java/com/google/cloud/dataflow/examples/common/PubsubFileInjector.java
@@ -0,0 +1,153 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.google.cloud.dataflow.examples.common;
+
+import com.google.api.services.pubsub.Pubsub;
+import com.google.api.services.pubsub.model.PublishRequest;
+import com.google.api.services.pubsub.model.PubsubMessage;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.io.TextIO;
+import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
+import com.google.cloud.dataflow.sdk.options.Description;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.options.Validation;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.IntraBundleParallelization;
+import com.google.cloud.dataflow.sdk.util.Transport;
+import com.google.common.collect.ImmutableMap;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+/**
+ * A batch Dataflow pipeline for injecting a set of GCS files into
+ * a PubSub topic line by line. Empty lines are skipped.
+ *
+ * <p>This is useful for testing streaming
+ * pipelines. Note that since batch pipelines might retry chunks, this
+ * does _not_ guarantee exactly-once injection of file data. Some lines may
+ * be published multiple times.
+ * </p>
+ */
+public class PubsubFileInjector {
+
+ /**
+ * An incomplete {@code PubsubFileInjector} transform with an unbound output topic.
+ */
+ public static class Unbound {
+ private final String timestampLabelKey;
+
+ Unbound() {
+ this.timestampLabelKey = null;
+ }
+
+ Unbound(String timestampLabelKey) {
+ this.timestampLabelKey = timestampLabelKey;
+ }
+
+ Unbound withTimestampLabelKey(String timestampLabelKey) {
+ return new Unbound(timestampLabelKey);
+ }
+
+ public Bound publish(String outputTopic) {
+ return new Bound(outputTopic, timestampLabelKey);
+ }
+ }
+
+ /** A DoFn that publishes non-empty lines to Google Cloud PubSub. */
+ public static class Bound extends DoFn<String, Void> {
+ private final String outputTopic;
+ private final String timestampLabelKey;
+ public transient Pubsub pubsub;
+
+ public Bound(String outputTopic, String timestampLabelKey) {
+ this.outputTopic = outputTopic;
+ this.timestampLabelKey = timestampLabelKey;
+ }
+
+ @Override
+ public void startBundle(Context context) {
+ this.pubsub =
+ Transport.newPubsubClient(context.getPipelineOptions().as(DataflowPipelineOptions.class))
+ .build();
+ }
+
+ @Override
+ public void processElement(ProcessContext c) throws IOException {
+ if (c.element().isEmpty()) {
+ return;
+ }
+ PubsubMessage pubsubMessage = new PubsubMessage();
+ pubsubMessage.encodeData(c.element().getBytes());
+ if (timestampLabelKey != null) {
+ pubsubMessage.setAttributes(
+ ImmutableMap.of(timestampLabelKey, Long.toString(c.timestamp().getMillis())));
+ }
+ PublishRequest publishRequest = new PublishRequest();
+ publishRequest.setMessages(Arrays.asList(pubsubMessage));
+ this.pubsub.projects().topics().publish(outputTopic, publishRequest).execute();
+ }
+ }
+
+ /**
+ * Creates a {@code PubsubFileInjector} transform with the given timestamp label key.
+ */
+ public static Unbound withTimestampLabelKey(String timestampLabelKey) {
+ return new Unbound(timestampLabelKey);
+ }
+
+ /**
+ * Creates a {@code PubsubFileInjector} transform that publishes to the given output topic.
+ */
+ public static Bound publish(String outputTopic) {
+ return new Unbound().publish(outputTopic);
+ }
+
+ /**
+ * Command line parameter options.
+ */
+ private interface PubsubFileInjectorOptions extends PipelineOptions {
+ @Description("GCS location of files.")
+ @Validation.Required
+ String getInput();
+ void setInput(String value);
+
+ @Description("Topic to publish on.")
+ @Validation.Required
+ String getOutputTopic();
+ void setOutputTopic(String value);
+ }
+
+ /**
+ * Sets up and starts the batch injector pipeline.
+ */
+ public static void main(String[] args) {
+ PubsubFileInjectorOptions options = PipelineOptionsFactory.fromArgs(args)
+ .withValidation()
+ .as(PubsubFileInjectorOptions.class);
+
+ Pipeline pipeline = Pipeline.create(options);
+
+ pipeline
+ .apply(TextIO.Read.from(options.getInput()))
+ .apply(IntraBundleParallelization.of(PubsubFileInjector.publish(options.getOutputTopic()))
+ .withMaxParallelism(20));
+
+ pipeline.run();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2eaa709c/examples/java/src/main/java/com/google/cloud/dataflow/examples/complete/AutoComplete.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/com/google/cloud/dataflow/examples/complete/AutoComplete.java b/examples/java/src/main/java/com/google/cloud/dataflow/examples/complete/AutoComplete.java
new file mode 100644
index 0000000..f897338
--- /dev/null
+++ b/examples/java/src/main/java/com/google/cloud/dataflow/examples/complete/AutoComplete.java
@@ -0,0 +1,516 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.google.cloud.dataflow.examples.complete;
+
+import com.google.api.services.bigquery.model.TableFieldSchema;
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import com.google.api.services.datastore.DatastoreV1.Entity;
+import com.google.api.services.datastore.DatastoreV1.Key;
+import com.google.api.services.datastore.DatastoreV1.Value;
+import com.google.api.services.datastore.client.DatastoreHelper;
+import com.google.cloud.dataflow.examples.common.DataflowExampleUtils;
+import com.google.cloud.dataflow.examples.common.ExampleBigQueryTableOptions;
+import com.google.cloud.dataflow.examples.common.ExamplePubsubTopicOptions;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.PipelineResult;
+import com.google.cloud.dataflow.sdk.coders.AvroCoder;
+import com.google.cloud.dataflow.sdk.coders.DefaultCoder;
+import com.google.cloud.dataflow.sdk.io.BigQueryIO;
+import com.google.cloud.dataflow.sdk.io.DatastoreIO;
+import com.google.cloud.dataflow.sdk.io.PubsubIO;
+import com.google.cloud.dataflow.sdk.io.TextIO;
+import com.google.cloud.dataflow.sdk.options.Default;
+import com.google.cloud.dataflow.sdk.options.Description;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
+import com.google.cloud.dataflow.sdk.transforms.Count;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.Filter;
+import com.google.cloud.dataflow.sdk.transforms.Flatten;
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.transforms.Partition;
+import com.google.cloud.dataflow.sdk.transforms.Partition.PartitionFn;
+import com.google.cloud.dataflow.sdk.transforms.SerializableFunction;
+import com.google.cloud.dataflow.sdk.transforms.Top;
+import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindows;
+import com.google.cloud.dataflow.sdk.transforms.windowing.SlidingWindows;
+import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
+import com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PBegin;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.cloud.dataflow.sdk.values.PCollectionList;
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Preconditions;
+
+import org.joda.time.Duration;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * An example that computes the most popular hash tags
+ * for every prefix, which can be used for auto-completion.
+ *
+ * <p>Concepts: Using the same pipeline in both streaming and batch, combiners,
+ * composite transforms.
+ *
+ * <p>To execute this pipeline using the Dataflow service in batch mode,
+ * specify pipeline configuration:
+ * <pre>{@code
+ * --project=YOUR_PROJECT_ID
+ * --stagingLocation=gs://YOUR_STAGING_DIRECTORY
+ * --runner=DataflowPipelineRunner
+ * --inputFile=gs://path/to/input*.txt
+ * }</pre>
+ *
+ * <p>To execute this pipeline using the Dataflow service in streaming mode,
+ * specify pipeline configuration:
+ * <pre>{@code
+ * --project=YOUR_PROJECT_ID
+ * --stagingLocation=gs://YOUR_STAGING_DIRECTORY
+ * --runner=DataflowPipelineRunner
+ * --inputFile=gs://YOUR_INPUT_DIRECTORY/*.txt
+ * --streaming
+ * }</pre>
+ *
+ * <p>This will update the output every 5 seconds based on the last
+ * 30 minutes of data received.
+ */
+public class AutoComplete {
+
+ /**
+ * A PTransform that takes as input a list of tokens and returns
+ * the most common tokens per prefix.
+ */
+ public static class ComputeTopCompletions
+ extends PTransform<PCollection<String>, PCollection<KV<String, List<CompletionCandidate>>>> {
+ private final int candidatesPerPrefix;
+ private final boolean recursive;
+
+ protected ComputeTopCompletions(int candidatesPerPrefix, boolean recursive) {
+ this.candidatesPerPrefix = candidatesPerPrefix;
+ this.recursive = recursive;
+ }
+
+ public static ComputeTopCompletions top(int candidatesPerPrefix, boolean recursive) {
+ return new ComputeTopCompletions(candidatesPerPrefix, recursive);
+ }
+
+ @Override
+ public PCollection<KV<String, List<CompletionCandidate>>> apply(PCollection<String> input) {
+ PCollection<CompletionCandidate> candidates = input
+ // First count how often each token appears.
+ .apply(new Count.PerElement<String>())
+
+ // Map the KV outputs of Count into our own CompletionCandidate class.
+ .apply(ParDo.named("CreateCompletionCandidates").of(
+ new DoFn<KV<String, Long>, CompletionCandidate>() {
+ @Override
+ public void processElement(ProcessContext c) {
+ c.output(new CompletionCandidate(c.element().getKey(), c.element().getValue()));
+ }
+ }));
+
+ // Compute the top via either a flat or recursive algorithm.
+ if (recursive) {
+ return candidates
+ .apply(new ComputeTopRecursive(candidatesPerPrefix, 1))
+ .apply(Flatten.<KV<String, List<CompletionCandidate>>>pCollections());
+ } else {
+ return candidates
+ .apply(new ComputeTopFlat(candidatesPerPrefix, 1));
+ }
+ }
+ }
+
+ /**
+ * Lower latency, but more expensive.
+ */
+ private static class ComputeTopFlat
+ extends PTransform<PCollection<CompletionCandidate>,
+ PCollection<KV<String, List<CompletionCandidate>>>> {
+ private final int candidatesPerPrefix;
+ private final int minPrefix;
+
+ public ComputeTopFlat(int candidatesPerPrefix, int minPrefix) {
+ this.candidatesPerPrefix = candidatesPerPrefix;
+ this.minPrefix = minPrefix;
+ }
+
+ @Override
+ public PCollection<KV<String, List<CompletionCandidate>>> apply(
+ PCollection<CompletionCandidate> input) {
+ return input
+ // For each completion candidate, map it to all prefixes.
+ .apply(ParDo.of(new AllPrefixes(minPrefix)))
+
+ // Find and return the top candidates for each prefix.
+ .apply(Top.<String, CompletionCandidate>largestPerKey(candidatesPerPrefix)
+ .withHotKeyFanout(new HotKeyFanout()));
+ }
+
+ private static class HotKeyFanout implements SerializableFunction<String, Integer> {
+ @Override
+ public Integer apply(String input) {
+ return (int) Math.pow(4, 5 - input.length());
+ }
+ }
+ }
+
+ /**
+ * Cheaper but higher latency.
+ *
+ * <p>Returns two PCollections: the first contains the top candidates for prefixes longer than
+ * minPrefix, and the second contains the top candidates for prefixes of length exactly
+ * minPrefix.
+ */
+ private static class ComputeTopRecursive
+ extends PTransform<PCollection<CompletionCandidate>,
+ PCollectionList<KV<String, List<CompletionCandidate>>>> {
+ private final int candidatesPerPrefix;
+ private final int minPrefix;
+
+ public ComputeTopRecursive(int candidatesPerPrefix, int minPrefix) {
+ this.candidatesPerPrefix = candidatesPerPrefix;
+ this.minPrefix = minPrefix;
+ }
+
+ private class KeySizePartitionFn implements PartitionFn<KV<String, List<CompletionCandidate>>> {
+ @Override
+ public int partitionFor(KV<String, List<CompletionCandidate>> elem, int numPartitions) {
+ return elem.getKey().length() > minPrefix ? 0 : 1;
+ }
+ }
+
+ private static class FlattenTops
+ extends DoFn<KV<String, List<CompletionCandidate>>, CompletionCandidate> {
+ @Override
+ public void processElement(ProcessContext c) {
+ for (CompletionCandidate cc : c.element().getValue()) {
+ c.output(cc);
+ }
+ }
+ }
+
+ @Override
+ public PCollectionList<KV<String, List<CompletionCandidate>>> apply(
+ PCollection<CompletionCandidate> input) {
+ if (minPrefix > 10) {
+ // Base case, partitioning to return the output in the expected format.
+ return input
+ .apply(new ComputeTopFlat(candidatesPerPrefix, minPrefix))
+ .apply(Partition.of(2, new KeySizePartitionFn()));
+ } else {
+ // If a candidate is in the top N for prefix a...b, it must also be in the top N for the
+ // longer prefix a...bX formed by its next character, which is typically a much smaller set
+ // to consider.
+ // First, compute the top candidate for prefixes of size at least minPrefix + 1.
+ PCollectionList<KV<String, List<CompletionCandidate>>> larger = input
+ .apply(new ComputeTopRecursive(candidatesPerPrefix, minPrefix + 1));
+ // Consider the top candidates for each prefix of length minPrefix + 1...
+ PCollection<KV<String, List<CompletionCandidate>>> small =
+ PCollectionList
+ .of(larger.get(1).apply(ParDo.of(new FlattenTops())))
+ // ...together with those (previously excluded) candidates of length
+ // exactly minPrefix...
+ .and(input.apply(Filter.byPredicate(
+ new SerializableFunction<CompletionCandidate, Boolean>() {
+ @Override
+ public Boolean apply(CompletionCandidate c) {
+ return c.getValue().length() == minPrefix;
+ }
+ })))
+ .apply("FlattenSmall", Flatten.<CompletionCandidate>pCollections())
+ // ...set the key to be the minPrefix-length prefix...
+ .apply(ParDo.of(new AllPrefixes(minPrefix, minPrefix)))
+ // ...and (re)apply the Top operator to all of them together.
+ .apply(Top.<String, CompletionCandidate>largestPerKey(candidatesPerPrefix));
+
+ PCollection<KV<String, List<CompletionCandidate>>> flattenLarger = larger
+ .apply("FlattenLarge", Flatten.<KV<String, List<CompletionCandidate>>>pCollections());
+
+ return PCollectionList.of(flattenLarger).and(small);
+ }
+ }
+ }
+
+ /**
+ * A DoFn that keys each candidate by all its prefixes.
+ */
+ private static class AllPrefixes
+ extends DoFn<CompletionCandidate, KV<String, CompletionCandidate>> {
+ private final int minPrefix;
+ private final int maxPrefix;
+ public AllPrefixes(int minPrefix) {
+ this(minPrefix, Integer.MAX_VALUE);
+ }
+ public AllPrefixes(int minPrefix, int maxPrefix) {
+ this.minPrefix = minPrefix;
+ this.maxPrefix = maxPrefix;
+ }
+ @Override
+ public void processElement(ProcessContext c) {
+ String word = c.element().value;
+ for (int i = minPrefix; i <= Math.min(word.length(), maxPrefix); i++) {
+ c.output(KV.of(word.substring(0, i), c.element()));
+ }
+ }
+ }
+
+ /**
+ * Class used to store tag-count pairs.
+ */
+ @DefaultCoder(AvroCoder.class)
+ static class CompletionCandidate implements Comparable<CompletionCandidate> {
+ private long count;
+ private String value;
+
+ public CompletionCandidate(String value, long count) {
+ this.value = value;
+ this.count = count;
+ }
+
+ public long getCount() {
+ return count;
+ }
+
+ public String getValue() {
+ return value;
+ }
+
+ // Empty constructor required for Avro decoding.
+ public CompletionCandidate() {}
+
+ @Override
+ public int compareTo(CompletionCandidate o) {
+ if (this.count < o.count) {
+ return -1;
+ } else if (this.count == o.count) {
+ return this.value.compareTo(o.value);
+ } else {
+ return 1;
+ }
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (other instanceof CompletionCandidate) {
+ CompletionCandidate that = (CompletionCandidate) other;
+ return this.count == that.count && this.value.equals(that.value);
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ return Long.valueOf(count).hashCode() ^ value.hashCode();
+ }
+
+ @Override
+ public String toString() {
+ return "CompletionCandidate[" + value + ", " + count + "]";
+ }
+ }
+
+ /**
+ * Takes as input a set of strings, and emits each #hashtag found therein.
+ */
+ static class ExtractHashtags extends DoFn<String, String> {
+ @Override
+ public void processElement(ProcessContext c) {
+ Matcher m = Pattern.compile("#\\S+").matcher(c.element());
+ while (m.find()) {
+ c.output(m.group().substring(1));
+ }
+ }
+ }
+
+ static class FormatForBigquery extends DoFn<KV<String, List<CompletionCandidate>>, TableRow> {
+ @Override
+ public void processElement(ProcessContext c) {
+ List<TableRow> completions = new ArrayList<>();
+ for (CompletionCandidate cc : c.element().getValue()) {
+ completions.add(new TableRow()
+ .set("count", cc.getCount())
+ .set("tag", cc.getValue()));
+ }
+ TableRow row = new TableRow()
+ .set("prefix", c.element().getKey())
+ .set("tags", completions);
+ c.output(row);
+ }
+
+ /**
+ * Defines the BigQuery schema used for the output.
+ */
+ static TableSchema getSchema() {
+ List<TableFieldSchema> tagFields = new ArrayList<>();
+ tagFields.add(new TableFieldSchema().setName("count").setType("INTEGER"));
+ tagFields.add(new TableFieldSchema().setName("tag").setType("STRING"));
+ List<TableFieldSchema> fields = new ArrayList<>();
+ fields.add(new TableFieldSchema().setName("prefix").setType("STRING"));
+ fields.add(new TableFieldSchema()
+ .setName("tags").setType("RECORD").setMode("REPEATED").setFields(tagFields));
+ return new TableSchema().setFields(fields);
+ }
+ }
+
+ /**
+ * Takes as input the top candidates per prefix, and emits an entity
+ * suitable for writing to Datastore.
+ */
+ static class FormatForDatastore extends DoFn<KV<String, List<CompletionCandidate>>, Entity> {
+ private String kind;
+
+ public FormatForDatastore(String kind) {
+ this.kind = kind;
+ }
+
+ @Override
+ public void processElement(ProcessContext c) {
+ Entity.Builder entityBuilder = Entity.newBuilder();
+ Key key = DatastoreHelper.makeKey(kind, c.element().getKey()).build();
+
+ entityBuilder.setKey(key);
+ List<Value> candidates = new ArrayList<>();
+ for (CompletionCandidate tag : c.element().getValue()) {
+ Entity.Builder tagEntity = Entity.newBuilder();
+ tagEntity.addProperty(
+ DatastoreHelper.makeProperty("tag", DatastoreHelper.makeValue(tag.value)));
+ tagEntity.addProperty(
+ DatastoreHelper.makeProperty("count", DatastoreHelper.makeValue(tag.count)));
+ candidates.add(DatastoreHelper.makeValue(tagEntity).setIndexed(false).build());
+ }
+ entityBuilder.addProperty(
+ DatastoreHelper.makeProperty("candidates", DatastoreHelper.makeValue(candidates)));
+ c.output(entityBuilder.build());
+ }
+ }
+
+ /**
+ * Options supported by this class.
+ *
+ * <p>Inherits standard Dataflow configuration options.
+ */
+ private static interface Options extends ExamplePubsubTopicOptions, ExampleBigQueryTableOptions {
+ @Description("Input text file")
+ String getInputFile();
+ void setInputFile(String value);
+
+ @Description("Whether to use the recursive algorithm")
+ @Default.Boolean(true)
+ Boolean getRecursive();
+ void setRecursive(Boolean value);
+
+ @Description("Dataset entity kind")
+ @Default.String("autocomplete-demo")
+ String getKind();
+ void setKind(String value);
+
+ @Description("Whether output to BigQuery")
+ @Default.Boolean(true)
+ Boolean getOutputToBigQuery();
+ void setOutputToBigQuery(Boolean value);
+
+ @Description("Whether output to Datastore")
+ @Default.Boolean(false)
+ Boolean getOutputToDatastore();
+ void setOutputToDatastore(Boolean value);
+
+ @Description("Datastore output dataset ID, defaults to project ID")
+ String getOutputDataset();
+ void setOutputDataset(String value);
+ }
+
+ public static void main(String[] args) throws IOException {
+ Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
+
+ if (options.isStreaming()) {
+ // In order to cancel the pipelines automatically,
+ // {@literal DataflowPipelineRunner} is forced to be used.
+ options.setRunner(DataflowPipelineRunner.class);
+ }
+
+ options.setBigQuerySchema(FormatForBigquery.getSchema());
+ DataflowExampleUtils dataflowUtils = new DataflowExampleUtils(options);
+
+ // We support running the same pipeline in either
+ // batch or windowed streaming mode.
+ PTransform<? super PBegin, PCollection<String>> readSource;
+ WindowFn<Object, ?> windowFn;
+ if (options.isStreaming()) {
+ Preconditions.checkArgument(
+ !options.getOutputToDatastore(), "DatastoreIO is not supported in streaming.");
+ dataflowUtils.setupPubsub();
+
+ readSource = PubsubIO.Read.topic(options.getPubsubTopic());
+ windowFn = SlidingWindows.of(Duration.standardMinutes(30)).every(Duration.standardSeconds(5));
+ } else {
+ readSource = TextIO.Read.from(options.getInputFile());
+ windowFn = new GlobalWindows();
+ }
+
+ // Create the pipeline.
+ Pipeline p = Pipeline.create(options);
+ PCollection<KV<String, List<CompletionCandidate>>> toWrite = p
+ .apply(readSource)
+ .apply(ParDo.of(new ExtractHashtags()))
+ .apply(Window.<String>into(windowFn))
+ .apply(ComputeTopCompletions.top(10, options.getRecursive()));
+
+ if (options.getOutputToDatastore()) {
+ toWrite
+ .apply(ParDo.named("FormatForDatastore").of(new FormatForDatastore(options.getKind())))
+ .apply(DatastoreIO.writeTo(MoreObjects.firstNonNull(
+ options.getOutputDataset(), options.getProject())));
+ }
+ if (options.getOutputToBigQuery()) {
+ dataflowUtils.setupBigQueryTable();
+
+ TableReference tableRef = new TableReference();
+ tableRef.setProjectId(options.getProject());
+ tableRef.setDatasetId(options.getBigQueryDataset());
+ tableRef.setTableId(options.getBigQueryTable());
+
+ toWrite
+ .apply(ParDo.of(new FormatForBigquery()))
+ .apply(BigQueryIO.Write
+ .to(tableRef)
+ .withSchema(FormatForBigquery.getSchema())
+ .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
+ .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));
+ }
+
+ // Run the pipeline.
+ PipelineResult result = p.run();
+
+ if (options.isStreaming() && !options.getInputFile().isEmpty()) {
+ // Inject the data into the Pub/Sub topic with a Dataflow batch pipeline.
+ dataflowUtils.runInjectorPipeline(options.getInputFile(), options.getPubsubTopic());
+ }
+
+ // dataflowUtils will try to cancel the pipeline and the injector before the program exits.
+ dataflowUtils.waitToFinish(result);
+ }
+}
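
To see the core transform in isolation, here is a minimal sketch (not part of this commit) that feeds a few tokens straight into ComputeTopCompletions on the local DirectPipelineRunner and prints the result. It assumes it lives in the same package, so the package-private CompletionCandidate class is visible; the tokens are arbitrary.

    package com.google.cloud.dataflow.examples.complete;

    import com.google.cloud.dataflow.sdk.Pipeline;
    import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
    import com.google.cloud.dataflow.sdk.transforms.Create;
    import com.google.cloud.dataflow.sdk.transforms.DoFn;
    import com.google.cloud.dataflow.sdk.transforms.ParDo;
    import com.google.cloud.dataflow.sdk.values.KV;

    import java.util.List;

    public class ComputeTopCompletionsSketch {
      public static void main(String[] args) {
        // Locally created options default to the DirectPipelineRunner.
        Pipeline p = Pipeline.create(PipelineOptionsFactory.create());

        p.apply(Create.of("flow", "flow", "flew"))
         .apply(AutoComplete.ComputeTopCompletions.top(2, false /* flat algorithm */))
         .apply(ParDo.of(new DoFn<KV<String, List<AutoComplete.CompletionCandidate>>, Void>() {
           @Override
           public void processElement(ProcessContext c) {
             // E.g. prefix "fl" -> [CompletionCandidate[flow, 2], CompletionCandidate[flew, 1]]
             System.out.println(c.element().getKey() + " -> " + c.element().getValue());
           }
         }));

        p.run();
      }
    }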
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2eaa709c/examples/java/src/main/java/com/google/cloud/dataflow/examples/complete/README.md
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/com/google/cloud/dataflow/examples/complete/README.md b/examples/java/src/main/java/com/google/cloud/dataflow/examples/complete/README.md
new file mode 100644
index 0000000..5fba154
--- /dev/null
+++ b/examples/java/src/main/java/com/google/cloud/dataflow/examples/complete/README.md
@@ -0,0 +1,44 @@
+
+# "Complete" Examples
+
+This directory contains end-to-end example pipelines that perform complex data processing tasks. They include:
+
+<ul>
+ <li><a href="https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/examples/src/main/java/com/google/cloud/dataflow/examples/complete/AutoComplete.java">AutoComplete</a>
+ — An example that computes the most popular hash tags for every
+ prefix, which can be used for auto-completion. Demonstrates how to use the
+ same pipeline in both streaming and batch, combiners, and composite
+ transforms.</li>
+ <li><a href="https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/examples/src/main/java/com/google/cloud/dataflow/examples/complete/StreamingWordExtract.java">StreamingWordExtract</a>
+ — A streaming pipeline example that inputs lines of text from a Cloud
+ Pub/Sub topic, splits each line into individual words, capitalizes those
+ words, and writes the output to a BigQuery table.
+ </li>
+ <li><a href="https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/examples/src/main/java/com/google/cloud/dataflow/examples/complete/TfIdf.java">TfIdf</a>
+ — An example that computes a basic TF-IDF search table for a directory or
+ Cloud Storage prefix. Demonstrates joining data, side inputs, and logging.
+ </li>
+ <li><a href="https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/examples/src/main/java/com/google/cloud/dataflow/examples/complete/TopWikipediaSessions.java">TopWikipediaSessions</a>
+ — An example that reads Wikipedia edit data from Cloud Storage and
+ computes the user with the longest string of edits separated by no more than
+ an hour within each month. Demonstrates using Cloud Dataflow
+ <code>Windowing</code> to perform time-based aggregations of data.
+ </li>
+ <li><a href="https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/examples/src/main/java/com/google/cloud/dataflow/examples/complete/TrafficMaxLaneFlow.java">TrafficMaxLaneFlow</a>
+ — A streaming Cloud Dataflow example using BigQuery output in the
+ <code>traffic sensor</code> domain. Demonstrates the Cloud Dataflow streaming
+ runner, sliding windows, Cloud Pub/Sub topic ingestion, the use of the
+ <code>AvroCoder</code> to encode a custom class, and custom
+ <code>Combine</code> transforms.
+ </li>
+ <li><a href="https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/examples/src/main/java/com/google/cloud/dataflow/examples/complete/TrafficRoutes.java">TrafficRoutes</a>
+ — A streaming Cloud Dataflow example using BigQuery output in the
+ <code>traffic sensor</code> domain. Demonstrates the Cloud Dataflow streaming
+ runner, <code>GroupByKey</code>, keyed state, sliding windows, and Cloud
+ Pub/Sub topic ingestion.
+ </li>
+ </ul>
+
+See the [documentation](https://cloud.google.com/dataflow/getting-started) and the [Examples
+README](../../../../../../../../../README.md) for
+information about how to run these examples.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2eaa709c/examples/java/src/main/java/com/google/cloud/dataflow/examples/complete/StreamingWordExtract.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/com/google/cloud/dataflow/examples/complete/StreamingWordExtract.java b/examples/java/src/main/java/com/google/cloud/dataflow/examples/complete/StreamingWordExtract.java
new file mode 100644
index 0000000..99c5249
--- /dev/null
+++ b/examples/java/src/main/java/com/google/cloud/dataflow/examples/complete/StreamingWordExtract.java
@@ -0,0 +1,163 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.google.cloud.dataflow.examples.complete;
+
+import com.google.api.services.bigquery.model.TableFieldSchema;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import com.google.cloud.dataflow.examples.common.DataflowExampleUtils;
+import com.google.cloud.dataflow.examples.common.ExampleBigQueryTableOptions;
+import com.google.cloud.dataflow.examples.common.ExamplePubsubTopicOptions;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.PipelineResult;
+import com.google.cloud.dataflow.sdk.io.BigQueryIO;
+import com.google.cloud.dataflow.sdk.io.PubsubIO;
+import com.google.cloud.dataflow.sdk.options.Default;
+import com.google.cloud.dataflow.sdk.options.Description;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+/**
+ * A streaming Dataflow example using BigQuery output.
+ *
+ * <p>This pipeline example reads lines of text from a PubSub topic, splits each line
+ * into individual words, capitalizes those words, and writes the output to
+ * a BigQuery table.
+ *
+ * <p>By default, the example will run a separate pipeline to inject the data from the default
+ * {@literal --inputFile} to the Pub/Sub {@literal --pubsubTopic}, making it available for the
+ * streaming pipeline to process. You may override the default {@literal --inputFile} with a file
+ * of your choosing. You may also set {@literal --inputFile} to an empty string, which disables
+ * the automatic Pub/Sub injection and allows you to use a separate tool to control the input to
+ * this example.
+ *
+ * <p>The example is configured to use the default Pub/Sub topic and the default BigQuery table
+ * from the example common package (there are no defaults for a general Dataflow pipeline).
+ * You can override them by using the {@literal --pubsubTopic}, {@literal --bigQueryDataset}, and
+ * {@literal --bigQueryTable} options. If the Pub/Sub topic or the BigQuery table do not exist,
+ * the example will try to create them.
+ *
+ * <p>The example will try to cancel the pipelines when the process receives the terminate signal
+ * (CTRL-C), and then exit.
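+ *
+ * <p>For example, a run on the Dataflow service might pass options like the following
+ * (all values are illustrative placeholders):
+ * <pre>{@code
+ * --project=YOUR_PROJECT_ID
+ * --stagingLocation=gs://YOUR_STAGING_DIRECTORY
+ * --bigQueryDataset=YOUR_DATASET
+ * --bigQueryTable=YOUR_TABLE
+ * }</pre>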
+ */
+public class StreamingWordExtract {
+
+ /** A DoFn that tokenizes lines of text into individual words. */
+ static class ExtractWords extends DoFn<String, String> {
+ @Override
+ public void processElement(ProcessContext c) {
+ String[] words = c.element().split("[^a-zA-Z']+");
+ for (String word : words) {
+ if (!word.isEmpty()) {
+ c.output(word);
+ }
+ }
+ }
+ }
+
+ /** A DoFn that uppercases a word. */
+ static class Uppercase extends DoFn<String, String> {
+ @Override
+ public void processElement(ProcessContext c) {
+ c.output(c.element().toUpperCase());
+ }
+ }
+
+ /**
+ * Converts strings into BigQuery rows.
+ */
+ static class StringToRowConverter extends DoFn<String, TableRow> {
+ /**
+ * In this example, the whole string is put into a single BigQuery field.
+ */
+ @Override
+ public void processElement(ProcessContext c) {
+ c.output(new TableRow().set("string_field", c.element()));
+ }
+
+ static TableSchema getSchema() {
+ return new TableSchema().setFields(new ArrayList<TableFieldSchema>() {
+ // Compose the list of TableFieldSchema that makes up the table schema.
+ {
+ add(new TableFieldSchema().setName("string_field").setType("STRING"));
+ }
+ });
+ }
+ }
+
+ /**
+ * Options supported by {@link StreamingWordExtract}.
+ *
+ * <p>Inherits standard configuration options.
+ */
+ private interface StreamingWordExtractOptions
+ extends ExamplePubsubTopicOptions, ExampleBigQueryTableOptions {
+ @Description("Input file to inject to Pub/Sub topic")
+ @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
+ String getInputFile();
+ void setInputFile(String value);
+ }
+
+ /**
+ * Sets up and starts the streaming pipeline.
+ *
+ * @throws IOException if there is a problem setting up resources
+ */
+ public static void main(String[] args) throws IOException {
+ StreamingWordExtractOptions options = PipelineOptionsFactory.fromArgs(args)
+ .withValidation()
+ .as(StreamingWordExtractOptions.class);
+ options.setStreaming(true);
+ // In order to cancel the pipelines automatically,
+ // the {@literal DataflowPipelineRunner} must be used.
+ options.setRunner(DataflowPipelineRunner.class);
+
+ options.setBigQuerySchema(StringToRowConverter.getSchema());
+ DataflowExampleUtils dataflowUtils = new DataflowExampleUtils(options);
+ dataflowUtils.setup();
+
+ Pipeline pipeline = Pipeline.create(options);
+
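+ // Build the BigQuery table specification in "project:dataset.table" form.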
+ String tableSpec = new StringBuilder()
+ .append(options.getProject()).append(":")
+ .append(options.getBigQueryDataset()).append(".")
+ .append(options.getBigQueryTable())
+ .toString();
+ pipeline
+ .apply(PubsubIO.Read.topic(options.getPubsubTopic()))
+ .apply(ParDo.of(new ExtractWords()))
+ .apply(ParDo.of(new Uppercase()))
+ .apply(ParDo.of(new StringToRowConverter()))
+ .apply(BigQueryIO.Write.to(tableSpec)
+ .withSchema(StringToRowConverter.getSchema()));
+
+ PipelineResult result = pipeline.run();
+
+ if (!options.getInputFile().isEmpty()) {
+ // Inject the data into the Pub/Sub topic with a Dataflow batch pipeline.
+ dataflowUtils.runInjectorPipeline(options.getInputFile(), options.getPubsubTopic());
+ }
+
+ // dataflowUtils will try to cancel the pipeline and the injector before the program exits.
+ dataflowUtils.waitToFinish(result);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2eaa709c/examples/java/src/main/java/com/google/cloud/dataflow/examples/complete/TfIdf.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/com/google/cloud/dataflow/examples/complete/TfIdf.java b/examples/java/src/main/java/com/google/cloud/dataflow/examples/complete/TfIdf.java
new file mode 100644
index 0000000..65ac753
--- /dev/null
+++ b/examples/java/src/main/java/com/google/cloud/dataflow/examples/complete/TfIdf.java
@@ -0,0 +1,431 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.google.cloud.dataflow.examples.complete;
+
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.coders.Coder;
+import com.google.cloud.dataflow.sdk.coders.KvCoder;
+import com.google.cloud.dataflow.sdk.coders.StringDelegateCoder;
+import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
+import com.google.cloud.dataflow.sdk.io.TextIO;
+import com.google.cloud.dataflow.sdk.options.Default;
+import com.google.cloud.dataflow.sdk.options.Description;
+import com.google.cloud.dataflow.sdk.options.GcsOptions;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.options.Validation;
+import com.google.cloud.dataflow.sdk.transforms.Count;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.Flatten;
+import com.google.cloud.dataflow.sdk.transforms.Keys;
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.transforms.RemoveDuplicates;
+import com.google.cloud.dataflow.sdk.transforms.Values;
+import com.google.cloud.dataflow.sdk.transforms.View;
+import com.google.cloud.dataflow.sdk.transforms.WithKeys;
+import com.google.cloud.dataflow.sdk.transforms.join.CoGbkResult;
+import com.google.cloud.dataflow.sdk.transforms.join.CoGroupByKey;
+import com.google.cloud.dataflow.sdk.transforms.join.KeyedPCollectionTuple;
+import com.google.cloud.dataflow.sdk.util.GcsUtil;
+import com.google.cloud.dataflow.sdk.util.gcsfs.GcsPath;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.cloud.dataflow.sdk.values.PCollectionList;
+import com.google.cloud.dataflow.sdk.values.PCollectionView;
+import com.google.cloud.dataflow.sdk.values.PDone;
+import com.google.cloud.dataflow.sdk.values.PInput;
+import com.google.cloud.dataflow.sdk.values.TupleTag;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * An example that computes a basic TF-IDF search table for a directory or GCS prefix.
+ *
+ * <p>Concepts: joining data; side inputs; logging
+ *
+ * <p>To execute this pipeline locally, specify general pipeline configuration:
+ * <pre>{@code
+ * --project=YOUR_PROJECT_ID
+ * }</pre>
+ * and a local output file or output prefix on GCS:
+ * <pre>{@code
+ * --output=[YOUR_LOCAL_FILE | gs://YOUR_OUTPUT_PREFIX]
+ * }</pre>
+ *
+ * <p>To execute this pipeline using the Dataflow service, specify pipeline configuration:
+ * <pre>{@code
+ * --project=YOUR_PROJECT_ID
+ * --stagingLocation=gs://YOUR_STAGING_DIRECTORY
+ * --runner=BlockingDataflowPipelineRunner
+ * }</pre>
+ * and an output prefix on GCS:
+ * <pre>{@code
+ * --output=gs://YOUR_OUTPUT_PREFIX
+ * }</pre>
+ *
+ * <p>The default input is {@code gs://dataflow-samples/shakespeare/} and can be overridden with
+ * {@code --input}.
+ */
+public class TfIdf {
+ /**
+ * Options supported by {@link TfIdf}.
+ *
+ * <p>Inherits standard configuration options.
+ */
+ private static interface Options extends PipelineOptions {
+ @Description("Path to the directory or GCS prefix containing files to read from")
+ @Default.String("gs://dataflow-samples/shakespeare/")
+ String getInput();
+ void setInput(String value);
+
+ @Description("Prefix of output URI to write to")
+ @Validation.Required
+ String getOutput();
+ void setOutput(String value);
+ }
+
+ /**
+ * Lists documents contained beneath the {@code options.input} prefix/directory.
+ */
+ public static Set<URI> listInputDocuments(Options options)
+ throws URISyntaxException, IOException {
+ URI baseUri = new URI(options.getInput());
+
+ // List all documents in the directory or GCS prefix.
+ URI absoluteUri;
+ if (baseUri.getScheme() != null) {
+ absoluteUri = baseUri;
+ } else {
+ absoluteUri = new URI(
+ "file",
+ baseUri.getAuthority(),
+ baseUri.getPath(),
+ baseUri.getQuery(),
+ baseUri.getFragment());
+ }
+
+ Set<URI> uris = new HashSet<>();
+ if (absoluteUri.getScheme().equals("file")) {
+ File directory = new File(absoluteUri);
+ for (String entry : directory.list()) {
+ File path = new File(directory, entry);
+ uris.add(path.toURI());
+ }
+ } else if (absoluteUri.getScheme().equals("gs")) {
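+ // Expand the GCS prefix into the set of matching objects by appending a "*" glob.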
+ GcsUtil gcsUtil = options.as(GcsOptions.class).getGcsUtil();
+ URI gcsUriGlob = new URI(
+ absoluteUri.getScheme(),
+ absoluteUri.getAuthority(),
+ absoluteUri.getPath() + "*",
+ absoluteUri.getQuery(),
+ absoluteUri.getFragment());
+ for (GcsPath entry : gcsUtil.expand(GcsPath.fromUri(gcsUriGlob))) {
+ uris.add(entry.toUri());
+ }
+ }
+
+ return uris;
+ }
+
+ /**
+ * Reads the documents at the provided URIs and returns all lines
+ * from the documents, each tagged with the URI of the document it came from.
+ */
+ public static class ReadDocuments
+ extends PTransform<PInput, PCollection<KV<URI, String>>> {
+ private Iterable<URI> uris;
+
+ public ReadDocuments(Iterable<URI> uris) {
+ this.uris = uris;
+ }
+
+ @Override
+ public Coder<?> getDefaultOutputCoder() {
+ return KvCoder.of(StringDelegateCoder.of(URI.class), StringUtf8Coder.of());
+ }
+
+ @Override
+ public PCollection<KV<URI, String>> apply(PInput input) {
+ Pipeline pipeline = input.getPipeline();
+
+ // Create one TextIO.Read transform for each document
+ // and add its output to a PCollectionList
+ PCollectionList<KV<URI, String>> urisToLines =
+ PCollectionList.empty(pipeline);
+
+ // TextIO.Read supports:
+ // - file: URIs and paths locally
+ // - gs: URIs on the service
+ for (final URI uri : uris) {
+ String uriString;
+ if (uri.getScheme().equals("file")) {
+ uriString = new File(uri).getPath();
+ } else {
+ uriString = uri.toString();
+ }
+
+ PCollection<KV<URI, String>> oneUriToLines = pipeline
+ .apply(TextIO.Read.from(uriString)
+ .named("TextIO.Read(" + uriString + ")"))
+ .apply("WithKeys(" + uriString + ")", WithKeys.<URI, String>of(uri));
+
+ urisToLines = urisToLines.and(oneUriToLines);
+ }
+
+ return urisToLines.apply(Flatten.<KV<URI, String>>pCollections());
+ }
+ }
+
+ /**
+ * A transform containing a basic TF-IDF pipeline. The input consists of KV objects
+ * where the key is the document's URI and the value is a piece
+ * of the document's content. The output is a mapping from terms to
+ * scores for each document URI.
+ */
+ public static class ComputeTfIdf
+ extends PTransform<PCollection<KV<URI, String>>, PCollection<KV<String, KV<URI, Double>>>> {
+ public ComputeTfIdf() { }
+
+ @Override
+ public PCollection<KV<String, KV<URI, Double>>> apply(
+ PCollection<KV<URI, String>> uriToContent) {
+
+ // Compute the total number of documents, and
+ // prepare this singleton PCollectionView for
+ // use as a side input.
+ final PCollectionView<Long> totalDocuments =
+ uriToContent
+ .apply("GetURIs", Keys.<URI>create())
+ .apply("RemoveDuplicateDocs", RemoveDuplicates.<URI>create())
+ .apply(Count.<URI>globally())
+ .apply(View.<Long>asSingleton());
+
+ // Create a collection of pairs mapping a URI to each
+ // of the words in the document associated with that URI.
+ PCollection<KV<URI, String>> uriToWords = uriToContent
+ .apply(ParDo.named("SplitWords").of(
+ new DoFn<KV<URI, String>, KV<URI, String>>() {
+ @Override
+ public void processElement(ProcessContext c) {
+ URI uri = c.element().getKey();
+ String line = c.element().getValue();
+ for (String word : line.split("\\W+")) {
+ // Log INFO messages when the word “love” is found.
+ if (word.toLowerCase().equals("love")) {
+ LOG.info("Found {}", word.toLowerCase());
+ }
+
+ if (!word.isEmpty()) {
+ c.output(KV.of(uri, word.toLowerCase()));
+ }
+ }
+ }
+ }));
+
+ // Compute a mapping from each word to the total
+ // number of documents in which it appears.
+ PCollection<KV<String, Long>> wordToDocCount = uriToWords
+ .apply("RemoveDuplicateWords", RemoveDuplicates.<KV<URI, String>>create())
+ .apply(Values.<String>create())
+ .apply("CountDocs", Count.<String>perElement());
+
+ // Compute a mapping from each URI to the total
+ // number of words in the document associated with that URI.
+ PCollection<KV<URI, Long>> uriToWordTotal = uriToWords
+ .apply("GetURIs2", Keys.<URI>create())
+ .apply("CountWords", Count.<URI>perElement());
+
+ // Count, for each (URI, word) pair, the number of
+ // occurrences of that word in the document associated
+ // with the URI.
+ PCollection<KV<KV<URI, String>, Long>> uriAndWordToCount = uriToWords
+ .apply("CountWordDocPairs", Count.<KV<URI, String>>perElement());
+
+ // Adjust the above collection, a mapping from
+ // (URI, word) pairs to counts, into an isomorphic mapping
+ // from URI to (word, count) pairs, to prepare for a join
+ // by the URI key.
+ PCollection<KV<URI, KV<String, Long>>> uriToWordAndCount = uriAndWordToCount
+ .apply(ParDo.named("ShiftKeys").of(
+ new DoFn<KV<KV<URI, String>, Long>, KV<URI, KV<String, Long>>>() {
+ @Override
+ public void processElement(ProcessContext c) {
+ URI uri = c.element().getKey().getKey();
+ String word = c.element().getKey().getValue();
+ Long occurrences = c.element().getValue();
+ c.output(KV.of(uri, KV.of(word, occurrences)));
+ }
+ }));
+
+ // Prepare to join the mapping of URI to (word, count) pairs with
+ // the mapping of URI to total word counts, by associating
+ // each of the input PCollection<KV<URI, ...>> with
+ // a tuple tag. Each input must have the same key type, URI
+ // in this case. The type parameter of the tuple tag matches
+ // the types of the values for each collection.
+ final TupleTag<Long> wordTotalsTag = new TupleTag<Long>();
+ final TupleTag<KV<String, Long>> wordCountsTag = new TupleTag<KV<String, Long>>();
+ KeyedPCollectionTuple<URI> coGbkInput = KeyedPCollectionTuple
+ .of(wordTotalsTag, uriToWordTotal)
+ .and(wordCountsTag, uriToWordAndCount);
+
+ // Perform a CoGroupByKey (a sort of pre-join) on the prepared
+ // inputs. This yields a mapping from URI to a CoGbkResult
+ // (CoGroupByKey Result). The CoGbkResult is a mapping
+ // from the above tuple tags to the values in each input
+ // associated with a particular URI. In this case, each
+ // KV<URI, CoGbkResult> groups a URI with the total number of
+ // words in that document as well as all the (word, count)
+ // pairs for particular words.
+ PCollection<KV<URI, CoGbkResult>> uriToWordAndCountAndTotal = coGbkInput
+ .apply("CoGroupByUri", CoGroupByKey.<URI>create());
+
+ // Compute a mapping from each word to a (URI, term frequency)
+ // pair for each URI. A word's term frequency for a document
+ // is simply the number of times that word occurs in the document
+ // divided by the total number of words in the document.
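+ // For example, a 200-word document containing a given word 4 times yields a
+ // term frequency of 4 / 200 = 0.02 for that word in that document.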
+ PCollection<KV<String, KV<URI, Double>>> wordToUriAndTf = uriToWordAndCountAndTotal
+ .apply(ParDo.named("ComputeTermFrequencies").of(
+ new DoFn<KV<URI, CoGbkResult>, KV<String, KV<URI, Double>>>() {
+ @Override
+ public void processElement(ProcessContext c) {
+ URI uri = c.element().getKey();
+ Long wordTotal = c.element().getValue().getOnly(wordTotalsTag);
+
+ for (KV<String, Long> wordAndCount
+ : c.element().getValue().getAll(wordCountsTag)) {
+ String word = wordAndCount.getKey();
+ Long wordCount = wordAndCount.getValue();
+ Double termFrequency = wordCount.doubleValue() / wordTotal.doubleValue();
+ c.output(KV.of(word, KV.of(uri, termFrequency)));
+ }
+ }
+ }));
+
+ // Compute a mapping from each word to its document frequency.
+ // A word's document frequency in a corpus is the number of
+ // documents in which the word appears divided by the total
+ // number of documents in the corpus. Note how the total number of
+ // documents is passed as a side input; the same value is
+ // presented to each invocation of the DoFn.
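+ // For example, a word that appears in 3 of 12 documents has a
+ // document frequency of 3 / 12 = 0.25.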
+ PCollection<KV<String, Double>> wordToDf = wordToDocCount
+ .apply(ParDo
+ .named("ComputeDocFrequencies")
+ .withSideInputs(totalDocuments)
+ .of(new DoFn<KV<String, Long>, KV<String, Double>>() {
+ @Override
+ public void processElement(ProcessContext c) {
+ String word = c.element().getKey();
+ Long documentCount = c.element().getValue();
+ Long documentTotal = c.sideInput(totalDocuments);
+ Double documentFrequency = documentCount.doubleValue()
+ / documentTotal.doubleValue();
+
+ c.output(KV.of(word, documentFrequency));
+ }
+ }));
+
+ // Join the term frequency and document frequency
+ // collections, each keyed on the word.
+ final TupleTag<KV<URI, Double>> tfTag = new TupleTag<KV<URI, Double>>();
+ final TupleTag<Double> dfTag = new TupleTag<Double>();
+ PCollection<KV<String, CoGbkResult>> wordToUriAndTfAndDf = KeyedPCollectionTuple
+ .of(tfTag, wordToUriAndTf)
+ .and(dfTag, wordToDf)
+ .apply(CoGroupByKey.<String>create());
+
+ // Compute a mapping from each word to a (URI, TF-IDF) score
+ // for each URI. There are a variety of definitions of TF-IDF
+ // ("term frequency - inverse document frequency") score;
+ // here we use a basic version in which the term frequency
+ // is multiplied by the log of the inverse document frequency.
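+ // For example, a term frequency of 0.02 for a word that appears in 1 of 10
+ // documents (document frequency 0.1) scores 0.02 * ln(1 / 0.1), roughly 0.046.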
+ PCollection<KV<String, KV<URI, Double>>> wordToUriAndTfIdf = wordToUriAndTfAndDf
+ .apply(ParDo.named("ComputeTfIdf").of(
+ new DoFn<KV<String, CoGbkResult>, KV<String, KV<URI, Double>>>() {
+ @Override
+ public void processElement(ProcessContext c) {
+ String word = c.element().getKey();
+ Double df = c.element().getValue().getOnly(dfTag);
+
+ for (KV<URI, Double> uriAndTf : c.element().getValue().getAll(tfTag)) {
+ URI uri = uriAndTf.getKey();
+ Double tf = uriAndTf.getValue();
+ Double tfIdf = tf * Math.log(1 / df);
+ c.output(KV.of(word, KV.of(uri, tfIdf)));
+ }
+ }
+ }));
+
+ return wordToUriAndTfIdf;
+ }
+
+ // Instantiate Logger.
+ // It is suggested that the Logger be named after the containing class
+ // (in this case ComputeTfIdf).
+ private static final Logger LOG = LoggerFactory.getLogger(ComputeTfIdf.class);
+ }
+
+ /**
+ * A {@link PTransform} to write, in CSV format, a mapping from term and URI
+ * to score.
+ */
+ public static class WriteTfIdf
+ extends PTransform<PCollection<KV<String, KV<URI, Double>>>, PDone> {
+ private String output;
+
+ public WriteTfIdf(String output) {
+ this.output = output;
+ }
+
+ @Override
+ public PDone apply(PCollection<KV<String, KV<URI, Double>>> wordToUriAndTfIdf) {
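+ // Format each element as a CSV line of the form "term,<TAB>documentUri,<TAB>score".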
+ return wordToUriAndTfIdf
+ .apply(ParDo.named("Format").of(new DoFn<KV<String, KV<URI, Double>>, String>() {
+ @Override
+ public void processElement(ProcessContext c) {
+ c.output(String.format("%s,\t%s,\t%f",
+ c.element().getKey(),
+ c.element().getValue().getKey(),
+ c.element().getValue().getValue()));
+ }
+ }))
+ .apply(TextIO.Write
+ .to(output)
+ .withSuffix(".csv"));
+ }
+ }
+
+ public static void main(String[] args) throws Exception {
+ Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
+ Pipeline pipeline = Pipeline.create(options);
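+ // Register a coder for URI so URIs can be used as element and key types;
+ // StringDelegateCoder encodes a URI via its string representation.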
+ pipeline.getCoderRegistry().registerCoder(URI.class, StringDelegateCoder.of(URI.class));
+
+ pipeline
+ .apply(new ReadDocuments(listInputDocuments(options)))
+ .apply(new ComputeTfIdf())
+ .apply(new WriteTfIdf(options.getOutput()));
+
+ pipeline.run();
+ }
+}