Posted to commits@beam.apache.org by dh...@apache.org on 2016/03/24 03:48:20 UTC
[56/67] incubator-beam git commit: Directory reorganization
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2eaa709c/examples/java/src/test/java/com/google/cloud/dataflow/examples/cookbook/MaxPerKeyExamplesTest.java
----------------------------------------------------------------------
diff --git a/examples/java/src/test/java/com/google/cloud/dataflow/examples/cookbook/MaxPerKeyExamplesTest.java b/examples/java/src/test/java/com/google/cloud/dataflow/examples/cookbook/MaxPerKeyExamplesTest.java
new file mode 100644
index 0000000..3deff2a
--- /dev/null
+++ b/examples/java/src/test/java/com/google/cloud/dataflow/examples/cookbook/MaxPerKeyExamplesTest.java
@@ -0,0 +1,85 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.google.cloud.dataflow.examples.cookbook;
+
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.cloud.dataflow.examples.cookbook.MaxPerKeyExamples.ExtractTempFn;
+import com.google.cloud.dataflow.examples.cookbook.MaxPerKeyExamples.FormatMaxesFn;
+import com.google.cloud.dataflow.sdk.transforms.DoFnTester;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.common.collect.ImmutableList;
+
+import org.hamcrest.CoreMatchers;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.util.List;
+
+/** Unit tests for {@link MaxPerKeyExamples}. */
+@RunWith(JUnit4.class)
+public class MaxPerKeyExamplesTest {
+
+ private static final TableRow row1 = new TableRow()
+ .set("month", "6").set("day", "21")
+ .set("year", "2014").set("mean_temp", "85.3")
+ .set("tornado", true);
+ private static final TableRow row2 = new TableRow()
+ .set("month", "7").set("day", "20")
+ .set("year", "2014").set("mean_temp", "75.4")
+ .set("tornado", false);
+ private static final TableRow row3 = new TableRow()
+ .set("month", "6").set("day", "18")
+ .set("year", "2014").set("mean_temp", "45.3")
+ .set("tornado", true);
+ private static final List<TableRow> TEST_ROWS = ImmutableList.of(row1, row2, row3);
+
+ private static final KV<Integer, Double> kv1 = KV.of(6, 85.3);
+ private static final KV<Integer, Double> kv2 = KV.of(6, 45.3);
+ private static final KV<Integer, Double> kv3 = KV.of(7, 75.4);
+
+ private static final List<KV<Integer, Double>> TEST_KVS = ImmutableList.of(kv1, kv2, kv3);
+
+ private static final TableRow resultRow1 = new TableRow()
+ .set("month", 6)
+ .set("max_mean_temp", 85.3);
+ private static final TableRow resultRow2 = new TableRow()
+ .set("month", 7)
+ .set("max_mean_temp", 75.4);
+
+
+ @Test
+ public void testExtractTempFn() {
+ DoFnTester<TableRow, KV<Integer, Double>> extractTempFn =
+ DoFnTester.of(new ExtractTempFn());
+ List<KV<Integer, Double>> results = extractTempFn.processBatch(TEST_ROWS);
+ Assert.assertThat(results, CoreMatchers.hasItem(kv1));
+ Assert.assertThat(results, CoreMatchers.hasItem(kv2));
+ Assert.assertThat(results, CoreMatchers.hasItem(kv3));
+ }
+
+ @Test
+ public void testFormatMaxesFn() {
+ DoFnTester<KV<Integer, Double>, TableRow> formatMaxesFn =
+ DoFnTester.of(new FormatMaxesFn());
+ List<TableRow> results = formatMaxesFn.processBatch(TEST_KVS);
+ Assert.assertThat(results, CoreMatchers.hasItem(resultRow1));
+ Assert.assertThat(results, CoreMatchers.hasItem(resultRow2));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2eaa709c/examples/java/src/test/java/com/google/cloud/dataflow/examples/cookbook/TriggerExampleTest.java
----------------------------------------------------------------------
diff --git a/examples/java/src/test/java/com/google/cloud/dataflow/examples/cookbook/TriggerExampleTest.java b/examples/java/src/test/java/com/google/cloud/dataflow/examples/cookbook/TriggerExampleTest.java
new file mode 100644
index 0000000..209ea52
--- /dev/null
+++ b/examples/java/src/test/java/com/google/cloud/dataflow/examples/cookbook/TriggerExampleTest.java
@@ -0,0 +1,139 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.google.cloud.dataflow.examples.cookbook;
+
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.cloud.dataflow.examples.cookbook.TriggerExample.ExtractFlowInfo;
+import com.google.cloud.dataflow.examples.cookbook.TriggerExample.TotalFlow;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.testing.DataflowAssert;
+import com.google.cloud.dataflow.sdk.testing.RunnableOnService;
+import com.google.cloud.dataflow.sdk.testing.TestPipeline;
+import com.google.cloud.dataflow.sdk.transforms.Create;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.DoFnTester;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;
+import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.cloud.dataflow.sdk.values.TimestampedValue;
+
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Unit tests for {@link TriggerExample}.
+ * Trigger output is by definition non-deterministic and therefore hard to test,
+ * so this unit test does not cover every aspect of the example.
+ */
+@RunWith(JUnit4.class)
+public class TriggerExampleTest {
+
+ private static final String[] INPUT =
+ {"01/01/2010 00:00:00,1108302,94,E,ML,36,100,29,0.0065,66,9,1,0.001,74.8,1,9,3,0.0028,71,1,9,"
+ + "12,0.0099,67.4,1,9,13,0.0121,99.0,1,,,,,0,,,,,0,,,,,0,,,,,0", "01/01/2010 00:00:00,"
+ + "1100333,5,N,FR,9,0,39,,,9,,,,0,,,,,0,,,,,0,,,,,0,,,,,0,,,,,0,,,,,0,,,,"};
+
+ private static final List<TimestampedValue<String>> TIME_STAMPED_INPUT = Arrays.asList(
+ TimestampedValue.of("01/01/2010 00:00:00,1108302,5,W,ML,36,100,30,0.0065,66,9,1,0.001,"
+ + "74.8,1,9,3,0.0028,71,1,9,12,0.0099,87.4,1,9,13,0.0121,99.0,1,,,,,0,,,,,0,,,,,0,,,"
+ + ",,0", new Instant(60000)),
+ TimestampedValue.of("01/01/2010 00:00:00,1108302,110,E,ML,36,100,40,0.0065,66,9,1,0.001,"
+ + "74.8,1,9,3,0.0028,71,1,9,12,0.0099,67.4,1,9,13,0.0121,99.0,1,,,,,0,,,,,0,,,,,0,,,"
+ + ",,0", new Instant(1)),
+ TimestampedValue.of("01/01/2010 00:00:00,1108302,110,E,ML,36,100,50,0.0065,66,9,1,"
+ + "0.001,74.8,1,9,3,0.0028,71,1,9,12,0.0099,97.4,1,9,13,0.0121,50.0,1,,,,,0,,,,,0"
+ + ",,,,,0,,,,,0", new Instant(1)));
+
+ private static final TableRow OUT_ROW_1 = new TableRow()
+ .set("trigger_type", "default")
+ .set("freeway", "5").set("total_flow", 30)
+ .set("number_of_records", 1)
+ .set("isFirst", true).set("isLast", true)
+ .set("timing", "ON_TIME")
+ .set("window", "[1970-01-01T00:01:00.000Z..1970-01-01T00:02:00.000Z)");
+
+ private static final TableRow OUT_ROW_2 = new TableRow()
+ .set("trigger_type", "default")
+ .set("freeway", "110").set("total_flow", 90)
+ .set("number_of_records", 2)
+ .set("isFirst", true).set("isLast", true)
+ .set("timing", "ON_TIME")
+ .set("window", "[1970-01-01T00:00:00.000Z..1970-01-01T00:01:00.000Z)");
+
+ @Test
+ public void testExtractTotalFlow() {
+ DoFnTester<String, KV<String, Integer>> extractFlowInfo = DoFnTester
+ .of(new ExtractFlowInfo());
+
+ List<KV<String, Integer>> results = extractFlowInfo.processBatch(INPUT);
+ Assert.assertEquals(1, results.size());
+ Assert.assertEquals("94", results.get(0).getKey());
+ Assert.assertEquals(Integer.valueOf(29), results.get(0).getValue());
+
+ List<KV<String, Integer>> output = extractFlowInfo.processBatch("");
+ Assert.assertEquals(0, output.size());
+ }
+
+ @Test
+ @Category(RunnableOnService.class)
+ public void testTotalFlow() {
+ Pipeline pipeline = TestPipeline.create();
+ PCollection<KV<String, Integer>> flow = pipeline
+ .apply(Create.timestamped(TIME_STAMPED_INPUT))
+ .apply(ParDo.of(new ExtractFlowInfo()));
+
+ PCollection<TableRow> totalFlow = flow
+ .apply(Window.<KV<String, Integer>>into(FixedWindows.of(Duration.standardMinutes(1))))
+ .apply(new TotalFlow("default"));
+
+ PCollection<TableRow> results = totalFlow.apply(ParDo.of(new FormatResults()));
+
+
+ DataflowAssert.that(results).containsInAnyOrder(OUT_ROW_1, OUT_ROW_2);
+ pipeline.run();
+
+ }
+
+ static class FormatResults extends DoFn<TableRow, TableRow> {
+ @Override
+ public void processElement(ProcessContext c) throws Exception {
+ TableRow element = c.element();
+ TableRow row = new TableRow()
+ .set("trigger_type", element.get("trigger_type"))
+ .set("freeway", element.get("freeway"))
+ .set("total_flow", element.get("total_flow"))
+ .set("number_of_records", element.get("number_of_records"))
+ .set("isFirst", element.get("isFirst"))
+ .set("isLast", element.get("isLast"))
+ .set("timing", element.get("timing"))
+ .set("window", element.get("window"));
+ c.output(row);
+ }
+ }
+}
+
+
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2eaa709c/examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/pom.xml b/examples/pom.xml
deleted file mode 100644
index 8b17dfe..0000000
--- a/examples/pom.xml
+++ /dev/null
@@ -1,394 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.apache.beam</groupId>
- <artifactId>parent</artifactId>
- <version>0.1.0-incubating-SNAPSHOT</version>
- <relativePath>../pom.xml</relativePath>
- </parent>
-
- <artifactId>java-examples-all</artifactId>
- <name>Apache Beam :: Examples :: Java All</name>
- <description>The Apache Beam SDK provides a simple, Java-based
- interface for processing data of virtually any size. This
- artifact includes all Apache Beam Java SDK examples.</description>
-
- <packaging>jar</packaging>
-
- <profiles>
- <profile>
- <id>DataflowPipelineTests</id>
- <properties>
- <runIntegrationTestOnService>true</runIntegrationTestOnService>
- <testGroups>com.google.cloud.dataflow.sdk.testing.RunnableOnService</testGroups>
- <testParallelValue>both</testParallelValue>
- </properties>
- </profile>
- </profiles>
-
- <build>
- <plugins>
- <plugin>
- <artifactId>maven-compiler-plugin</artifactId>
- </plugin>
-
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-dependency-plugin</artifactId>
- </plugin>
-
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-checkstyle-plugin</artifactId>
- <version>2.12</version>
- <dependencies>
- <dependency>
- <groupId>com.puppycrawl.tools</groupId>
- <artifactId>checkstyle</artifactId>
- <version>6.6</version>
- </dependency>
- </dependencies>
- <configuration>
- <configLocation>../checkstyle.xml</configLocation>
- <consoleOutput>true</consoleOutput>
- <failOnViolation>true</failOnViolation>
- <includeTestSourceDirectory>true</includeTestSourceDirectory>
- <includeResources>false</includeResources>
- </configuration>
- <executions>
- <execution>
- <goals>
- <goal>check</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
-
- <!-- Source plugin for generating source and test-source JARs. -->
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-source-plugin</artifactId>
- <version>2.4</version>
- <executions>
- <execution>
- <id>attach-sources</id>
- <phase>compile</phase>
- <goals>
- <goal>jar</goal>
- </goals>
- </execution>
- <execution>
- <id>attach-test-sources</id>
- <phase>test-compile</phase>
- <goals>
- <goal>test-jar</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
-
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-javadoc-plugin</artifactId>
- <configuration>
- <windowtitle>Apache Beam Examples</windowtitle>
- <doctitle>Apache Beam Examples</doctitle>
-
- <subpackages>com.google.cloud.dataflow.examples</subpackages>
- <additionalparam>-exclude com.google.cloud.dataflow.sdk.runners.worker:com.google.cloud.dataflow.sdk.runners.dataflow:com.google.cloud.dataflow.sdk.util ${dataflow.javadoc_opts}</additionalparam>
- <use>false</use>
- <quiet>true</quiet>
- <bottom><![CDATA[<br>]]></bottom>
-
- <offlineLinks>
- <!-- The Dataflow SDK docs -->
- <offlineLink>
- <url>https://cloud.google.com/dataflow/java-sdk/JavaDoc/</url>
- <location>${basedir}/../javadoc/dataflow-sdk-docs</location>
- </offlineLink>
- <!-- Other dependencies -->
- <offlineLink>
- <url>https://developers.google.com/api-client-library/java/google-api-java-client/reference/1.20.0/</url>
- <location>${basedir}/../javadoc/apiclient-docs</location>
- </offlineLink>
- <offlineLink>
- <url>http://avro.apache.org/docs/1.7.7/api/java/</url>
- <location>${basedir}/../javadoc/avro-docs</location>
- </offlineLink>
- <offlineLink>
- <url>https://developers.google.com/resources/api-libraries/documentation/bigquery/v2/java/latest/</url>
- <location>${basedir}/../javadoc/bq-docs</location>
- </offlineLink>
- <offlineLink>
- <url>https://cloud.google.com/datastore/docs/apis/javadoc/</url>
- <location>${basedir}/../javadoc/datastore-docs</location>
- </offlineLink>
- <offlineLink>
- <url>http://docs.guava-libraries.googlecode.com/git-history/release18/javadoc/</url>
- <location>${basedir}/../javadoc/guava-docs</location>
- </offlineLink>
- <offlineLink>
- <url>http://fasterxml.github.io/jackson-annotations/javadoc/2.7/</url>
- <location>${basedir}/../javadoc/jackson-annotations-docs</location>
- </offlineLink>
- <offlineLink>
- <url>http://fasterxml.github.io/jackson-databind/javadoc/2.7/</url>
- <location>${basedir}/../javadoc/jackson-databind-docs</location>
- </offlineLink>
- <offlineLink>
- <url>http://www.joda.org/joda-time/apidocs</url>
- <location>${basedir}/../javadoc/joda-docs</location>
- </offlineLink>
- <offlineLink>
- <url>https://developers.google.com/api-client-library/java/google-oauth-java-client/reference/1.20.0/</url>
- <location>${basedir}/../javadoc/oauth-docs</location>
- </offlineLink>
- </offlineLinks>
- </configuration>
- <executions>
- <execution>
- <goals>
- <goal>jar</goal>
- </goals>
- <phase>package</phase>
- </execution>
- </executions>
- </plugin>
-
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-shade-plugin</artifactId>
- <version>2.4.1</version>
- <executions>
- <execution>
- <phase>package</phase>
- <goals>
- <goal>shade</goal>
- </goals>
- <configuration>
- <finalName>${project.artifactId}-bundled-${project.version}</finalName>
- <artifactSet>
- <includes>
- <include>*:*</include>
- </includes>
- </artifactSet>
- <filters>
- <filter>
- <artifact>*:*</artifact>
- <excludes>
- <exclude>META-INF/*.SF</exclude>
- <exclude>META-INF/*.DSA</exclude>
- <exclude>META-INF/*.RSA</exclude>
- </excludes>
- </filter>
- </filters>
- </configuration>
- </execution>
- </executions>
- </plugin>
-
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-jar-plugin</artifactId>
- <executions>
- <execution>
- <id>default-jar</id>
- <goals>
- <goal>jar</goal>
- </goals>
- </execution>
- <execution>
- <id>default-test-jar</id>
- <goals>
- <goal>test-jar</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
-
- <!-- Coverage analysis for unit tests. -->
- <plugin>
- <groupId>org.jacoco</groupId>
- <artifactId>jacoco-maven-plugin</artifactId>
- </plugin>
- </plugins>
- </build>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.beam</groupId>
- <artifactId>java-sdk-all</artifactId>
- <version>${project.version}</version>
- </dependency>
-
- <dependency>
- <groupId>com.google.api-client</groupId>
- <artifactId>google-api-client</artifactId>
- <version>${google-clients.version}</version>
- <exclusions>
- <!-- Exclude an old version of guava that is being pulled
- in by a transitive dependency of google-api-client -->
- <exclusion>
- <groupId>com.google.guava</groupId>
- <artifactId>guava-jdk5</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
-
- <dependency>
- <groupId>com.google.apis</groupId>
- <artifactId>google-api-services-dataflow</artifactId>
- <version>${dataflow.version}</version>
- <exclusions>
- <!-- Exclude an old version of guava that is being pulled
- in by a transitive dependency of google-api-client -->
- <exclusion>
- <groupId>com.google.guava</groupId>
- <artifactId>guava-jdk5</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
-
- <dependency>
- <groupId>com.google.apis</groupId>
- <artifactId>google-api-services-bigquery</artifactId>
- <version>${bigquery.version}</version>
- <exclusions>
- <!-- Exclude an old version of guava that is being pulled
- in by a transitive dependency of google-api-client -->
- <exclusion>
- <groupId>com.google.guava</groupId>
- <artifactId>guava-jdk5</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
-
- <dependency>
- <groupId>com.google.http-client</groupId>
- <artifactId>google-http-client</artifactId>
- <version>${google-clients.version}</version>
- <exclusions>
- <!-- Exclude an old version of guava that is being pulled
- in by a transitive dependency of google-api-client -->
- <exclusion>
- <groupId>com.google.guava</groupId>
- <artifactId>guava-jdk5</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
-
- <dependency>
- <groupId>org.apache.avro</groupId>
- <artifactId>avro</artifactId>
- <version>${avro.version}</version>
- </dependency>
-
- <dependency>
- <groupId>com.google.apis</groupId>
- <artifactId>google-api-services-datastore-protobuf</artifactId>
- <version>${datastore.version}</version>
- <exclusions>
- <!-- Exclude an old version of guava that is being pulled
- in by a transitive dependency of google-api-client -->
- <exclusion>
- <groupId>com.google.guava</groupId>
- <artifactId>guava-jdk5</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
-
- <dependency>
- <groupId>com.google.apis</groupId>
- <artifactId>google-api-services-pubsub</artifactId>
- <version>${pubsub.version}</version>
- <exclusions>
- <!-- Exclude an old version of guava that is being pulled
- in by a transitive dependency of google-api-client -->
- <exclusion>
- <groupId>com.google.guava</groupId>
- <artifactId>guava-jdk5</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
-
- <dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- <version>${guava.version}</version>
- </dependency>
-
- <dependency>
- <groupId>com.google.code.findbugs</groupId>
- <artifactId>jsr305</artifactId>
- <version>${jsr305.version}</version>
- </dependency>
-
- <dependency>
- <groupId>joda-time</groupId>
- <artifactId>joda-time</artifactId>
- <version>${joda.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- <version>${slf4j.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-jdk14</artifactId>
- <version>${slf4j.version}</version>
- <scope>runtime</scope>
- </dependency>
-
- <dependency>
- <groupId>javax.servlet</groupId>
- <artifactId>javax.servlet-api</artifactId>
- <version>3.1.0</version>
- </dependency>
-
- <!-- Hamcrest and JUnit are required dependencies of DataflowAssert,
- which is used in the main code of DebuggingWordCount example. -->
-
- <dependency>
- <groupId>org.hamcrest</groupId>
- <artifactId>hamcrest-all</artifactId>
- <version>${hamcrest.version}</version>
- </dependency>
-
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <version>${junit.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.mockito</groupId>
- <artifactId>mockito-all</artifactId>
- <version>1.10.19</version>
- <scope>test</scope>
- </dependency>
- </dependencies>
-</project>
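
A note on the profile removed above: DataflowPipelineTests routes tests tagged
with the RunnableOnService category (for example, testTotalFlow in
TriggerExampleTest) to the Dataflow service. Assuming the parent pom wires
runIntegrationTestOnService, testGroups, and testParallelValue into the
surefire configuration, the profile is activated by its id, e.g.
mvn test -PDataflowPipelineTests.
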
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2eaa709c/examples/src/main/java/com/google/cloud/dataflow/examples/DebuggingWordCount.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/com/google/cloud/dataflow/examples/DebuggingWordCount.java b/examples/src/main/java/com/google/cloud/dataflow/examples/DebuggingWordCount.java
deleted file mode 100644
index 8823dbc..0000000
--- a/examples/src/main/java/com/google/cloud/dataflow/examples/DebuggingWordCount.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.examples;
-
-import com.google.cloud.dataflow.examples.WordCount.WordCountOptions;
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.io.TextIO;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
-import com.google.cloud.dataflow.sdk.testing.DataflowAssert;
-import com.google.cloud.dataflow.sdk.transforms.Aggregator;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.transforms.Sum;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.regex.Pattern;
-
-
-/**
- * An example that verifies word counts in Shakespeare and includes Dataflow best practices.
- *
- * <p>This class, {@link DebuggingWordCount}, is the third in a series of four successively more
- * detailed 'word count' examples. You may first want to take a look at {@link MinimalWordCount}
- * and {@link WordCount}. After you've looked at this example, then see the
- * {@link WindowedWordCount} pipeline, for introduction of additional concepts.
- *
- * <p>Basic concepts, also in the MinimalWordCount and WordCount examples:
- * Reading text files; counting a PCollection; executing a Pipeline both locally
- * and using the Dataflow service; defining DoFns.
- *
- * <p>New Concepts:
- * <pre>
- * 1. Logging to Cloud Logging
- * 2. Controlling Dataflow worker log levels
- * 3. Creating a custom aggregator
- * 4. Testing your Pipeline via DataflowAssert
- * </pre>
- *
- * <p>To execute this pipeline locally, specify general pipeline configuration:
- * <pre>{@code
- * --project=YOUR_PROJECT_ID
- * }
- * </pre>
- *
- * <p>To execute this pipeline using the Dataflow service and the additional logging discussed
- * below, specify pipeline configuration:
- * <pre>{@code
- * --project=YOUR_PROJECT_ID
- * --stagingLocation=gs://YOUR_STAGING_DIRECTORY
- * --runner=BlockingDataflowPipelineRunner
- * --workerLogLevelOverrides={"com.google.cloud.dataflow.examples":"DEBUG"}
- * }
- * </pre>
- *
- * <p>Note that when you run via <code>mvn exec</code>, you may need to escape
- * the quotations as appropriate for your shell. For example, in <code>bash</code>:
- * <pre>
- * mvn compile exec:java ... \
- * -Dexec.args="... \
- * --workerLogLevelOverrides={\\\"com.google.cloud.dataflow.examples\\\":\\\"DEBUG\\\"}"
- * </pre>
- *
- * <p>Concept #2: Dataflow workers that execute user code are configured to log to Cloud
- * Logging by default at "INFO" log level and higher. One may override log levels for specific
- * logging namespaces by specifying:
- * <pre><code>
- * --workerLogLevelOverrides={"Name1":"Level1","Name2":"Level2",...}
- * </code></pre>
- * For example, by specifying:
- * <pre><code>
- * --workerLogLevelOverrides={"com.google.cloud.dataflow.examples":"DEBUG"}
- * </code></pre>
- * when executing this pipeline using the Dataflow service, Cloud Logging would contain only
- * "DEBUG" or higher level logs for the {@code com.google.cloud.dataflow.examples} package in
- * addition to the default "INFO" or higher level logs. In addition, the default Dataflow worker
- * logging configuration can be overridden by specifying
- * {@code --defaultWorkerLogLevel=<one of TRACE, DEBUG, INFO, WARN, ERROR>}. For example,
- * by specifying {@code --defaultWorkerLogLevel=DEBUG} when executing this pipeline with
- * the Dataflow service, Cloud Logging would contain all "DEBUG" or higher level logs. Note
- * that changing the default worker log level to TRACE or DEBUG will significantly increase
- * the amount of logs output.
- *
- * <p>The input file defaults to {@code gs://dataflow-samples/shakespeare/kinglear.txt} and can be
- * overridden with {@code --inputFile}.
- */
-public class DebuggingWordCount {
- /** A DoFn that filters for a specific key based upon a regular expression. */
- public static class FilterTextFn extends DoFn<KV<String, Long>, KV<String, Long>> {
- /**
- * Concept #1: The logger below uses the fully qualified class name of FilterTextFn
- * as the logger. All log statements emitted by this logger will be referenced by this name
- * and will be visible in the Cloud Logging UI. Learn more at https://cloud.google.com/logging
- * about the Cloud Logging UI.
- */
- private static final Logger LOG = LoggerFactory.getLogger(FilterTextFn.class);
-
- private final Pattern filter;
- public FilterTextFn(String pattern) {
- filter = Pattern.compile(pattern);
- }
-
- /**
- * Concept #3: A custom aggregator can track values in your pipeline as it runs. Those
- * values will be displayed in the Dataflow Monitoring UI when this pipeline is run using the
- * Dataflow service. These aggregators below track the number of matched and unmatched words.
- * Learn more at https://cloud.google.com/dataflow/pipelines/dataflow-monitoring-intf about
- * the Dataflow Monitoring UI.
- */
- private final Aggregator<Long, Long> matchedWords =
- createAggregator("matchedWords", new Sum.SumLongFn());
- private final Aggregator<Long, Long> unmatchedWords =
- createAggregator("umatchedWords", new Sum.SumLongFn());
-
- @Override
- public void processElement(ProcessContext c) {
- if (filter.matcher(c.element().getKey()).matches()) {
- // Log at the "DEBUG" level each element that we match. When executing this pipeline
- // using the Dataflow service, these log lines will appear in the Cloud Logging UI
- // only if the log level is set to "DEBUG" or lower.
- LOG.debug("Matched: " + c.element().getKey());
- matchedWords.addValue(1L);
- c.output(c.element());
- } else {
- // Log at the "TRACE" level each element that is not matched. Different log levels
- // can be used to control the verbosity of logging providing an effective mechanism
- // to filter less important information.
- LOG.trace("Did not match: " + c.element().getKey());
- unmatchedWords.addValue(1L);
- }
- }
- }
-
- public static void main(String[] args) {
- WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
- .as(WordCountOptions.class);
- Pipeline p = Pipeline.create(options);
-
- PCollection<KV<String, Long>> filteredWords =
- p.apply(TextIO.Read.named("ReadLines").from(options.getInputFile()))
- .apply(new WordCount.CountWords())
- .apply(ParDo.of(new FilterTextFn("Flourish|stomach")));
-
- /**
- * Concept #4: DataflowAssert is a set of convenient PTransforms in the style of
- * Hamcrest's collection matchers that can be used when writing Pipeline level tests
- * to validate the contents of PCollections. DataflowAssert is best used in unit tests
- * with small data sets but is demonstrated here as a teaching tool.
- *
- * <p>Below we verify that the set of filtered words matches our expected counts. Note
- * that DataflowAssert does not provide any output and that successful completion of the
- * Pipeline implies that the expectations were met. Learn more at
- * https://cloud.google.com/dataflow/pipelines/testing-your-pipeline on how to test
- * your Pipeline and see {@link DebuggingWordCountTest} for an example unit test.
- */
- List<KV<String, Long>> expectedResults = Arrays.asList(
- KV.of("Flourish", 3L),
- KV.of("stomach", 1L));
- DataflowAssert.that(filteredWords).containsInAnyOrder(expectedResults);
-
- p.run();
- }
-}
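
Concept #4 above (pipeline-level testing with DataflowAssert) is easiest to see
in a standalone unit test. The following is a minimal sketch, not part of the
commit, using only SDK classes that appear elsewhere in this diff (TestPipeline,
Create, Count, DataflowAssert); the input words are illustrative:

    import com.google.cloud.dataflow.sdk.Pipeline;
    import com.google.cloud.dataflow.sdk.testing.DataflowAssert;
    import com.google.cloud.dataflow.sdk.testing.TestPipeline;
    import com.google.cloud.dataflow.sdk.transforms.Count;
    import com.google.cloud.dataflow.sdk.transforms.Create;
    import com.google.cloud.dataflow.sdk.values.KV;
    import com.google.cloud.dataflow.sdk.values.PCollection;

    import org.junit.Test;

    public class DataflowAssertSketchTest {
      @Test
      public void countsSmallInMemoryInput() {
        // TestPipeline runs locally, which suits DataflowAssert's small-data use case.
        Pipeline p = TestPipeline.create();
        PCollection<KV<String, Long>> counts = p
            .apply(Create.of("Flourish", "stomach", "Flourish"))
            .apply(Count.<String>perElement());
        // The assertion is itself a PTransform; it is checked when p.run() executes.
        DataflowAssert.that(counts).containsInAnyOrder(
            KV.of("Flourish", 2L), KV.of("stomach", 1L));
        p.run();
      }
    }
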
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2eaa709c/examples/src/main/java/com/google/cloud/dataflow/examples/MinimalWordCount.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/com/google/cloud/dataflow/examples/MinimalWordCount.java b/examples/src/main/java/com/google/cloud/dataflow/examples/MinimalWordCount.java
deleted file mode 100644
index 4ed0520..0000000
--- a/examples/src/main/java/com/google/cloud/dataflow/examples/MinimalWordCount.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.examples;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.io.TextIO;
-import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
-import com.google.cloud.dataflow.sdk.runners.BlockingDataflowPipelineRunner;
-import com.google.cloud.dataflow.sdk.transforms.Count;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.MapElements;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.transforms.SimpleFunction;
-import com.google.cloud.dataflow.sdk.values.KV;
-
-
-/**
- * An example that counts words in Shakespeare.
- *
- * <p>This class, {@link MinimalWordCount}, is the first in a series of four successively more
- * detailed 'word count' examples. Here, for simplicity, we don't show any error-checking or
- * argument processing, and focus on construction of the pipeline, which chains together the
- * application of core transforms.
- *
- * <p>Next, see the {@link WordCount} pipeline, then the {@link DebuggingWordCount}, and finally
- * the {@link WindowedWordCount} pipeline, for more detailed examples that introduce additional
- * concepts.
- *
- * <p>Concepts:
- * <pre>
- * 1. Reading data from text files
- * 2. Specifying 'inline' transforms
- * 3. Counting a PCollection
- * 4. Writing data to Cloud Storage as text files
- * </pre>
- *
- * <p>To execute this pipeline, first edit the code to set your project ID, the staging
- * location, and the output location. The specified GCS bucket(s) must already exist.
- *
- * <p>Then, run the pipeline as described in the README. It will be deployed and run using the
- * Dataflow service. No args are required to run the pipeline. You can see the results in your
- * output bucket in the GCS browser.
- */
-public class MinimalWordCount {
-
- public static void main(String[] args) {
- // Create a DataflowPipelineOptions object. This object lets us set various execution
- // options for our pipeline, such as the associated Cloud Platform project and the location
- // in Google Cloud Storage to stage files.
- DataflowPipelineOptions options = PipelineOptionsFactory.create()
- .as(DataflowPipelineOptions.class);
- options.setRunner(BlockingDataflowPipelineRunner.class);
- // CHANGE 1/3: Your project ID is required in order to run your pipeline on the Google Cloud.
- options.setProject("SET_YOUR_PROJECT_ID_HERE");
- // CHANGE 2/3: Your Google Cloud Storage path is required for staging local files.
- options.setStagingLocation("gs://SET_YOUR_BUCKET_NAME_HERE/AND_STAGING_DIRECTORY");
-
- // Create the Pipeline object with the options we defined above.
- Pipeline p = Pipeline.create(options);
-
- // Apply the pipeline's transforms.
-
- // Concept #1: Apply a root transform to the pipeline; in this case, TextIO.Read to read a set
- // of input text files. TextIO.Read returns a PCollection where each element is one line from
- // the input text (a set of Shakespeare's texts).
- p.apply(TextIO.Read.from("gs://dataflow-samples/shakespeare/*"))
- // Concept #2: Apply a ParDo transform to our PCollection of text lines. This ParDo invokes a
- // DoFn (defined in-line) on each element that tokenizes the text line into individual words.
- // The ParDo returns a PCollection<String>, where each element is an individual word in
- // Shakespeare's collected texts.
- .apply(ParDo.named("ExtractWords").of(new DoFn<String, String>() {
- @Override
- public void processElement(ProcessContext c) {
- for (String word : c.element().split("[^a-zA-Z']+")) {
- if (!word.isEmpty()) {
- c.output(word);
- }
- }
- }
- }))
- // Concept #3: Apply the Count transform to our PCollection of individual words. The Count
- // transform returns a new PCollection of key/value pairs, where each key represents a unique
- // word in the text. The associated value is the occurrence count for that word.
- .apply(Count.<String>perElement())
- // Apply a MapElements transform that formats our PCollection of word counts into a printable
- // string, suitable for writing to an output file.
- .apply("FormatResults", MapElements.via(new SimpleFunction<KV<String, Long>, String>() {
- @Override
- public String apply(KV<String, Long> input) {
- return input.getKey() + ": " + input.getValue();
- }
- }))
- // Concept #4: Apply a write transform, TextIO.Write, at the end of the pipeline.
- // TextIO.Write writes the contents of a PCollection (in this case, our PCollection of
- // formatted strings) to a series of text files in Google Cloud Storage.
- // CHANGE 3/3: A Google Cloud Storage path is required for writing the results.
- .apply(TextIO.Write.to("gs://YOUR_OUTPUT_BUCKET/AND_OUTPUT_PREFIX"));
-
- // Run the pipeline.
- p.run();
- }
-}
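
MinimalWordCount above hard-codes BlockingDataflowPipelineRunner, so it only
runs against the Dataflow service. For local experimentation, here is a minimal
sketch of the options setup, assuming the SDK's bundled DirectPipelineRunner as
the local runner; the transform chain itself would be unchanged:

    import com.google.cloud.dataflow.sdk.Pipeline;
    import com.google.cloud.dataflow.sdk.options.PipelineOptions;
    import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
    import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner;

    public class MinimalWordCountLocal {
      public static void main(String[] args) {
        // No project ID or staging bucket is needed for local execution.
        PipelineOptions options = PipelineOptionsFactory.create();
        options.setRunner(DirectPipelineRunner.class);
        Pipeline p = Pipeline.create(options);
        // ...apply the same ExtractWords/Count/FormatResults chain as above,
        // reading from and writing to local files instead of gs:// paths.
        p.run();
      }
    }
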
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2eaa709c/examples/src/main/java/com/google/cloud/dataflow/examples/WindowedWordCount.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/com/google/cloud/dataflow/examples/WindowedWordCount.java b/examples/src/main/java/com/google/cloud/dataflow/examples/WindowedWordCount.java
deleted file mode 100644
index 2adac55..0000000
--- a/examples/src/main/java/com/google/cloud/dataflow/examples/WindowedWordCount.java
+++ /dev/null
@@ -1,269 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.examples;
-
-import com.google.api.services.bigquery.model.TableFieldSchema;
-import com.google.api.services.bigquery.model.TableReference;
-import com.google.api.services.bigquery.model.TableRow;
-import com.google.api.services.bigquery.model.TableSchema;
-import com.google.cloud.dataflow.examples.common.DataflowExampleOptions;
-import com.google.cloud.dataflow.examples.common.DataflowExampleUtils;
-import com.google.cloud.dataflow.examples.common.ExampleBigQueryTableOptions;
-import com.google.cloud.dataflow.examples.common.ExamplePubsubTopicOptions;
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.PipelineResult;
-import com.google.cloud.dataflow.sdk.io.BigQueryIO;
-import com.google.cloud.dataflow.sdk.io.PubsubIO;
-import com.google.cloud.dataflow.sdk.io.TextIO;
-import com.google.cloud.dataflow.sdk.options.Default;
-import com.google.cloud.dataflow.sdk.options.Description;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;
-import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-
-/**
- * An example that counts words in text, and can run over either unbounded or bounded input
- * collections.
- *
- * <p>This class, {@link WindowedWordCount}, is the last in a series of four successively more
- * detailed 'word count' examples. First take a look at {@link MinimalWordCount},
- * {@link WordCount}, and {@link DebuggingWordCount}.
- *
- * <p>Basic concepts, also in the MinimalWordCount, WordCount, and DebuggingWordCount examples:
- * Reading text files; counting a PCollection; writing to GCS; executing a Pipeline both locally
- * and using the Dataflow service; defining DoFns; creating a custom aggregator;
- * user-defined PTransforms; defining PipelineOptions.
- *
- * <p>New Concepts:
- * <pre>
- * 1. Unbounded and bounded pipeline input modes
- * 2. Adding timestamps to data
- * 3. PubSub topics as sources
- * 4. Windowing
- * 5. Re-using PTransforms over windowed PCollections
- * 6. Writing to BigQuery
- * </pre>
- *
- * <p>To execute this pipeline locally, specify general pipeline configuration:
- * <pre>{@code
- * --project=YOUR_PROJECT_ID
- * }
- * </pre>
- *
- * <p>To execute this pipeline using the Dataflow service, specify pipeline configuration:
- * <pre>{@code
- * --project=YOUR_PROJECT_ID
- * --stagingLocation=gs://YOUR_STAGING_DIRECTORY
- * --runner=BlockingDataflowPipelineRunner
- * }
- * </pre>
- *
- * <p>Optionally specify the input file path via:
- * {@code --inputFile=gs://INPUT_PATH},
- * which defaults to {@code gs://dataflow-samples/shakespeare/kinglear.txt}.
- *
- * <p>Specify an output BigQuery dataset and optionally, a table for the output. If you don't
- * specify the table, one will be created for you using the job name. If you don't specify the
- * dataset, a dataset called {@code dataflow-examples} must already exist in your project.
- * {@code --bigQueryDataset=YOUR-DATASET --bigQueryTable=YOUR-NEW-TABLE-NAME}.
- *
- * <p>Decide whether you want your pipeline to run with 'bounded' (such as files in GCS) or
- * 'unbounded' input (such as a PubSub topic). To run with unbounded input, set
- * {@code --unbounded=true}. Then, optionally specify the Google Cloud PubSub topic to read from
- * via {@code --pubsubTopic=projects/PROJECT_ID/topics/YOUR_TOPIC_NAME}. If the topic does not
- * exist, the pipeline will create one for you. It will delete this topic when it terminates.
- * The pipeline will automatically launch an auxiliary batch pipeline to populate the given PubSub
- * topic with the contents of the {@code --inputFile}, in order to make the example easy to run.
- * If you want to use an independently-populated PubSub topic, indicate this by setting
- * {@code --inputFile=""}. In that case, the auxiliary pipeline will not be started.
- *
- * <p>By default, the pipeline will do fixed windowing, on 1-minute windows. You can
- * change this interval by setting the {@code --windowSize} parameter, e.g. {@code --windowSize=10}
- * for 10-minute windows.
- */
-public class WindowedWordCount {
- private static final Logger LOG = LoggerFactory.getLogger(WindowedWordCount.class);
- static final int WINDOW_SIZE = 1; // Default window duration in minutes
-
- /**
- * Concept #2: A DoFn that sets the data element timestamp. This is a silly method, just for
- * this example, for the bounded data case.
- *
- * <p>Imagine that many ghosts of Shakespeare are all typing madly at the same time to recreate
- * his masterworks. Each line of the corpus will get a random associated timestamp somewhere in a
- * 2-hour period.
- */
- static class AddTimestampFn extends DoFn<String, String> {
- private static final long RAND_RANGE = 7200000; // 2 hours in ms
-
- @Override
- public void processElement(ProcessContext c) {
- // Generate a timestamp that falls somewhere in the past two hours.
- long randomTimestamp = System.currentTimeMillis()
- - (int) (Math.random() * RAND_RANGE);
- /**
- * Concept #2: Set the data element with that timestamp.
- */
- c.outputWithTimestamp(c.element(), new Instant(randomTimestamp));
- }
- }
-
- /** A DoFn that converts a Word and Count into a BigQuery table row. */
- static class FormatAsTableRowFn extends DoFn<KV<String, Long>, TableRow> {
- @Override
- public void processElement(ProcessContext c) {
- TableRow row = new TableRow()
- .set("word", c.element().getKey())
- .set("count", c.element().getValue())
- // include a field for the window timestamp
- .set("window_timestamp", c.timestamp().toString());
- c.output(row);
- }
- }
-
- /**
- * Helper method that defines the BigQuery schema used for the output.
- */
- private static TableSchema getSchema() {
- List<TableFieldSchema> fields = new ArrayList<>();
- fields.add(new TableFieldSchema().setName("word").setType("STRING"));
- fields.add(new TableFieldSchema().setName("count").setType("INTEGER"));
- fields.add(new TableFieldSchema().setName("window_timestamp").setType("TIMESTAMP"));
- TableSchema schema = new TableSchema().setFields(fields);
- return schema;
- }
-
- /**
- * Concept #6: We'll stream the results to a BigQuery table. The BigQuery output source is one
- * that supports both bounded and unbounded data. This is a helper method that creates a
- * TableReference from input options, to tell the pipeline where to write its BigQuery results.
- */
- private static TableReference getTableReference(Options options) {
- TableReference tableRef = new TableReference();
- tableRef.setProjectId(options.getProject());
- tableRef.setDatasetId(options.getBigQueryDataset());
- tableRef.setTableId(options.getBigQueryTable());
- return tableRef;
- }
-
- /**
- * Options supported by {@link WindowedWordCount}.
- *
- * <p>Inherits standard example configuration options, which allow specification of the BigQuery
- * table and the PubSub topic, as well as the {@link WordCount.WordCountOptions} support for
- * specification of the input file.
- */
- public static interface Options extends WordCount.WordCountOptions,
- DataflowExampleOptions, ExamplePubsubTopicOptions, ExampleBigQueryTableOptions {
- @Description("Fixed window duration, in minutes")
- @Default.Integer(WINDOW_SIZE)
- Integer getWindowSize();
- void setWindowSize(Integer value);
-
- @Description("Whether to run the pipeline with unbounded input")
- boolean isUnbounded();
- void setUnbounded(boolean value);
- }
-
- public static void main(String[] args) throws IOException {
- Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
- options.setBigQuerySchema(getSchema());
- // DataflowExampleUtils creates the necessary input sources to simplify execution of this
- // Pipeline.
- DataflowExampleUtils exampleDataflowUtils = new DataflowExampleUtils(options,
- options.isUnbounded());
-
- Pipeline pipeline = Pipeline.create(options);
-
- /**
- * Concept #1: the Dataflow SDK lets us run the same pipeline with either a bounded or
- * unbounded input source.
- */
- PCollection<String> input;
- if (options.isUnbounded()) {
- LOG.info("Reading from PubSub.");
- /**
- * Concept #3: Read from the PubSub topic. A topic will be created if it wasn't
- * specified as an argument. The data elements' timestamps will come from the pubsub
- * injection.
- */
- input = pipeline
- .apply(PubsubIO.Read.topic(options.getPubsubTopic()));
- } else {
- /** Else, this is a bounded pipeline. Read from the GCS file. */
- input = pipeline
- .apply(TextIO.Read.from(options.getInputFile()))
- // Concept #2: Add an element timestamp, using an artificial time just to show windowing.
- // See AddTimestampFn for more detail on this.
- .apply(ParDo.of(new AddTimestampFn()));
- }
-
- /**
- * Concept #4: Window into fixed windows. The fixed window size for this example defaults to 1
- * minute (you can change this with a command-line option). See the documentation for more
- * information on how fixed windows work, and for information on the other types of windowing
- * available (e.g., sliding windows).
- */
- PCollection<String> windowedWords = input
- .apply(Window.<String>into(
- FixedWindows.of(Duration.standardMinutes(options.getWindowSize()))));
-
- /**
- * Concept #5: Re-use our existing CountWords transform that does not have knowledge of
- * windows over a PCollection containing windowed values.
- */
- PCollection<KV<String, Long>> wordCounts = windowedWords.apply(new WordCount.CountWords());
-
- /**
- * Concept #6: Format the results for a BigQuery table, then write to BigQuery.
- * The BigQuery output source supports both bounded and unbounded data.
- */
- wordCounts.apply(ParDo.of(new FormatAsTableRowFn()))
- .apply(BigQueryIO.Write
- .to(getTableReference(options))
- .withSchema(getSchema())
- .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
- .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
-
- PipelineResult result = pipeline.run();
-
- /**
- * To mock unbounded input from PubSub, we'll now start an auxiliary 'injector' pipeline that
- * runs for a limited time, and publishes to the input PubSub topic.
- *
- * With an unbounded input source, you will need to explicitly shut down this pipeline when you
- * are done with it, so that you do not continue to be charged for the instances. You can do
- * this via a ctrl-C from the command line, or from the developer's console UI for Dataflow
- * pipelines. The PubSub topic will also be deleted at this time.
- */
- exampleDataflowUtils.mockUnboundedSource(options.getInputFile(), result);
- }
-}
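
Concept #4 above windows the input with FixedWindows; the class Javadoc also
mentions sliding windows as an alternative strategy. A minimal sketch, assuming
the SDK's SlidingWindows transform, of how the same input PCollection could be
windowed into overlapping two-minute windows that start every minute:

    import com.google.cloud.dataflow.sdk.transforms.windowing.SlidingWindows;
    import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
    import com.google.cloud.dataflow.sdk.values.PCollection;

    import org.joda.time.Duration;

    public class SlidingWindowSketch {
      // Two-minute windows starting every minute: each element lands in two
      // overlapping windows, so downstream counts are smoothed across windows.
      static PCollection<String> applySlidingWindows(PCollection<String> input) {
        return input.apply(Window.<String>into(
            SlidingWindows.of(Duration.standardMinutes(2))
                .every(Duration.standardMinutes(1))));
      }
    }
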
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2eaa709c/examples/src/main/java/com/google/cloud/dataflow/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/com/google/cloud/dataflow/examples/WordCount.java b/examples/src/main/java/com/google/cloud/dataflow/examples/WordCount.java
deleted file mode 100644
index 1086106..0000000
--- a/examples/src/main/java/com/google/cloud/dataflow/examples/WordCount.java
+++ /dev/null
@@ -1,206 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.examples;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.io.TextIO;
-import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
-import com.google.cloud.dataflow.sdk.options.Default;
-import com.google.cloud.dataflow.sdk.options.DefaultValueFactory;
-import com.google.cloud.dataflow.sdk.options.Description;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
-import com.google.cloud.dataflow.sdk.transforms.Aggregator;
-import com.google.cloud.dataflow.sdk.transforms.Count;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.MapElements;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.transforms.SimpleFunction;
-import com.google.cloud.dataflow.sdk.transforms.Sum;
-import com.google.cloud.dataflow.sdk.util.gcsfs.GcsPath;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-
-
-/**
- * An example that counts words in Shakespeare and includes Dataflow best practices.
- *
- * <p>This class, {@link WordCount}, is the second in a series of four successively more detailed
- * 'word count' examples. You may first want to take a look at {@link MinimalWordCount}.
- * After you've looked at this example, then see the {@link DebuggingWordCount}
- * pipeline, for introduction of additional concepts.
- *
- * <p>For a detailed walkthrough of this example, see
- * <a href="https://cloud.google.com/dataflow/java-sdk/wordcount-example">
- * https://cloud.google.com/dataflow/java-sdk/wordcount-example
- * </a>
- *
- * <p>Basic concepts, also in the MinimalWordCount example:
- * Reading text files; counting a PCollection; writing to GCS.
- *
- * <p>New Concepts:
- * <pre>
- * 1. Executing a Pipeline both locally and using the Dataflow service
- * 2. Using ParDo with static DoFns defined out-of-line
- * 3. Building a composite transform
- * 4. Defining your own pipeline options
- * </pre>
- *
- * <p>Concept #1: you can execute this pipeline either locally or using the Dataflow service.
- * These are now command-line options and not hard-coded as they were in the MinimalWordCount
- * example.
- * To execute this pipeline locally, specify general pipeline configuration:
- * <pre>{@code
- * --project=YOUR_PROJECT_ID
- * }
- * </pre>
- * and a local output file or output prefix on GCS:
- * <pre>{@code
- * --output=[YOUR_LOCAL_FILE | gs://YOUR_OUTPUT_PREFIX]
- * }</pre>
- *
- * <p>To execute this pipeline using the Dataflow service, specify pipeline configuration:
- * <pre>{@code
- * --project=YOUR_PROJECT_ID
- * --stagingLocation=gs://YOUR_STAGING_DIRECTORY
- * --runner=BlockingDataflowPipelineRunner
- * }
- * </pre>
- * and an output prefix on GCS:
- * <pre>{@code
- * --output=gs://YOUR_OUTPUT_PREFIX
- * }</pre>
- *
- * <p>The input file defaults to {@code gs://dataflow-samples/shakespeare/kinglear.txt} and can be
- * overridden with {@code --inputFile}.
- */
-public class WordCount {
-
- /**
- * Concept #2: You can make your pipeline code less verbose by defining your DoFns statically out-
- * of-line. This DoFn tokenizes lines of text into individual words; we pass it to a ParDo in the
- * pipeline.
- */
- static class ExtractWordsFn extends DoFn<String, String> {
- private final Aggregator<Long, Long> emptyLines =
- createAggregator("emptyLines", new Sum.SumLongFn());
-
- @Override
- public void processElement(ProcessContext c) {
- if (c.element().trim().isEmpty()) {
- emptyLines.addValue(1L);
- }
-
- // Split the line into words.
- String[] words = c.element().split("[^a-zA-Z']+");
-
- // Output each word encountered into the output PCollection.
- for (String word : words) {
- if (!word.isEmpty()) {
- c.output(word);
- }
- }
- }
- }
-
- /** A SimpleFunction that converts a Word and Count into a printable string. */
- public static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> {
- @Override
- public String apply(KV<String, Long> input) {
- return input.getKey() + ": " + input.getValue();
- }
- }
-
- /**
- * A PTransform that converts a PCollection containing lines of text into a PCollection of
- * formatted word counts.
- *
- * <p>Concept #3: This is a custom composite transform that bundles two transforms (ParDo and
- * Count) as a reusable PTransform subclass. Using composite transforms allows for easy reuse,
- * modular testing, and an improved monitoring experience.
- */
- public static class CountWords extends PTransform<PCollection<String>,
- PCollection<KV<String, Long>>> {
- @Override
- public PCollection<KV<String, Long>> apply(PCollection<String> lines) {
-
- // Convert lines of text into individual words.
- PCollection<String> words = lines.apply(
- ParDo.of(new ExtractWordsFn()));
-
- // Count the number of times each word occurs.
- PCollection<KV<String, Long>> wordCounts =
- words.apply(Count.<String>perElement());
-
- return wordCounts;
- }
- }
-
- /**
- * Options supported by {@link WordCount}.
- *
- * <p>Concept #4: Defining your own configuration options. Here, you can add your own arguments
- * to be processed by the command-line parser, and specify default values for them. You can then
- * access the options values in your pipeline code.
- *
- * <p>Inherits standard configuration options.
- */
- public static interface WordCountOptions extends PipelineOptions {
- @Description("Path of the file to read from")
- @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
- String getInputFile();
- void setInputFile(String value);
-
- @Description("Path of the file to write to")
- @Default.InstanceFactory(OutputFactory.class)
- String getOutput();
- void setOutput(String value);
-
- /**
- * Returns "gs://${YOUR_STAGING_DIRECTORY}/counts.txt" as the default destination.
- */
- public static class OutputFactory implements DefaultValueFactory<String> {
- @Override
- public String create(PipelineOptions options) {
- DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
- if (dataflowOptions.getStagingLocation() != null) {
- return GcsPath.fromUri(dataflowOptions.getStagingLocation())
- .resolve("counts.txt").toString();
- } else {
- throw new IllegalArgumentException("Must specify --output or --stagingLocation");
- }
- }
- }
-
- }
-
- public static void main(String[] args) {
- WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
- .as(WordCountOptions.class);
- Pipeline p = Pipeline.create(options);
-
- // Concepts #2 and #3: Our pipeline applies the composite CountWords transform, and passes the
- // static FormatAsTextFn() to the ParDo transform.
- p.apply(TextIO.Read.named("ReadLines").from(options.getInputFile()))
- .apply(new CountWords())
- .apply(MapElements.via(new FormatAsTextFn()))
- .apply(TextIO.Write.named("WriteCounts").to(options.getOutput()));
-
- p.run();
- }
-}
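
The CountWords composite deleted above is self-contained, so it can be exercised end-to-end
with the SDK's test utilities. A minimal sketch, assuming the Dataflow SDK 1.x TestPipeline
and DataflowAssert classes (which are not part of this diff):

    import com.google.cloud.dataflow.sdk.testing.DataflowAssert;
    import com.google.cloud.dataflow.sdk.testing.TestPipeline;
    import com.google.cloud.dataflow.sdk.transforms.Create;
    import com.google.cloud.dataflow.sdk.values.KV;

    // Feed two literal lines through the composite and assert on the resulting counts.
    TestPipeline p = TestPipeline.create();
    DataflowAssert.that(
            p.apply(Create.of("hi there", "hi"))
             .apply(new CountWords()))
        .containsInAnyOrder(KV.of("hi", 2L), KV.of("there", 1L));
    p.run();
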
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2eaa709c/examples/src/main/java/com/google/cloud/dataflow/examples/common/DataflowExampleOptions.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/com/google/cloud/dataflow/examples/common/DataflowExampleOptions.java b/examples/src/main/java/com/google/cloud/dataflow/examples/common/DataflowExampleOptions.java
deleted file mode 100644
index 606bfb4..0000000
--- a/examples/src/main/java/com/google/cloud/dataflow/examples/common/DataflowExampleOptions.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.examples.common;
-
-import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
-import com.google.cloud.dataflow.sdk.options.Default;
-import com.google.cloud.dataflow.sdk.options.Description;
-
-/**
- * Options that can be used to configure the Dataflow examples.
- */
-public interface DataflowExampleOptions extends DataflowPipelineOptions {
- @Description("Whether to keep jobs running on the Dataflow service after local process exit")
- @Default.Boolean(false)
- boolean getKeepJobsRunning();
- void setKeepJobsRunning(boolean keepJobsRunning);
-
- @Description("Number of workers to use when executing the injector pipeline")
- @Default.Integer(1)
- int getInjectorNumWorkers();
- void setInjectorNumWorkers(int numWorkers);
-}
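
Any pipeline can view its parsed options through this interface via PipelineOptions.as(),
without pre-registering it. A short sketch (args is the usual main() parameter):

    // Parse the command line once, then re-view the options through the example interface.
    DataflowPipelineOptions base = PipelineOptionsFactory.fromArgs(args)
        .withValidation().as(DataflowPipelineOptions.class);
    DataflowExampleOptions exampleOptions = base.as(DataflowExampleOptions.class);
    if (!exampleOptions.getKeepJobsRunning()) {
      // Defaults to false: jobs started by the example are cancelled on process exit.
    }
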
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2eaa709c/examples/src/main/java/com/google/cloud/dataflow/examples/common/DataflowExampleUtils.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/com/google/cloud/dataflow/examples/common/DataflowExampleUtils.java b/examples/src/main/java/com/google/cloud/dataflow/examples/common/DataflowExampleUtils.java
deleted file mode 100644
index 4dfdd85..0000000
--- a/examples/src/main/java/com/google/cloud/dataflow/examples/common/DataflowExampleUtils.java
+++ /dev/null
@@ -1,485 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.examples.common;
-
-import com.google.api.client.googleapis.json.GoogleJsonResponseException;
-import com.google.api.client.googleapis.services.AbstractGoogleClientRequest;
-import com.google.api.client.util.BackOff;
-import com.google.api.client.util.BackOffUtils;
-import com.google.api.client.util.Sleeper;
-import com.google.api.services.bigquery.Bigquery;
-import com.google.api.services.bigquery.Bigquery.Datasets;
-import com.google.api.services.bigquery.Bigquery.Tables;
-import com.google.api.services.bigquery.model.Dataset;
-import com.google.api.services.bigquery.model.DatasetReference;
-import com.google.api.services.bigquery.model.Table;
-import com.google.api.services.bigquery.model.TableReference;
-import com.google.api.services.bigquery.model.TableSchema;
-import com.google.api.services.dataflow.Dataflow;
-import com.google.api.services.pubsub.Pubsub;
-import com.google.api.services.pubsub.model.Subscription;
-import com.google.api.services.pubsub.model.Topic;
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.PipelineResult;
-import com.google.cloud.dataflow.sdk.io.TextIO;
-import com.google.cloud.dataflow.sdk.options.BigQueryOptions;
-import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
-import com.google.cloud.dataflow.sdk.runners.DataflowPipelineJob;
-import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
-import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner;
-import com.google.cloud.dataflow.sdk.transforms.IntraBundleParallelization;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.util.AttemptBoundedExponentialBackOff;
-import com.google.cloud.dataflow.sdk.util.MonitoringUtil;
-import com.google.cloud.dataflow.sdk.util.Transport;
-import com.google.cloud.dataflow.sdk.values.PBegin;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.common.base.Strings;
-import com.google.common.base.Throwables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
-import javax.servlet.http.HttpServletResponse;
-
-/**
- * The utility class that sets up and tears down external resources, starts the Google Cloud Pub/Sub
- * injector, and cancels the streaming and the injector pipelines once the program terminates.
- *
- * <p>It is used to run Dataflow examples, such as TrafficMaxLaneFlow and TrafficRoutes.
- */
-public class DataflowExampleUtils {
-
- private final DataflowPipelineOptions options;
- private Bigquery bigQueryClient = null;
- private Pubsub pubsubClient = null;
- private Dataflow dataflowClient = null;
- private Set<DataflowPipelineJob> jobsToCancel = Sets.newHashSet();
- private List<String> pendingMessages = Lists.newArrayList();
-
- public DataflowExampleUtils(DataflowPipelineOptions options) {
- this.options = options;
- }
-
- /**
- * Sets up external resources and configures the runner options.
- */
- public DataflowExampleUtils(DataflowPipelineOptions options, boolean isUnbounded)
- throws IOException {
- this.options = options;
- setupResourcesAndRunner(isUnbounded);
- }
-
- /**
- * Sets up external resources that are required by the example,
- * such as Pub/Sub topics and BigQuery tables.
- *
- * @throws IOException if there is a problem setting up the resources
- */
- public void setup() throws IOException {
- Sleeper sleeper = Sleeper.DEFAULT;
- BackOff backOff = new AttemptBoundedExponentialBackOff(3, 200);
- Throwable lastException = null;
- try {
- do {
- try {
- setupPubsub();
- setupBigQueryTable();
- return;
- } catch (GoogleJsonResponseException e) {
- lastException = e;
- }
- } while (BackOffUtils.next(sleeper, backOff));
- } catch (InterruptedException e) {
- // Ignore InterruptedException
- }
- Throwables.propagate(lastException);
- }
-
- /**
- * Set up external resources, and configure the runner appropriately.
- */
- public void setupResourcesAndRunner(boolean isUnbounded) throws IOException {
- if (isUnbounded) {
- options.setStreaming(true);
- }
- setup();
- setupRunner();
- }
-
- /**
- * Sets up the Google Cloud Pub/Sub topic.
- *
- * <p>If the topic doesn't exist, a new topic with the given name will be created.
- *
- * @throws IOException if there is a problem setting up the Pub/Sub topic
- */
- public void setupPubsub() throws IOException {
- ExamplePubsubTopicAndSubscriptionOptions pubsubOptions =
- options.as(ExamplePubsubTopicAndSubscriptionOptions.class);
- if (!pubsubOptions.getPubsubTopic().isEmpty()) {
- pendingMessages.add("**********************Set Up Pubsub************************");
- setupPubsubTopic(pubsubOptions.getPubsubTopic());
- pendingMessages.add("The Pub/Sub topic has been set up for this example: "
- + pubsubOptions.getPubsubTopic());
-
- if (!pubsubOptions.getPubsubSubscription().isEmpty()) {
- setupPubsubSubscription(
- pubsubOptions.getPubsubTopic(), pubsubOptions.getPubsubSubscription());
- pendingMessages.add("The Pub/Sub subscription has been set up for this example: "
- + pubsubOptions.getPubsubSubscription());
- }
- }
- }
-
- /**
- * Sets up the BigQuery table with the given schema.
- *
- * <p>If the table already exists, the schema has to match the given one. Otherwise, the example
- * will throw a RuntimeException. If the table doesn't exist, a new table with the given schema
- * will be created.
- *
- * @throws IOException if there is a problem setting up the BigQuery table
- */
- public void setupBigQueryTable() throws IOException {
- ExampleBigQueryTableOptions bigQueryTableOptions =
- options.as(ExampleBigQueryTableOptions.class);
- if (bigQueryTableOptions.getBigQueryDataset() != null
- && bigQueryTableOptions.getBigQueryTable() != null
- && bigQueryTableOptions.getBigQuerySchema() != null) {
- pendingMessages.add("******************Set Up Big Query Table*******************");
- setupBigQueryTable(bigQueryTableOptions.getProject(),
- bigQueryTableOptions.getBigQueryDataset(),
- bigQueryTableOptions.getBigQueryTable(),
- bigQueryTableOptions.getBigQuerySchema());
- pendingMessages.add("The BigQuery table has been set up for this example: "
- + bigQueryTableOptions.getProject()
- + ":" + bigQueryTableOptions.getBigQueryDataset()
- + "." + bigQueryTableOptions.getBigQueryTable());
- }
- }
-
- /**
- * Tears down external resources that can be deleted upon the example's completion.
- */
- private void tearDown() {
- pendingMessages.add("*************************Tear Down*************************");
- ExamplePubsubTopicAndSubscriptionOptions pubsubOptions =
- options.as(ExamplePubsubTopicAndSubscriptionOptions.class);
- if (!pubsubOptions.getPubsubTopic().isEmpty()) {
- try {
- deletePubsubTopic(pubsubOptions.getPubsubTopic());
- pendingMessages.add("The Pub/Sub topic has been deleted: "
- + pubsubOptions.getPubsubTopic());
- } catch (IOException e) {
- pendingMessages.add("Failed to delete the Pub/Sub topic : "
- + pubsubOptions.getPubsubTopic());
- }
- if (!pubsubOptions.getPubsubSubscription().isEmpty()) {
- try {
- deletePubsubSubscription(pubsubOptions.getPubsubSubscription());
- pendingMessages.add("The Pub/Sub subscription has been deleted: "
- + pubsubOptions.getPubsubSubscription());
- } catch (IOException e) {
- pendingMessages.add("Failed to delete the Pub/Sub subscription : "
- + pubsubOptions.getPubsubSubscription());
- }
- }
- }
-
- ExampleBigQueryTableOptions bigQueryTableOptions =
- options.as(ExampleBigQueryTableOptions.class);
- if (bigQueryTableOptions.getBigQueryDataset() != null
- && bigQueryTableOptions.getBigQueryTable() != null
- && bigQueryTableOptions.getBigQuerySchema() != null) {
- pendingMessages.add("The BigQuery table might contain the example's output, "
- + "and it is not deleted automatically: "
- + bigQueryTableOptions.getProject()
- + ":" + bigQueryTableOptions.getBigQueryDataset()
- + "." + bigQueryTableOptions.getBigQueryTable());
- pendingMessages.add("Please go to the Developers Console to delete it manually."
- + " Otherwise, you may be charged for its usage.");
- }
- }
-
- private void setupBigQueryTable(String projectId, String datasetId, String tableId,
- TableSchema schema) throws IOException {
- if (bigQueryClient == null) {
- bigQueryClient = Transport.newBigQueryClient(options.as(BigQueryOptions.class)).build();
- }
-
- Datasets datasetService = bigQueryClient.datasets();
- if (executeNullIfNotFound(datasetService.get(projectId, datasetId)) == null) {
- Dataset newDataset = new Dataset().setDatasetReference(
- new DatasetReference().setProjectId(projectId).setDatasetId(datasetId));
- datasetService.insert(projectId, newDataset).execute();
- }
-
- Tables tableService = bigQueryClient.tables();
- Table table = executeNullIfNotFound(tableService.get(projectId, datasetId, tableId));
- if (table == null) {
- Table newTable = new Table().setSchema(schema).setTableReference(
- new TableReference().setProjectId(projectId).setDatasetId(datasetId).setTableId(tableId));
- tableService.insert(projectId, datasetId, newTable).execute();
- } else if (!table.getSchema().equals(schema)) {
- throw new RuntimeException(
- "Table exists and schemas do not match, expecting: " + schema.toPrettyString()
- + ", actual: " + table.getSchema().toPrettyString());
- }
- }
-
- private void setupPubsubTopic(String topic) throws IOException {
- if (pubsubClient == null) {
- pubsubClient = Transport.newPubsubClient(options).build();
- }
- if (executeNullIfNotFound(pubsubClient.projects().topics().get(topic)) == null) {
- pubsubClient.projects().topics().create(topic, new Topic().setName(topic)).execute();
- }
- }
-
- private void setupPubsubSubscription(String topic, String subscription) throws IOException {
- if (pubsubClient == null) {
- pubsubClient = Transport.newPubsubClient(options).build();
- }
- if (executeNullIfNotFound(pubsubClient.projects().subscriptions().get(subscription)) == null) {
- Subscription subInfo = new Subscription()
- .setAckDeadlineSeconds(60)
- .setTopic(topic);
- pubsubClient.projects().subscriptions().create(subscription, subInfo).execute();
- }
- }
-
- /**
- * Deletes the Google Cloud Pub/Sub topic.
- *
- * @throws IOException if there is a problem deleting the Pub/Sub topic
- */
- private void deletePubsubTopic(String topic) throws IOException {
- if (pubsubClient == null) {
- pubsubClient = Transport.newPubsubClient(options).build();
- }
- if (executeNullIfNotFound(pubsubClient.projects().topics().get(topic)) != null) {
- pubsubClient.projects().topics().delete(topic).execute();
- }
- }
-
- /**
- * Deletes the Google Cloud Pub/Sub subscription.
- *
- * @throws IOException if there is a problem deleting the Pub/Sub subscription
- */
- private void deletePubsubSubscription(String subscription) throws IOException {
- if (pubsubClient == null) {
- pubsubClient = Transport.newPubsubClient(options).build();
- }
- if (executeNullIfNotFound(pubsubClient.projects().subscriptions().get(subscription)) != null) {
- pubsubClient.projects().subscriptions().delete(subscription).execute();
- }
- }
-
- /**
- * If this is an unbounded (streaming) pipeline, and both inputFile and pubsub topic are defined,
- * start an 'injector' pipeline that publishes the contents of the file to the given topic
- * (the topic itself is created beforehand by setup()).
- */
- public void startInjectorIfNeeded(String inputFile) {
- ExamplePubsubTopicOptions pubsubTopicOptions = options.as(ExamplePubsubTopicOptions.class);
- if (pubsubTopicOptions.isStreaming()
- && !Strings.isNullOrEmpty(inputFile)
- && !Strings.isNullOrEmpty(pubsubTopicOptions.getPubsubTopic())) {
- runInjectorPipeline(inputFile, pubsubTopicOptions.getPubsubTopic());
- }
- }
-
- /**
- * Do some runner setup: if streaming is specified and the DirectPipelineRunner was not
- * explicitly requested, switch to the DataflowPipelineRunner so that the started pipelines
- * can be cancelled automatically.
- */
- public void setupRunner() {
- if (options.isStreaming() && options.getRunner() != DirectPipelineRunner.class) {
- // In order to cancel the pipelines automatically,
- // {@literal DataflowPipelineRunner} is forced to be used.
- options.setRunner(DataflowPipelineRunner.class);
- }
- }
-
- /**
- * Runs a batch pipeline to inject data into the PubsubIO input topic.
- *
- * <p>The injector pipeline will read from the given text file, and inject data
- * into the Google Cloud Pub/Sub topic.
- */
- public void runInjectorPipeline(String inputFile, String topic) {
- runInjectorPipeline(TextIO.Read.from(inputFile), topic, null);
- }
-
- /**
- * Runs a batch pipeline to inject data into the PubsubIO input topic.
- *
- * <p>The injector pipeline will read from the given source, and inject data
- * into the Google Cloud Pub/Sub topic.
- */
- public void runInjectorPipeline(PTransform<? super PBegin, PCollection<String>> readSource,
- String topic,
- String pubsubTimestampLabelKey) {
- PubsubFileInjector.Bound injector;
- if (Strings.isNullOrEmpty(pubsubTimestampLabelKey)) {
- injector = PubsubFileInjector.publish(topic);
- } else {
- injector = PubsubFileInjector.withTimestampLabelKey(pubsubTimestampLabelKey).publish(topic);
- }
- DataflowPipelineOptions copiedOptions = options.cloneAs(DataflowPipelineOptions.class);
- if (options.getServiceAccountName() != null) {
- copiedOptions.setServiceAccountName(options.getServiceAccountName());
- }
- if (options.getServiceAccountKeyfile() != null) {
- copiedOptions.setServiceAccountKeyfile(options.getServiceAccountKeyfile());
- }
- copiedOptions.setStreaming(false);
- copiedOptions.setNumWorkers(options.as(DataflowExampleOptions.class).getInjectorNumWorkers());
- copiedOptions.setJobName(options.getJobName() + "-injector");
- Pipeline injectorPipeline = Pipeline.create(copiedOptions);
- injectorPipeline.apply(readSource)
- .apply(IntraBundleParallelization
- .of(injector)
- .withMaxParallelism(20));
- PipelineResult result = injectorPipeline.run();
- if (result instanceof DataflowPipelineJob) {
- jobsToCancel.add(((DataflowPipelineJob) result));
- }
- }
-
- /**
- * Runs the provided pipeline to inject data into the PubsubIO input topic.
- */
- public void runInjectorPipeline(Pipeline injectorPipeline) {
- PipelineResult result = injectorPipeline.run();
- if (result instanceof DataflowPipelineJob) {
- jobsToCancel.add(((DataflowPipelineJob) result));
- }
- }
-
- /**
- * Starts the auxiliary injector pipeline if needed, then waits for the given pipeline to finish.
- */
- public void mockUnboundedSource(String inputFile, PipelineResult result) {
- startInjectorIfNeeded(inputFile);
- waitToFinish(result);
- }
-
- /**
- * If {@literal DataflowPipelineRunner} or {@literal BlockingDataflowPipelineRunner} is used,
- * waits for the pipeline to finish and cancels it (and the injector) before the program exits.
- */
- public void waitToFinish(PipelineResult result) {
- if (result instanceof DataflowPipelineJob) {
- final DataflowPipelineJob job = (DataflowPipelineJob) result;
- jobsToCancel.add(job);
- if (!options.as(DataflowExampleOptions.class).getKeepJobsRunning()) {
- addShutdownHook(jobsToCancel);
- }
- try {
- job.waitToFinish(-1, TimeUnit.SECONDS, new MonitoringUtil.PrintHandler(System.out));
- } catch (Exception e) {
- throw new RuntimeException("Failed to wait for job to finish: " + job.getJobId());
- }
- } else {
- // The given PipelineResult doesn't support waitToFinish(), such as the
- // EvaluationResults returned by DirectPipelineRunner, so just clean up and report.
- tearDown();
- printPendingMessages();
- }
- }
-
- private void addShutdownHook(final Collection<DataflowPipelineJob> jobs) {
- if (dataflowClient == null) {
- dataflowClient = options.getDataflowClient();
- }
-
- Runtime.getRuntime().addShutdownHook(new Thread() {
- @Override
- public void run() {
- tearDown();
- printPendingMessages();
- for (DataflowPipelineJob job : jobs) {
- System.out.println("Canceling example pipeline: " + job.getJobId());
- try {
- job.cancel();
- } catch (IOException e) {
- System.out.println("Failed to cancel the job,"
- + " please go to the Developers Console to cancel it manually");
- System.out.println(
- MonitoringUtil.getJobMonitoringPageURL(job.getProjectId(), job.getJobId()));
- }
- }
-
- for (DataflowPipelineJob job : jobs) {
- boolean cancellationVerified = false;
- for (int retryAttempts = 6; retryAttempts > 0; retryAttempts--) {
- if (job.getState().isTerminal()) {
- cancellationVerified = true;
- System.out.println("Canceled example pipeline: " + job.getJobId());
- break;
- } else {
- System.out.println(
- "The example pipeline is still running. Verifying the cancellation.");
- }
- try {
- Thread.sleep(10000);
- } catch (InterruptedException e) {
- // Ignore
- }
- }
- if (!cancellationVerified) {
- System.out.println("Failed to verify the cancellation for job: " + job.getJobId());
- System.out.println("Please go to the Developers Console to verify manually:");
- System.out.println(
- MonitoringUtil.getJobMonitoringPageURL(job.getProjectId(), job.getJobId()));
- }
- }
- }
- });
- }
-
- private void printPendingMessages() {
- System.out.println();
- System.out.println("***********************************************************");
- System.out.println("***********************************************************");
- for (String message : pendingMessages) {
- System.out.println(message);
- }
- System.out.println("***********************************************************");
- System.out.println("***********************************************************");
- }
-
- private static <T> T executeNullIfNotFound(
- AbstractGoogleClientRequest<T> request) throws IOException {
- try {
- return request.execute();
- } catch (GoogleJsonResponseException e) {
- if (e.getStatusCode() == HttpServletResponse.SC_NOT_FOUND) {
- return null;
- } else {
- throw e;
- }
- }
- }
-}
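
Pieced together from the methods above, the intended lifecycle of this utility in a streaming
example is roughly the following sketch (MyStreamingOptions and getInputFile() are illustrative
stand-ins, not classes from this diff):

    // Parse options; the example's options interface mixes in DataflowExampleOptions.
    MyStreamingOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation().as(MyStreamingOptions.class);
    // true = unbounded: turns on streaming and creates the Pub/Sub topic,
    // subscription, and BigQuery table configured in the options.
    DataflowExampleUtils utils = new DataflowExampleUtils(options, true);
    Pipeline pipeline = Pipeline.create(options);
    // ... build the streaming pipeline here ...
    PipelineResult result = pipeline.run();
    // Publish the input file to the topic, then block, cancelling on shutdown.
    utils.mockUnboundedSource(options.getInputFile(), result);
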
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2eaa709c/examples/src/main/java/com/google/cloud/dataflow/examples/common/ExampleBigQueryTableOptions.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/com/google/cloud/dataflow/examples/common/ExampleBigQueryTableOptions.java b/examples/src/main/java/com/google/cloud/dataflow/examples/common/ExampleBigQueryTableOptions.java
deleted file mode 100644
index 7c213b5..0000000
--- a/examples/src/main/java/com/google/cloud/dataflow/examples/common/ExampleBigQueryTableOptions.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.examples.common;
-
-import com.google.api.services.bigquery.model.TableSchema;
-import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
-import com.google.cloud.dataflow.sdk.options.Default;
-import com.google.cloud.dataflow.sdk.options.DefaultValueFactory;
-import com.google.cloud.dataflow.sdk.options.Description;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-
-/**
- * Options that can be used to configure BigQuery tables in Dataflow examples.
- * The project defaults to the project being used to run the example.
- */
-public interface ExampleBigQueryTableOptions extends DataflowPipelineOptions {
- @Description("BigQuery dataset name")
- @Default.String("dataflow_examples")
- String getBigQueryDataset();
- void setBigQueryDataset(String dataset);
-
- @Description("BigQuery table name")
- @Default.InstanceFactory(BigQueryTableFactory.class)
- String getBigQueryTable();
- void setBigQueryTable(String table);
-
- @Description("BigQuery table schema")
- TableSchema getBigQuerySchema();
- void setBigQuerySchema(TableSchema schema);
-
- /**
- * Returns the job name, with dashes replaced by underscores, as the default BigQuery table name.
- */
- static class BigQueryTableFactory implements DefaultValueFactory<String> {
- @Override
- public String create(PipelineOptions options) {
- return options.as(DataflowPipelineOptions.class).getJobName()
- .replace('-', '_');
- }
- }
-}
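
Note that getBigQuerySchema() has no default, so examples populate it programmatically before
DataflowExampleUtils.setupBigQueryTable() runs; roughly as follows (the field names are
illustrative):

    ExampleBigQueryTableOptions bqOptions = options.as(ExampleBigQueryTableOptions.class);
    // TableFieldSchema comes from the same BigQuery model package as TableSchema.
    bqOptions.setBigQuerySchema(new TableSchema().setFields(ImmutableList.of(
        new TableFieldSchema().setName("timestamp").setType("TIMESTAMP"),
        new TableFieldSchema().setName("value").setType("FLOAT"))));
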
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2eaa709c/examples/src/main/java/com/google/cloud/dataflow/examples/common/ExamplePubsubTopicAndSubscriptionOptions.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/com/google/cloud/dataflow/examples/common/ExamplePubsubTopicAndSubscriptionOptions.java b/examples/src/main/java/com/google/cloud/dataflow/examples/common/ExamplePubsubTopicAndSubscriptionOptions.java
deleted file mode 100644
index d7bd4b8..0000000
--- a/examples/src/main/java/com/google/cloud/dataflow/examples/common/ExamplePubsubTopicAndSubscriptionOptions.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.examples.common;
-
-import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
-import com.google.cloud.dataflow.sdk.options.Default;
-import com.google.cloud.dataflow.sdk.options.DefaultValueFactory;
-import com.google.cloud.dataflow.sdk.options.Description;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-
-/**
- * Options that can be used to configure Pub/Sub topic/subscription in Dataflow examples.
- */
-public interface ExamplePubsubTopicAndSubscriptionOptions extends ExamplePubsubTopicOptions {
- @Description("Pub/Sub subscription")
- @Default.InstanceFactory(PubsubSubscriptionFactory.class)
- String getPubsubSubscription();
- void setPubsubSubscription(String subscription);
-
- /**
- * Returns a default Pub/Sub subscription name derived from the project and job names.
- */
- static class PubsubSubscriptionFactory implements DefaultValueFactory<String> {
- @Override
- public String create(PipelineOptions options) {
- DataflowPipelineOptions dataflowPipelineOptions =
- options.as(DataflowPipelineOptions.class);
- return "projects/" + dataflowPipelineOptions.getProject()
- + "/subscriptions/" + dataflowPipelineOptions.getJobName();
- }
- }
-}
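
For concreteness, the factory above derives the subscription purely from two other options;
e.g. (the project and job name values here are made up):

    // With --project=my-project and a job name of "myjob", the default resolves to:
    //   projects/my-project/subscriptions/myjob
    String subscription = options
        .as(ExamplePubsubTopicAndSubscriptionOptions.class)
        .getPubsubSubscription();
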