Posted to commits@beam.apache.org by dh...@apache.org on 2016/03/24 03:48:30 UTC
[66/67] incubator-beam git commit: Directory reorganization
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/257a7a6b/maven-archetypes/pom.xml
----------------------------------------------------------------------
diff --git a/maven-archetypes/pom.xml b/maven-archetypes/pom.xml
deleted file mode 100644
index 4565253..0000000
--- a/maven-archetypes/pom.xml
+++ /dev/null
@@ -1,41 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.apache.beam</groupId>
- <artifactId>parent</artifactId>
- <version>0.1.0-incubating-SNAPSHOT</version>
- <relativePath>../pom.xml</relativePath>
- </parent>
-
- <artifactId>maven-archetypes-parent</artifactId>
- <packaging>pom</packaging>
-
- <name>Apache Beam :: Maven Archetypes</name>
-
- <modules>
- <module>starter</module>
- <module>examples</module>
- </modules>
-
-</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/257a7a6b/maven-archetypes/starter/pom.xml
----------------------------------------------------------------------
diff --git a/maven-archetypes/starter/pom.xml b/maven-archetypes/starter/pom.xml
deleted file mode 100644
index 933e8b1..0000000
--- a/maven-archetypes/starter/pom.xml
+++ /dev/null
@@ -1,57 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.apache.beam</groupId>
- <artifactId>maven-archetypes-parent</artifactId>
- <version>0.1.0-incubating-SNAPSHOT</version>
- <relativePath>../pom.xml</relativePath>
- </parent>
-
- <groupId>org.apache.beam</groupId>
- <artifactId>maven-archetypes-starter</artifactId>
- <name>Apache Beam :: Maven Archetypes :: Starter</name>
- <description>A Maven archetype to create a simple starter pipeline to
- get started using the Apache Beam Java SDK. </description>
-
- <packaging>maven-archetype</packaging>
-
- <build>
- <extensions>
- <extension>
- <groupId>org.apache.maven.archetype</groupId>
- <artifactId>archetype-packaging</artifactId>
- <version>2.4</version>
- </extension>
- </extensions>
-
- <pluginManagement>
- <plugins>
- <plugin>
- <artifactId>maven-archetype-plugin</artifactId>
- <version>2.4</version>
- </plugin>
- </plugins>
- </pluginManagement>
- </build>
-</project>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/257a7a6b/maven-archetypes/starter/src/main/resources/META-INF/maven/archetype-metadata.xml
----------------------------------------------------------------------
diff --git a/maven-archetypes/starter/src/main/resources/META-INF/maven/archetype-metadata.xml b/maven-archetypes/starter/src/main/resources/META-INF/maven/archetype-metadata.xml
deleted file mode 100644
index bf75798..0000000
--- a/maven-archetypes/starter/src/main/resources/META-INF/maven/archetype-metadata.xml
+++ /dev/null
@@ -1,21 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<archetype-descriptor
- xsi:schemaLocation="http://maven.apache.org/plugins/maven-archetype-plugin/archetype-descriptor/1.0.0 http://maven.apache.org/xsd/archetype-descriptor-1.0.0.xsd"
- name="Google Cloud Dataflow Starter Pipeline Archetype"
- xmlns="http://maven.apache.org/plugins/maven-archetype-plugin/archetype-descriptor/1.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
- <requiredProperties>
- <requiredProperty key="targetPlatform">
- <defaultValue>1.7</defaultValue>
- </requiredProperty>
- </requiredProperties>
-
- <fileSets>
- <fileSet filtered="true" packaged="true" encoding="UTF-8">
- <directory>src/main/java</directory>
- <includes>
- <include>**/*.java</include>
- </includes>
- </fileSet>
- </fileSets>
-</archetype-descriptor>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/257a7a6b/maven-archetypes/starter/src/main/resources/archetype-resources/pom.xml
----------------------------------------------------------------------
diff --git a/maven-archetypes/starter/src/main/resources/archetype-resources/pom.xml b/maven-archetypes/starter/src/main/resources/archetype-resources/pom.xml
deleted file mode 100644
index 19e7d2d..0000000
--- a/maven-archetypes/starter/src/main/resources/archetype-resources/pom.xml
+++ /dev/null
@@ -1,43 +0,0 @@
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
-
- <groupId>${groupId}</groupId>
- <artifactId>${artifactId}</artifactId>
- <version>${version}</version>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- <version>3.3</version>
- <configuration>
- <source>${targetPlatform}</source>
- <target>${targetPlatform}</target>
- </configuration>
- </plugin>
- </plugins>
- </build>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.beam</groupId>
- <artifactId>java-sdk-all</artifactId>
- <version>[0-incubating, 1-incubating)</version>
- </dependency>
-
- <!-- slf4j API frontend binding with JUL backend -->
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- <version>1.7.7</version>
- </dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-jdk14</artifactId>
- <version>1.7.7</version>
- </dependency>
- </dependencies>
-</project>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/257a7a6b/maven-archetypes/starter/src/main/resources/archetype-resources/src/main/java/StarterPipeline.java
----------------------------------------------------------------------
diff --git a/maven-archetypes/starter/src/main/resources/archetype-resources/src/main/java/StarterPipeline.java b/maven-archetypes/starter/src/main/resources/archetype-resources/src/main/java/StarterPipeline.java
deleted file mode 100644
index ffabbc0..0000000
--- a/maven-archetypes/starter/src/main/resources/archetype-resources/src/main/java/StarterPipeline.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package ${package};
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
-import com.google.cloud.dataflow.sdk.transforms.Create;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A starter example for writing Google Cloud Dataflow programs.
- *
- * <p>The example takes two strings, converts them to their upper-case
- * representation and logs them.
- *
- * <p>To run this starter example locally using DirectPipelineRunner, just
- * execute it without any additional parameters from your favorite development
- * environment.
- *
- * <p>To run this starter example using managed resources in Google Cloud
- * Platform, you should specify the following command-line options:
- * --project=<YOUR_PROJECT_ID>
- * --stagingLocation=<STAGING_LOCATION_IN_CLOUD_STORAGE>
- * --runner=BlockingDataflowPipelineRunner
- */
-public class StarterPipeline {
- private static final Logger LOG = LoggerFactory.getLogger(StarterPipeline.class);
-
- public static void main(String[] args) {
- Pipeline p = Pipeline.create(
- PipelineOptionsFactory.fromArgs(args).withValidation().create());
-
- p.apply(Create.of("Hello", "World"))
- .apply(ParDo.of(new DoFn<String, String>() {
- @Override
- public void processElement(ProcessContext c) {
- c.output(c.element().toUpperCase());
- }
- }))
- .apply(ParDo.of(new DoFn<String, Void>() {
- @Override
- public void processElement(ProcessContext c) {
- LOG.info(c.element());
- }
- }));
-
- p.run();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/257a7a6b/maven-archetypes/starter/src/test/resources/projects/basic/archetype.properties
----------------------------------------------------------------------
diff --git a/maven-archetypes/starter/src/test/resources/projects/basic/archetype.properties b/maven-archetypes/starter/src/test/resources/projects/basic/archetype.properties
deleted file mode 100644
index c59e77a..0000000
--- a/maven-archetypes/starter/src/test/resources/projects/basic/archetype.properties
+++ /dev/null
@@ -1,5 +0,0 @@
-package=it.pkg
-version=0.1-SNAPSHOT
-groupId=archetype.it
-artifactId=basic
-targetPlatform=1.7
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/257a7a6b/maven-archetypes/starter/src/test/resources/projects/basic/goal.txt
----------------------------------------------------------------------
diff --git a/maven-archetypes/starter/src/test/resources/projects/basic/goal.txt b/maven-archetypes/starter/src/test/resources/projects/basic/goal.txt
deleted file mode 100644
index 0b59873..0000000
--- a/maven-archetypes/starter/src/test/resources/projects/basic/goal.txt
+++ /dev/null
@@ -1 +0,0 @@
-verify
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/257a7a6b/maven-archetypes/starter/src/test/resources/projects/basic/reference/pom.xml
----------------------------------------------------------------------
diff --git a/maven-archetypes/starter/src/test/resources/projects/basic/reference/pom.xml b/maven-archetypes/starter/src/test/resources/projects/basic/reference/pom.xml
deleted file mode 100644
index d29424a..0000000
--- a/maven-archetypes/starter/src/test/resources/projects/basic/reference/pom.xml
+++ /dev/null
@@ -1,43 +0,0 @@
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
-
- <groupId>archetype.it</groupId>
- <artifactId>basic</artifactId>
- <version>0.1-SNAPSHOT</version>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- <version>3.3</version>
- <configuration>
- <source>1.7</source>
- <target>1.7</target>
- </configuration>
- </plugin>
- </plugins>
- </build>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.beam</groupId>
- <artifactId>java-sdk-all</artifactId>
- <version>[0-incubating, 1-incubating)</version>
- </dependency>
-
- <!-- slf4j API frontend binding with JUL backend -->
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- <version>1.7.7</version>
- </dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-jdk14</artifactId>
- <version>1.7.7</version>
- </dependency>
- </dependencies>
-</project>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/257a7a6b/maven-archetypes/starter/src/test/resources/projects/basic/reference/src/main/java/it/pkg/StarterPipeline.java
----------------------------------------------------------------------
diff --git a/maven-archetypes/starter/src/test/resources/projects/basic/reference/src/main/java/it/pkg/StarterPipeline.java b/maven-archetypes/starter/src/test/resources/projects/basic/reference/src/main/java/it/pkg/StarterPipeline.java
deleted file mode 100644
index 2e7c4e1..0000000
--- a/maven-archetypes/starter/src/test/resources/projects/basic/reference/src/main/java/it/pkg/StarterPipeline.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package it.pkg;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
-import com.google.cloud.dataflow.sdk.transforms.Create;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A starter example for writing Google Cloud Dataflow programs.
- *
- * <p>The example takes two strings, converts them to their upper-case
- * representation and logs them.
- *
- * <p>To run this starter example locally using DirectPipelineRunner, just
- * execute it without any additional parameters from your favorite development
- * environment.
- *
- * <p>To run this starter example using managed resources in Google Cloud
- * Platform, you should specify the following command-line options:
- * --project=<YOUR_PROJECT_ID>
- * --stagingLocation=<STAGING_LOCATION_IN_CLOUD_STORAGE>
- * --runner=BlockingDataflowPipelineRunner
- */
-public class StarterPipeline {
- private static final Logger LOG = LoggerFactory.getLogger(StarterPipeline.class);
-
- public static void main(String[] args) {
- Pipeline p = Pipeline.create(
- PipelineOptionsFactory.fromArgs(args).withValidation().create());
-
- p.apply(Create.of("Hello", "World"))
- .apply(ParDo.of(new DoFn<String, String>() {
- @Override
- public void processElement(ProcessContext c) {
- c.output(c.element().toUpperCase());
- }
- }))
- .apply(ParDo.of(new DoFn<String, Void>() {
- @Override
- public void processElement(ProcessContext c) {
- LOG.info(c.element());
- }
- }));
-
- p.run();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/257a7a6b/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 6b2fd93..b79ddf6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -128,7 +128,7 @@
<module>sdks/java/core</module>
<module>runners</module>
<module>examples/java</module>
- <module>maven-archetypes</module>
+ <module>sdks/java/maven-archetypes</module>
</modules>
<profiles>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/257a7a6b/sdks/java/maven-archetypes/examples/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/examples/pom.xml b/sdks/java/maven-archetypes/examples/pom.xml
new file mode 100644
index 0000000..7e74b9d
--- /dev/null
+++ b/sdks/java/maven-archetypes/examples/pom.xml
@@ -0,0 +1,56 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>maven-archetypes-parent</artifactId>
+ <version>0.1.0-incubating-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>maven-archetypes-examples</artifactId>
+ <name>Apache Beam :: Maven Archetypes :: Examples</name>
+ <description>A Maven Archetype to create a project containing all the
+ example pipelines from the Apache Beam Java SDK.</description>
+
+ <packaging>maven-archetype</packaging>
+
+ <build>
+ <extensions>
+ <extension>
+ <groupId>org.apache.maven.archetype</groupId>
+ <artifactId>archetype-packaging</artifactId>
+ <version>2.4</version>
+ </extension>
+ </extensions>
+
+ <pluginManagement>
+ <plugins>
+ <plugin>
+ <artifactId>maven-archetype-plugin</artifactId>
+ <version>2.4</version>
+ </plugin>
+ </plugins>
+ </pluginManagement>
+ </build>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/257a7a6b/sdks/java/maven-archetypes/examples/src/main/resources/META-INF/maven/archetype-metadata.xml
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/META-INF/maven/archetype-metadata.xml b/sdks/java/maven-archetypes/examples/src/main/resources/META-INF/maven/archetype-metadata.xml
new file mode 100644
index 0000000..7742af4
--- /dev/null
+++ b/sdks/java/maven-archetypes/examples/src/main/resources/META-INF/maven/archetype-metadata.xml
@@ -0,0 +1,29 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<archetype-descriptor
+ xsi:schemaLocation="http://maven.apache.org/plugins/maven-archetype-plugin/archetype-descriptor/1.0.0 http://maven.apache.org/xsd/archetype-descriptor-1.0.0.xsd"
+ name="Google Cloud Dataflow Example Pipelines Archetype"
+ xmlns="http://maven.apache.org/plugins/maven-archetype-plugin/archetype-descriptor/1.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+
+ <requiredProperties>
+ <requiredProperty key="targetPlatform">
+ <defaultValue>1.7</defaultValue>
+ </requiredProperty>
+ </requiredProperties>
+
+ <fileSets>
+ <fileSet filtered="true" packaged="true" encoding="UTF-8">
+ <directory>src/main/java</directory>
+ <includes>
+ <include>**/*.java</include>
+ </includes>
+ </fileSet>
+
+ <fileSet filtered="true" packaged="true" encoding="UTF-8">
+ <directory>src/test/java</directory>
+ <includes>
+ <include>**/*.java</include>
+ </includes>
+ </fileSet>
+ </fileSets>
+</archetype-descriptor>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/257a7a6b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/pom.xml b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/pom.xml
new file mode 100644
index 0000000..d19d0c6
--- /dev/null
+++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/pom.xml
@@ -0,0 +1,204 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+ ~ Copyright (C) 2015 Google Inc.
+ ~
+ ~ Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ ~ use this file except in compliance with the License. You may obtain a copy of
+ ~ the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ ~ WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ ~ License for the specific language governing permissions and limitations under
+ ~ the License.
+ ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <groupId>${groupId}</groupId>
+ <artifactId>${artifactId}</artifactId>
+ <version>${version}</version>
+
+ <packaging>jar</packaging>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.3</version>
+ <configuration>
+ <source>${targetPlatform}</source>
+ <target>${targetPlatform}</target>
+ </configuration>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>2.3</version>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <finalName>${project.artifactId}-bundled-${project.version}</finalName>
+ <artifactSet>
+ <includes>
+ <include>*:*</include>
+ </includes>
+ </artifactSet>
+ <filters>
+ <filter>
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>META-INF/*.SF</exclude>
+ <exclude>META-INF/*.DSA</exclude>
+ <exclude>META-INF/*.RSA</exclude>
+ </excludes>
+ </filter>
+ </filters>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>2.18.1</version>
+ <configuration>
+ <parallel>all</parallel>
+ <threadCount>4</threadCount>
+ <redirectTestOutputToFile>true</redirectTestOutputToFile>
+ </configuration>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.maven.surefire</groupId>
+ <artifactId>surefire-junit47</artifactId>
+ <version>2.18.1</version>
+ </dependency>
+ </dependencies>
+ </plugin>
+ </plugins>
+ </build>
+
+ <dependencies>
+ <!-- Adds a dependency on a specific version of the Dataflow SDK. -->
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>java-sdk-all</artifactId>
+ <version>[0-incubating, 2-incubating)</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.api-client</groupId>
+ <artifactId>google-api-client</artifactId>
+ <version>1.21.0</version>
+ <exclusions>
+ <!-- Exclude an old version of guava that is being pulled
+ in by a transitive dependency of google-api-client -->
+ <exclusion>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava-jdk5</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <!-- Dependencies below this line are specific dependencies needed by the examples code. -->
+ <dependency>
+ <groupId>com.google.apis</groupId>
+ <artifactId>google-api-services-bigquery</artifactId>
+ <version>v2-rev248-1.21.0</version>
+ <exclusions>
+ <!-- Exclude an old version of guava that is being pulled
+ in by a transitive dependency of google-api-client -->
+ <exclusion>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava-jdk5</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.http-client</groupId>
+ <artifactId>google-http-client</artifactId>
+ <version>1.21.0</version>
+ <exclusions>
+ <!-- Exclude an old version of guava that is being pulled
+ in by a transitive dependency of google-api-client -->
+ <exclusion>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava-jdk5</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.apis</groupId>
+ <artifactId>google-api-services-pubsub</artifactId>
+ <version>v1-rev7-1.21.0</version>
+ <exclusions>
+ <!-- Exclude an old version of guava that is being pulled
+ in by a transitive dependency of google-api-client -->
+ <exclusion>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava-jdk5</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>joda-time</groupId>
+ <artifactId>joda-time</artifactId>
+ <version>2.4</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>18.0</version>
+ </dependency>
+
+ <dependency>
+ <groupId>javax.servlet</groupId>
+ <artifactId>javax.servlet-api</artifactId>
+ <version>3.1.0</version>
+ </dependency>
+
+ <!-- Add slf4j API frontend binding with JUL backend -->
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>1.7.7</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-jdk14</artifactId>
+ <version>1.7.7</version>
+ <!-- When loaded at runtime this will wire up slf4j to the JUL backend -->
+ <scope>runtime</scope>
+ </dependency>
+
+ <!-- Hamcrest and JUnit are required dependencies of DataflowAssert,
+ which is used in the main code of DebuggingWordCount example. -->
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-all</artifactId>
+ <version>1.3</version>
+ </dependency>
+
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.11</version>
+ </dependency>
+ </dependencies>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/257a7a6b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/DebuggingWordCount.java
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/DebuggingWordCount.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/DebuggingWordCount.java
new file mode 100644
index 0000000..3cf2bc0
--- /dev/null
+++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/DebuggingWordCount.java
@@ -0,0 +1,182 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package ${package};
+
+import ${package}.WordCount.WordCountOptions;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.io.TextIO;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.testing.DataflowAssert;
+import com.google.cloud.dataflow.sdk.transforms.Aggregator;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.transforms.Sum;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.regex.Pattern;
+
+
+/**
+ * An example that verifies word counts in Shakespeare and includes Dataflow best practices.
+ *
+ * <p>This class, {@link DebuggingWordCount}, is the third in a series of four successively more
+ * detailed 'word count' examples. You may first want to take a look at {@link MinimalWordCount}
+ * and {@link WordCount}. After you've looked at this example, then see the
+ * {@link WindowedWordCount} pipeline, for introduction of additional concepts.
+ *
+ * <p>Basic concepts, also in the MinimalWordCount and WordCount examples:
+ * Reading text files; counting a PCollection; executing a Pipeline both locally
+ * and using the Dataflow service; defining DoFns.
+ *
+ * <p>New Concepts:
+ * <pre>
+ * 1. Logging to Cloud Logging
+ * 2. Controlling Dataflow worker log levels
+ * 3. Creating a custom aggregator
+ * 4. Testing your Pipeline via DataflowAssert
+ * </pre>
+ *
+ * <p>To execute this pipeline locally, specify general pipeline configuration:
+ * <pre>{@code
+ * --project=YOUR_PROJECT_ID
+ * }
+ * </pre>
+ *
+ * <p>To execute this pipeline using the Dataflow service and the additional logging discussed
+ * below, specify pipeline configuration:
+ * <pre>{@code
+ * --project=YOUR_PROJECT_ID
+ * --stagingLocation=gs://YOUR_STAGING_DIRECTORY
+ * --runner=BlockingDataflowPipelineRunner
+ * --workerLogLevelOverrides={"com.google.cloud.dataflow.examples":"DEBUG"}
+ * }
+ * </pre>
+ *
+ * <p>Note that when you run via <code>mvn exec</code>, you may need to escape
+ * the quotation marks as appropriate for your shell. For example, in <code>bash</code>:
+ * <pre>
+ * mvn compile exec:java ... \
+ * -Dexec.args="... \
+ * --workerLogLevelOverrides={\\\"com.google.cloud.dataflow.examples\\\":\\\"DEBUG\\\"}"
+ * </pre>
+ *
+ * <p>Concept #2: Dataflow workers which execute user code are configured to log to Cloud
+ * Logging by default at "INFO" log level and higher. One may override log levels for specific
+ * logging namespaces by specifying:
+ * <pre><code>
+ * --workerLogLevelOverrides={"Name1":"Level1","Name2":"Level2",...}
+ * </code></pre>
+ * For example, by specifying:
+ * <pre><code>
+ * --workerLogLevelOverrides={"com.google.cloud.dataflow.examples":"DEBUG"}
+ * </code></pre>
+ * when executing this pipeline using the Dataflow service, Cloud Logging would contain only
+ * "DEBUG" or higher level logs for the {@code com.google.cloud.dataflow.examples} package in
+ * addition to the default "INFO" or higher level logs. In addition, the default Dataflow worker
+ * logging configuration can be overridden by specifying
+ * {@code --defaultWorkerLogLevel=<one of TRACE, DEBUG, INFO, WARN, ERROR>}. For example,
+ * by specifying {@code --defaultWorkerLogLevel=DEBUG} when executing this pipeline with
+ * the Dataflow service, Cloud Logging would contain all "DEBUG" or higher level logs. Note
+ * that changing the default worker log level to TRACE or DEBUG will significantly increase
+ * the amount of logs output.
+ *
+ * <p>The input file defaults to {@code gs://dataflow-samples/shakespeare/kinglear.txt} and can be
+ * overridden with {@code --inputFile}.
+ */
+public class DebuggingWordCount {
+ /** A DoFn that filters for a specific key based upon a regular expression. */
+ public static class FilterTextFn extends DoFn<KV<String, Long>, KV<String, Long>> {
+ /**
+ * Concept #1: The logger below uses the fully qualified class name of FilterTextFn
+ * as the logger. All log statements emitted by this logger will be referenced by this name
+ * and will be visible in the Cloud Logging UI. Learn more at https://cloud.google.com/logging
+ * about the Cloud Logging UI.
+ */
+ private static final Logger LOG = LoggerFactory.getLogger(FilterTextFn.class);
+
+ private final Pattern filter;
+ public FilterTextFn(String pattern) {
+ filter = Pattern.compile(pattern);
+ }
+
+ /**
+ * Concept #3: A custom aggregator can track values in your pipeline as it runs. Those
+ * values will be displayed in the Dataflow Monitoring UI when this pipeline is run using the
+ * Dataflow service. These aggregators below track the number of matched and unmatched words.
+ * Learn more at https://cloud.google.com/dataflow/pipelines/dataflow-monitoring-intf about
+ * the Dataflow Monitoring UI.
+ */
+ private final Aggregator<Long, Long> matchedWords =
+ createAggregator("matchedWords", new Sum.SumLongFn());
+ private final Aggregator<Long, Long> unmatchedWords =
+ createAggregator("umatchedWords", new Sum.SumLongFn());
+
+ @Override
+ public void processElement(ProcessContext c) {
+ if (filter.matcher(c.element().getKey()).matches()) {
+ // Log at the "DEBUG" level each element that we match. When executing this pipeline
+ // using the Dataflow service, these log lines will appear in the Cloud Logging UI
+ // only if the log level is set to "DEBUG" or lower.
+ LOG.debug("Matched: " + c.element().getKey());
+ matchedWords.addValue(1L);
+ c.output(c.element());
+ } else {
+ // Log at the "TRACE" level each element that is not matched. Different log levels
+ // can be used to control the verbosity of logging, providing an effective mechanism
+ // to filter less important information.
+ LOG.trace("Did not match: " + c.element().getKey());
+ unmatchedWords.addValue(1L);
+ }
+ }
+ }
+
+ public static void main(String[] args) {
+ WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
+ .as(WordCountOptions.class);
+ Pipeline p = Pipeline.create(options);
+
+ PCollection<KV<String, Long>> filteredWords =
+ p.apply(TextIO.Read.named("ReadLines").from(options.getInputFile()))
+ .apply(new WordCount.CountWords())
+ .apply(ParDo.of(new FilterTextFn("Flourish|stomach")));
+
+ /**
+ * Concept #4: DataflowAssert is a set of convenient PTransforms in the style of
+ * Hamcrest's collection matchers that can be used when writing Pipeline level tests
+ * to validate the contents of PCollections. DataflowAssert is best used in unit tests
+ * with small data sets but is demonstrated here as a teaching tool.
+ *
+ * <p>Below we verify that the set of filtered words matches our expected counts. Note
+ * that DataflowAssert does not provide any output and that successful completion of the
+ * Pipeline implies that the expectations were met. Learn more at
+ * https://cloud.google.com/dataflow/pipelines/testing-your-pipeline on how to test
+ * your Pipeline and see {@link DebuggingWordCountTest} for an example unit test.
+ */
+ List<KV<String, Long>> expectedResults = Arrays.asList(
+ KV.of("Flourish", 3L),
+ KV.of("stomach", 1L));
+ DataflowAssert.that(filteredWords).containsInAnyOrder(expectedResults);
+
+ p.run();
+ }
+}
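The javadoc above defers to {@link DebuggingWordCountTest} for unit testing, but that test is not part of this diff. As a rough sketch only, assuming the Dataflow SDK's TestPipeline and reusing the FilterTextFn from the file above (the class name, package placement, and input counts here are made up for illustration):

    import com.google.cloud.dataflow.sdk.Pipeline;
    import com.google.cloud.dataflow.sdk.testing.DataflowAssert;
    import com.google.cloud.dataflow.sdk.testing.TestPipeline;
    import com.google.cloud.dataflow.sdk.transforms.Create;
    import com.google.cloud.dataflow.sdk.transforms.ParDo;
    import com.google.cloud.dataflow.sdk.values.KV;
    import com.google.cloud.dataflow.sdk.values.PCollection;
    import org.junit.Test;

    public class FilterTextFnSketchTest {
      @Test
      public void testFilterTextFn() {
        // TestPipeline executes locally, where DataflowAssert failures
        // surface as ordinary test failures.
        Pipeline p = TestPipeline.create();

        PCollection<KV<String, Long>> output = p
            .apply(Create.of(
                KV.of("Flourish", 3L), KV.of("stomach", 1L), KV.of("the", 100L)))
            .apply(ParDo.of(new DebuggingWordCount.FilterTextFn("Flourish|stomach")));

        // Only the two keys matching the regular expression should remain.
        DataflowAssert.that(output)
            .containsInAnyOrder(KV.of("Flourish", 3L), KV.of("stomach", 1L));

        p.run();
      }
    }

If coder inference for the KV elements fails, appending .withCoder(KvCoder.of(StringUtf8Coder.of(), VarLongCoder.of())) to the Create transform pins the coder down explicitly.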
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/257a7a6b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/MinimalWordCount.java
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/MinimalWordCount.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/MinimalWordCount.java
new file mode 100644
index 0000000..035db01
--- /dev/null
+++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/MinimalWordCount.java
@@ -0,0 +1,115 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package ${package};
+
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.io.TextIO;
+import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.runners.BlockingDataflowPipelineRunner;
+import com.google.cloud.dataflow.sdk.transforms.Count;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.values.KV;
+
+
+/**
+ * An example that counts words in Shakespeare.
+ *
+ * <p>This class, {@link MinimalWordCount}, is the first in a series of four successively more
+ * detailed 'word count' examples. Here, for simplicity, we don't show any error-checking or
+ * argument processing, and focus on construction of the pipeline, which chains together the
+ * application of core transforms.
+ *
+ * <p>Next, see the {@link WordCount} pipeline, then the {@link DebuggingWordCount}, and finally
+ * the {@link WindowedWordCount} pipeline, for more detailed examples that introduce additional
+ * concepts.
+ *
+ * <p>Concepts:
+ * <pre>
+ * 1. Reading data from text files
+ * 2. Specifying 'inline' transforms
+ * 3. Counting a PCollection
+ * 4. Writing data to Cloud Storage as text files
+ * </pre>
+ *
+ * <p>To execute this pipeline, first edit the code to set your project ID, the staging
+ * location, and the output location. The specified GCS bucket(s) must already exist.
+ *
+ * <p>Then, run the pipeline as described in the README. It will be deployed and run using the
+ * Dataflow service. No args are required to run the pipeline. You can see the results in your
+ * output bucket in the GCS browser.
+ */
+public class MinimalWordCount {
+
+ public static void main(String[] args) {
+ // Create a DataflowPipelineOptions object. This object lets us set various execution
+ // options for our pipeline, such as the associated Cloud Platform project and the location
+ // in Google Cloud Storage to stage files.
+ DataflowPipelineOptions options = PipelineOptionsFactory.create()
+ .as(DataflowPipelineOptions.class);
+ options.setRunner(BlockingDataflowPipelineRunner.class);
+ // CHANGE 1/3: Your project ID is required in order to run your pipeline on the Google Cloud.
+ options.setProject("SET_YOUR_PROJECT_ID_HERE");
+ // CHANGE 2/3: Your Google Cloud Storage path is required for staging local files.
+ options.setStagingLocation("gs://SET_YOUR_BUCKET_NAME_HERE/AND_STAGING_DIRECTORY");
+
+ // Create the Pipeline object with the options we defined above.
+ Pipeline p = Pipeline.create(options);
+
+ // Apply the pipeline's transforms.
+
+ // Concept #1: Apply a root transform to the pipeline; in this case, TextIO.Read to read a set
+ // of input text files. TextIO.Read returns a PCollection where each element is one line from
+ // the input text (a set of Shakespeare's texts).
+ p.apply(TextIO.Read.from("gs://dataflow-samples/shakespeare/*"))
+ // Concept #2: Apply a ParDo transform to our PCollection of text lines. This ParDo invokes a
+ // DoFn (defined in-line) on each element that tokenizes the text line into individual words.
+ // The ParDo returns a PCollection<String>, where each element is an individual word in
+ // Shakespeare's collected texts.
+ .apply(ParDo.named("ExtractWords").of(new DoFn<String, String>() {
+ @Override
+ public void processElement(ProcessContext c) {
+ for (String word : c.element().split("[^a-zA-Z']+")) {
+ if (!word.isEmpty()) {
+ c.output(word);
+ }
+ }
+ }
+ }))
+ // Concept #3: Apply the Count transform to our PCollection of individual words. The Count
+ // transform returns a new PCollection of key/value pairs, where each key represents a unique
+ // word in the text. The associated value is the occurrence count for that word.
+ .apply(Count.<String>perElement())
+ // Apply another ParDo transform that formats our PCollection of word counts into a printable
+ // string, suitable for writing to an output file.
+ .apply(ParDo.named("FormatResults").of(new DoFn<KV<String, Long>, String>() {
+ @Override
+ public void processElement(ProcessContext c) {
+ c.output(c.element().getKey() + ": " + c.element().getValue());
+ }
+ }))
+ // Concept #4: Apply a write transform, TextIO.Write, at the end of the pipeline.
+ // TextIO.Write writes the contents of a PCollection (in this case, our PCollection of
+ // formatted strings) to a series of text files in Google Cloud Storage.
+ // CHANGE 3/3: A Google Cloud Storage path is required for writing the output results.
+ .apply(TextIO.Write.to("gs://YOUR_OUTPUT_BUCKET/AND_OUTPUT_PREFIX"));
+
+ // Run the pipeline.
+ p.run();
+ }
+}
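MinimalWordCount above hard-codes the BlockingDataflowPipelineRunner plus project and bucket settings. For contrast, a minimal sketch of the same Create-then-Count shape running purely in-process (in-memory input stands in for the Shakespeare texts; when no runner is configured, the SDK falls back to the local DirectPipelineRunner):

    import com.google.cloud.dataflow.sdk.Pipeline;
    import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
    import com.google.cloud.dataflow.sdk.transforms.Count;
    import com.google.cloud.dataflow.sdk.transforms.Create;

    public class LocalMinimalCount {
      public static void main(String[] args) {
        // No project ID or staging location is needed for an in-process run.
        Pipeline p = Pipeline.create(PipelineOptionsFactory.create());

        p.apply(Create.of("hello", "world", "hello"))
         .apply(Count.<String>perElement());

        p.run();
      }
    }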
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/257a7a6b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WindowedWordCount.java
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WindowedWordCount.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WindowedWordCount.java
new file mode 100644
index 0000000..29921e2
--- /dev/null
+++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WindowedWordCount.java
@@ -0,0 +1,262 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package ${package};
+
+import com.google.api.services.bigquery.model.TableFieldSchema;
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import ${package}.common.DataflowExampleUtils;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.PipelineResult;
+import com.google.cloud.dataflow.sdk.io.BigQueryIO;
+import com.google.cloud.dataflow.sdk.io.PubsubIO;
+import com.google.cloud.dataflow.sdk.io.TextIO;
+import com.google.cloud.dataflow.sdk.options.Default;
+import com.google.cloud.dataflow.sdk.options.Description;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;
+import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+
+/**
+ * An example that counts words in text, and can run over either unbounded or bounded input
+ * collections.
+ *
+ * <p>This class, {@link WindowedWordCount}, is the last in a series of four successively more
+ * detailed 'word count' examples. First take a look at {@link MinimalWordCount},
+ * {@link WordCount}, and {@link DebuggingWordCount}.
+ *
+ * <p>Basic concepts, also in the MinimalWordCount, WordCount, and DebuggingWordCount examples:
+ * Reading text files; counting a PCollection; writing to GCS; executing a Pipeline both locally
+ * and using the Dataflow service; defining DoFns; creating a custom aggregator;
+ * user-defined PTransforms; defining PipelineOptions.
+ *
+ * <p>New Concepts:
+ * <pre>
+ * 1. Unbounded and bounded pipeline input modes
+ * 2. Adding timestamps to data
+ * 3. PubSub topics as sources
+ * 4. Windowing
+ * 5. Re-using PTransforms over windowed PCollections
+ * 6. Writing to BigQuery
+ * </pre>
+ *
+ * <p>To execute this pipeline locally, specify general pipeline configuration:
+ * <pre>{@code
+ * --project=YOUR_PROJECT_ID
+ * }
+ * </pre>
+ *
+ * <p>To execute this pipeline using the Dataflow service, specify pipeline configuration:
+ * <pre>{@code
+ * --project=YOUR_PROJECT_ID
+ * --stagingLocation=gs://YOUR_STAGING_DIRECTORY
+ * --runner=BlockingDataflowPipelineRunner
+ * }
+ * </pre>
+ *
+ * <p>Optionally specify the input file path via:
+ * {@code --inputFile=gs://INPUT_PATH},
+ * which defaults to {@code gs://dataflow-samples/shakespeare/kinglear.txt}.
+ *
+ * <p>Specify an output BigQuery dataset and optionally, a table for the output. If you don't
+ * specify the table, one will be created for you using the job name. If you don't specify the
+ * dataset, a dataset called {@code dataflow-examples} must already exist in your project.
+ * {@code --bigQueryDataset=YOUR-DATASET --bigQueryTable=YOUR-NEW-TABLE-NAME}.
+ *
+ * <p>Decide whether you want your pipeline to run with 'bounded' (such as files in GCS) or
+ * 'unbounded' input (such as a PubSub topic). To run with unbounded input, set
+ * {@code --unbounded=true}. Then, optionally specify the Google Cloud PubSub topic to read from
+ * via {@code --pubsubTopic=projects/PROJECT_ID/topics/YOUR_TOPIC_NAME}. If the topic does not
+ * exist, the pipeline will create one for you. It will delete this topic when it terminates.
+ * The pipeline will automatically launch an auxiliary batch pipeline to populate the given PubSub
+ * topic with the contents of the {@code --inputFile}, in order to make the example easy to run.
+ * If you want to use an independently-populated PubSub topic, indicate this by setting
+ * {@code --inputFile=""}. In that case, the auxiliary pipeline will not be started.
+ *
+ * <p>By default, the pipeline will do fixed windowing, on 1-minute windows. You can
+ * change this interval by setting the {@code --windowSize} parameter, e.g. {@code --windowSize=10}
+ * for 10-minute windows.
+ */
+public class WindowedWordCount {
+ private static final Logger LOG = LoggerFactory.getLogger(WindowedWordCount.class);
+ static final int WINDOW_SIZE = 1; // Default window duration in minutes
+
+ /**
+ * Concept #2: A DoFn that sets the data element timestamp. This is a silly method, just for
+ * this example, for the bounded data case.
+ *
+ * <p>Imagine that many ghosts of Shakespeare are all typing madly at the same time to recreate
+ * his masterworks. Each line of the corpus will get a random associated timestamp somewhere in a
+ * 2-hour period.
+ */
+ static class AddTimestampFn extends DoFn<String, String> {
+ private static final long RAND_RANGE = 7200000; // 2 hours in ms
+
+ @Override
+ public void processElement(ProcessContext c) {
+ // Generate a timestamp that falls somewhere in the past two hours.
+ long randomTimestamp = System.currentTimeMillis()
+ - (int) (Math.random() * RAND_RANGE);
+ /**
+ * Concept #2: Set the data element with that timestamp.
+ */
+ c.outputWithTimestamp(c.element(), new Instant(randomTimestamp));
+ }
+ }
+
+ /** A DoFn that converts a Word and Count into a BigQuery table row. */
+ static class FormatAsTableRowFn extends DoFn<KV<String, Long>, TableRow> {
+ @Override
+ public void processElement(ProcessContext c) {
+ TableRow row = new TableRow()
+ .set("word", c.element().getKey())
+ .set("count", c.element().getValue())
+ // include a field for the window timestamp
+ .set("window_timestamp", c.timestamp().toString());
+ c.output(row);
+ }
+ }
+
+ /**
+ * Helper method that defines the BigQuery schema used for the output.
+ */
+ private static TableSchema getSchema() {
+ List<TableFieldSchema> fields = new ArrayList<>();
+ fields.add(new TableFieldSchema().setName("word").setType("STRING"));
+ fields.add(new TableFieldSchema().setName("count").setType("INTEGER"));
+ fields.add(new TableFieldSchema().setName("window_timestamp").setType("TIMESTAMP"));
+ TableSchema schema = new TableSchema().setFields(fields);
+ return schema;
+ }
+
+ /**
+ * Concept #6: We'll stream the results to a BigQuery table. The BigQuery output source is one
+ * that supports both bounded and unbounded data. This is a helper method that creates a
+ * TableReference from input options, to tell the pipeline where to write its BigQuery results.
+ */
+ private static TableReference getTableReference(Options options) {
+ TableReference tableRef = new TableReference();
+ tableRef.setProjectId(options.getProject());
+ tableRef.setDatasetId(options.getBigQueryDataset());
+ tableRef.setTableId(options.getBigQueryTable());
+ return tableRef;
+ }
+
+ /**
+ * Options supported by {@link WindowedWordCount}.
+ *
+ * <p>Inherits standard example configuration options, which allow specification of the BigQuery
+ * table and the PubSub topic, as well as the {@link WordCount.WordCountOptions} support for
+ * specification of the input file.
+ */
+ public static interface Options
+ extends WordCount.WordCountOptions, DataflowExampleUtils.DataflowExampleUtilsOptions {
+ @Description("Fixed window duration, in minutes")
+ @Default.Integer(WINDOW_SIZE)
+ Integer getWindowSize();
+ void setWindowSize(Integer value);
+
+ @Description("Whether to run the pipeline with unbounded input")
+ boolean isUnbounded();
+ void setUnbounded(boolean value);
+ }
+
+ public static void main(String[] args) throws IOException {
+ Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
+ options.setBigQuerySchema(getSchema());
+ // DataflowExampleUtils creates the necessary input sources to simplify execution of this
+ // Pipeline.
+ DataflowExampleUtils exampleDataflowUtils = new DataflowExampleUtils(options,
+ options.isUnbounded());
+
+ Pipeline pipeline = Pipeline.create(options);
+
+ /**
+ * Concept #1: the Dataflow SDK lets us run the same pipeline with either a bounded or
+ * unbounded input source.
+ */
+ PCollection<String> input;
+ if (options.isUnbounded()) {
+ LOG.info("Reading from PubSub.");
+ /**
+ * Concept #3: Read from the PubSub topic. A topic will be created if it wasn't
+ * specified as an argument. The data elements' timestamps will come from the pubsub
+ * injection.
+ */
+ input = pipeline
+ .apply(PubsubIO.Read.topic(options.getPubsubTopic()));
+ } else {
+ /** Else, this is a bounded pipeline. Read from the GCS file. */
+ input = pipeline
+ .apply(TextIO.Read.from(options.getInputFile()))
+ // Concept #2: Add an element timestamp, using an artificial time just to show windowing.
+ // See AddTimestampFn for more detail on this.
+ .apply(ParDo.of(new AddTimestampFn()));
+ }
+
+ /**
+ * Concept #4: Window into fixed windows. The fixed window size for this example defaults to 1
+ * minute (you can change this with a command-line option). See the documentation for more
+ * information on how fixed windows work, and for information on the other types of windowing
+ * available (e.g., sliding windows).
+ */
+ PCollection<String> windowedWords = input
+ .apply(Window.<String>into(
+ FixedWindows.of(Duration.standardMinutes(options.getWindowSize()))));
+
+ /**
+ * Concept #5: Re-use our existing CountWords transform that does not have knowledge of
+ * windows over a PCollection containing windowed values.
+ */
+ PCollection<KV<String, Long>> wordCounts = windowedWords.apply(new WordCount.CountWords());
+
+ /**
+ * Concept #6: Format the results for a BigQuery table, then write to BigQuery.
+ * The BigQuery output source supports both bounded and unbounded data.
+ */
+ wordCounts.apply(ParDo.of(new FormatAsTableRowFn()))
+ .apply(BigQueryIO.Write.to(getTableReference(options)).withSchema(getSchema()));
+
+ PipelineResult result = pipeline.run();
+
+ /**
+ * To mock unbounded input from PubSub, we'll now start an auxiliary 'injector' pipeline that
+ * runs for a limited time, and publishes to the input PubSub topic.
+ *
+ * With an unbounded input source, you will need to explicitly shut down this pipeline when you
+ * are done with it, so that you do not continue to be charged for the instances. You can do
+ * this via a ctrl-C from the command line, or from the developer's console UI for Dataflow
+ * pipelines. The PubSub topic will also be deleted at this time.
+ */
+ exampleDataflowUtils.mockUnboundedSource(options.getInputFile(), result);
+ }
+}
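Concept #4 above notes that window functions other than FixedWindows are available. A sketch of the same windowing step switched to SlidingWindows (the 10-minute size and 5-minute period are arbitrary values for illustration):

    import com.google.cloud.dataflow.sdk.transforms.windowing.SlidingWindows;
    import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
    import com.google.cloud.dataflow.sdk.values.PCollection;

    import org.joda.time.Duration;

    class WindowingSketch {
      static PCollection<String> applySlidingWindows(PCollection<String> input) {
        // Each element falls into overlapping 10-minute windows, with a
        // new window starting every 5 minutes.
        return input.apply(Window.<String>into(
            SlidingWindows.of(Duration.standardMinutes(10))
                .every(Duration.standardMinutes(5))));
      }
    }

Because CountWords has no knowledge of windows (Concept #5), the downstream counting and BigQuery write stages would be unchanged.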
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/257a7a6b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WordCount.java
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WordCount.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WordCount.java
new file mode 100644
index 0000000..150b60d
--- /dev/null
+++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WordCount.java
@@ -0,0 +1,204 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package ${package};
+
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.io.TextIO;
+import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
+import com.google.cloud.dataflow.sdk.options.Default;
+import com.google.cloud.dataflow.sdk.options.DefaultValueFactory;
+import com.google.cloud.dataflow.sdk.options.Description;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.transforms.Aggregator;
+import com.google.cloud.dataflow.sdk.transforms.Count;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.transforms.Sum;
+import com.google.cloud.dataflow.sdk.util.gcsfs.GcsPath;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+
+
+/**
+ * An example that counts words in Shakespeare and includes Dataflow best practices.
+ *
+ * <p>This class, {@link WordCount}, is the second in a series of four successively more detailed
+ * 'word count' examples. You may first want to take a look at {@link MinimalWordCount}.
+ * After you've looked at this example, then see the {@link DebuggingWordCount}
+ * pipeline, for introduction of additional concepts.
+ *
+ * <p>For a detailed walkthrough of this example, see
+ * <a href="https://cloud.google.com/dataflow/java-sdk/wordcount-example">
+ * https://cloud.google.com/dataflow/java-sdk/wordcount-example
+ * </a>
+ *
+ * <p>Basic concepts, also in the MinimalWordCount example:
+ * Reading text files; counting a PCollection; writing to GCS.
+ *
+ * <p>New Concepts:
+ * <pre>
+ * 1. Executing a Pipeline both locally and using the Dataflow service
+ * 2. Using ParDo with static DoFns defined out-of-line
+ * 3. Building a composite transform
+ * 4. Defining your own pipeline options
+ * </pre>
+ *
+ * <p>Concept #1: you can execute this pipeline either locally or using the Dataflow service.
+ * These are now command-line options and not hard-coded as they were in the MinimalWordCount
+ * example.
+ * To execute this pipeline locally, specify general pipeline configuration:
+ * <pre>{@code
+ * --project=YOUR_PROJECT_ID
+ * }
+ * </pre>
+ * and a local output file or output prefix on GCS:
+ * <pre>{@code
+ * --output=[YOUR_LOCAL_FILE | gs://YOUR_OUTPUT_PREFIX]
+ * }</pre>
+ *
+ * <p>To execute this pipeline using the Dataflow service, specify pipeline configuration:
+ * <pre>{@code
+ * --project=YOUR_PROJECT_ID
+ * --stagingLocation=gs://YOUR_STAGING_DIRECTORY
+ * --runner=BlockingDataflowPipelineRunner
+ * }
+ * </pre>
+ * and an output prefix on GCS:
+ * <pre>{@code
+ * --output=gs://YOUR_OUTPUT_PREFIX
+ * }</pre>
+ *
+ * <p>The input file defaults to {@code gs://dataflow-samples/shakespeare/kinglear.txt} and can be
+ * overridden with {@code --inputFile}.
+ */
+public class WordCount {
+
+ /**
+ * Concept #2: You can make your pipeline code less verbose by defining your DoFns statically
+ * out-of-line. This DoFn tokenizes lines of text into individual words; we pass it to a ParDo in
+ * the pipeline.
+ */
+ static class ExtractWordsFn extends DoFn<String, String> {
+ private final Aggregator<Long, Long> emptyLines =
+ createAggregator("emptyLines", new Sum.SumLongFn());
+
+ @Override
+ public void processElement(ProcessContext c) {
+ if (c.element().trim().isEmpty()) {
+ emptyLines.addValue(1L);
+ }
+
+ // Split the line into words.
+ String[] words = c.element().split("[^a-zA-Z']+");
+
+ // Output each word encountered into the output PCollection.
+ for (String word : words) {
+ if (!word.isEmpty()) {
+ c.output(word);
+ }
+ }
+ }
+ }
+
+ /** A DoFn that converts a Word and Count into a printable string. */
+ public static class FormatAsTextFn extends DoFn<KV<String, Long>, String> {
+ @Override
+ public void processElement(ProcessContext c) {
+ c.output(c.element().getKey() + ": " + c.element().getValue());
+ }
+ }
+
+ /**
+ * A PTransform that converts a PCollection containing lines of text into a PCollection of
+ * formatted word counts.
+ *
+ * <p>Concept #3: This is a custom composite transform that bundles two transforms (ParDo and
+ * Count) as a reusable PTransform subclass. Using composite transforms allows for easy reuse,
+ * modular testing, and an improved monitoring experience.
+ */
+ public static class CountWords extends PTransform<PCollection<String>,
+ PCollection<KV<String, Long>>> {
+ @Override
+ public PCollection<KV<String, Long>> apply(PCollection<String> lines) {
+
+ // Convert lines of text into individual words.
+ PCollection<String> words = lines.apply(
+ ParDo.of(new ExtractWordsFn()));
+
+ // Count the number of times each word occurs.
+ PCollection<KV<String, Long>> wordCounts =
+ words.apply(Count.<String>perElement());
+
+ return wordCounts;
+ }
+ }
+
+ /**
+ * Options supported by {@link WordCount}.
+ *
+ * <p>Concept #4: Defining your own configuration options. Here, you can add your own arguments
+ * to be processed by the command-line parser, and specify default values for them. You can then
+ * access the option values in your pipeline code.
+ *
+ * <p>Inherits standard configuration options.
+ */
+ public interface WordCountOptions extends PipelineOptions {
+ @Description("Path of the file to read from")
+ @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
+ String getInputFile();
+ void setInputFile(String value);
+
+ @Description("Path of the file to write to")
+ @Default.InstanceFactory(OutputFactory.class)
+ String getOutput();
+ void setOutput(String value);
+
+ /**
+ * Returns "gs://${YOUR_STAGING_DIRECTORY}/counts.txt" as the default destination.
+ */
+ public static class OutputFactory implements DefaultValueFactory<String> {
+ @Override
+ public String create(PipelineOptions options) {
+ DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
+ if (dataflowOptions.getStagingLocation() != null) {
+ return GcsPath.fromUri(dataflowOptions.getStagingLocation())
+ .resolve("counts.txt").toString();
+ } else {
+ throw new IllegalArgumentException("Must specify --output or --stagingLocation");
+ }
+ }
+ }
+
+ }
+
+ public static void main(String[] args) {
+ WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
+ .as(WordCountOptions.class);
+ Pipeline p = Pipeline.create(options);
+
+ // Concepts #2 and #3: Our pipeline applies the composite CountWords transform, and passes the
+ // static FormatAsTextFn() to the ParDo transform.
+ p.apply(TextIO.Read.named("ReadLines").from(options.getInputFile()))
+ .apply(new CountWords())
+ .apply(ParDo.of(new FormatAsTextFn()))
+ .apply(TextIO.Write.named("WriteCounts").to(options.getOutput()));
+
+ p.run();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/257a7a6b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/DataflowExampleOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/DataflowExampleOptions.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/DataflowExampleOptions.java
new file mode 100644
index 0000000..e182f4c
--- /dev/null
+++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/DataflowExampleOptions.java
@@ -0,0 +1,29 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package ${package}.common;
+
+import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
+import com.google.cloud.dataflow.sdk.options.Default;
+import com.google.cloud.dataflow.sdk.options.Description;
+
+/**
+ * Options that can be used to configure the Dataflow examples.
+ */
+public interface DataflowExampleOptions extends DataflowPipelineOptions {
+ @Description("Whether to keep jobs running on the Dataflow service after local process exit")
+ @Default.Boolean(false)
+ boolean getKeepJobsRunning();
+ void setKeepJobsRunning(boolean keepJobsRunning);
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/257a7a6b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/DataflowExampleUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/DataflowExampleUtils.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/DataflowExampleUtils.java
new file mode 100644
index 0000000..9861769
--- /dev/null
+++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/DataflowExampleUtils.java
@@ -0,0 +1,398 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package ${package}.common;
+
+import com.google.api.client.googleapis.json.GoogleJsonResponseException;
+import com.google.api.client.googleapis.services.AbstractGoogleClientRequest;
+import com.google.api.services.bigquery.Bigquery;
+import com.google.api.services.bigquery.Bigquery.Datasets;
+import com.google.api.services.bigquery.Bigquery.Tables;
+import com.google.api.services.bigquery.model.Dataset;
+import com.google.api.services.bigquery.model.DatasetReference;
+import com.google.api.services.bigquery.model.Table;
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableSchema;
+import com.google.api.services.dataflow.Dataflow;
+import com.google.api.services.pubsub.Pubsub;
+import com.google.api.services.pubsub.model.Topic;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.PipelineResult;
+import com.google.cloud.dataflow.sdk.io.TextIO;
+import com.google.cloud.dataflow.sdk.options.BigQueryOptions;
+import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
+import com.google.cloud.dataflow.sdk.runners.DataflowPipelineJob;
+import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
+import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner;
+import com.google.cloud.dataflow.sdk.transforms.IntraBundleParallelization;
+import com.google.cloud.dataflow.sdk.util.MonitoringUtil;
+import com.google.cloud.dataflow.sdk.util.Transport;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import javax.servlet.http.HttpServletResponse;
+
+/**
+ * The utility class that sets up and tears down external resources, starts the Google Cloud Pub/Sub
+ * injector, and cancels the streaming and the injector pipelines once the program terminates.
+ *
+ * <p>It is used to run Dataflow examples, such as TrafficMaxLaneFlow and TrafficRoutes.
+ */
+public class DataflowExampleUtils {
+
+ private final DataflowPipelineOptions options;
+ private Bigquery bigQueryClient = null;
+ private Pubsub pubsubClient = null;
+ private Dataflow dataflowClient = null;
+ private Set<DataflowPipelineJob> jobsToCancel = Sets.newHashSet();
+ private List<String> pendingMessages = Lists.newArrayList();
+
+ /**
+ * Define an interface that supports the PubSub and BigQuery example options.
+ */
+ public interface DataflowExampleUtilsOptions
+ extends DataflowExampleOptions, ExamplePubsubTopicOptions, ExampleBigQueryTableOptions {
+ }
+
+ public DataflowExampleUtils(DataflowPipelineOptions options) {
+ this.options = options;
+ }
+
+ /**
+ * Sets up resources and configures the runner.
+ */
+ public DataflowExampleUtils(DataflowPipelineOptions options, boolean isUnbounded)
+ throws IOException {
+ this.options = options;
+ setupResourcesAndRunner(isUnbounded);
+ }
+
+ /**
+ * Sets up external resources that are required by the example,
+ * such as Pub/Sub topics and BigQuery tables.
+ *
+ * @throws IOException if there is a problem setting up the resources
+ */
+ public void setup() throws IOException {
+ setupPubsubTopic();
+ setupBigQueryTable();
+ }
+
+ /**
+ * Set up external resources, and configure the runner appropriately.
+ */
+ public void setupResourcesAndRunner(boolean isUnbounded) throws IOException {
+ if (isUnbounded) {
+ options.setStreaming(true);
+ }
+ setup();
+ setupRunner();
+ }
+
+ /**
+ * Sets up the Google Cloud Pub/Sub topic.
+ *
+ * <p>If the topic doesn't exist, a new topic with the given name will be created.
+ *
+ * @throws IOException if there is a problem setting up the Pub/Sub topic
+ */
+ public void setupPubsubTopic() throws IOException {
+ ExamplePubsubTopicOptions pubsubTopicOptions = options.as(ExamplePubsubTopicOptions.class);
+ if (!pubsubTopicOptions.getPubsubTopic().isEmpty()) {
+ pendingMessages.add("*******************Set Up Pubsub Topic*********************");
+ setupPubsubTopic(pubsubTopicOptions.getPubsubTopic());
+ pendingMessages.add("The Pub/Sub topic has been set up for this example: "
+ + pubsubTopicOptions.getPubsubTopic());
+ }
+ }
+
+ /**
+ * Sets up the BigQuery table with the given schema.
+ *
+ * <p>If the table already exists, its schema has to match the given one, or the example
+ * will throw a RuntimeException. If the table doesn't exist, a new table with the given schema
+ * will be created.
+ *
+ * @throws IOException if there is a problem setting up the BigQuery table
+ */
+ public void setupBigQueryTable() throws IOException {
+ ExampleBigQueryTableOptions bigQueryTableOptions =
+ options.as(ExampleBigQueryTableOptions.class);
+ if (bigQueryTableOptions.getBigQueryDataset() != null
+ && bigQueryTableOptions.getBigQueryTable() != null
+ && bigQueryTableOptions.getBigQuerySchema() != null) {
+ pendingMessages.add("******************Set Up Big Query Table*******************");
+ setupBigQueryTable(bigQueryTableOptions.getProject(),
+ bigQueryTableOptions.getBigQueryDataset(),
+ bigQueryTableOptions.getBigQueryTable(),
+ bigQueryTableOptions.getBigQuerySchema());
+ pendingMessages.add("The BigQuery table has been set up for this example: "
+ + bigQueryTableOptions.getProject()
+ + ":" + bigQueryTableOptions.getBigQueryDataset()
+ + "." + bigQueryTableOptions.getBigQueryTable());
+ }
+ }
+
+ /**
+ * Tears down external resources that can be deleted upon the example's completion.
+ */
+ private void tearDown() {
+ pendingMessages.add("*************************Tear Down*************************");
+ ExamplePubsubTopicOptions pubsubTopicOptions = options.as(ExamplePubsubTopicOptions.class);
+ if (!pubsubTopicOptions.getPubsubTopic().isEmpty()) {
+ try {
+ deletePubsubTopic(pubsubTopicOptions.getPubsubTopic());
+ pendingMessages.add("The Pub/Sub topic has been deleted: "
+ + pubsubTopicOptions.getPubsubTopic());
+ } catch (IOException e) {
+ pendingMessages.add("Failed to delete the Pub/Sub topic : "
+ + pubsubTopicOptions.getPubsubTopic());
+ }
+ }
+
+ ExampleBigQueryTableOptions bigQueryTableOptions =
+ options.as(ExampleBigQueryTableOptions.class);
+ if (bigQueryTableOptions.getBigQueryDataset() != null
+ && bigQueryTableOptions.getBigQueryTable() != null
+ && bigQueryTableOptions.getBigQuerySchema() != null) {
+ pendingMessages.add("The BigQuery table might contain the example's output, "
+ + "and it is not deleted automatically: "
+ + bigQueryTableOptions.getProject()
+ + ":" + bigQueryTableOptions.getBigQueryDataset()
+ + "." + bigQueryTableOptions.getBigQueryTable());
+ pendingMessages.add("Please go to the Developers Console to delete it manually."
+ + " Otherwise, you may be charged for its usage.");
+ }
+ }
+
+ private void setupBigQueryTable(String projectId, String datasetId, String tableId,
+ TableSchema schema) throws IOException {
+ if (bigQueryClient == null) {
+ bigQueryClient = Transport.newBigQueryClient(options.as(BigQueryOptions.class)).build();
+ }
+
+ Datasets datasetService = bigQueryClient.datasets();
+ if (executeNullIfNotFound(datasetService.get(projectId, datasetId)) == null) {
+ Dataset newDataset = new Dataset().setDatasetReference(
+ new DatasetReference().setProjectId(projectId).setDatasetId(datasetId));
+ datasetService.insert(projectId, newDataset).execute();
+ }
+
+ Tables tableService = bigQueryClient.tables();
+ Table table = executeNullIfNotFound(tableService.get(projectId, datasetId, tableId));
+ if (table == null) {
+ Table newTable = new Table().setSchema(schema).setTableReference(
+ new TableReference().setProjectId(projectId).setDatasetId(datasetId).setTableId(tableId));
+ tableService.insert(projectId, datasetId, newTable).execute();
+ } else if (!table.getSchema().equals(schema)) {
+ throw new RuntimeException(
+ "Table exists and schemas do not match, expecting: " + schema.toPrettyString()
+ + ", actual: " + table.getSchema().toPrettyString());
+ }
+ }
+
+ private void setupPubsubTopic(String topic) throws IOException {
+ if (pubsubClient == null) {
+ pubsubClient = Transport.newPubsubClient(options).build();
+ }
+ if (executeNullIfNotFound(pubsubClient.projects().topics().get(topic)) == null) {
+ pubsubClient.projects().topics().create(topic, new Topic().setName(topic)).execute();
+ }
+ }
+
+ /**
+ * Deletes the Google Cloud Pub/Sub topic.
+ *
+ * @throws IOException if there is a problem deleting the Pub/Sub topic
+ */
+ private void deletePubsubTopic(String topic) throws IOException {
+ if (pubsubClient == null) {
+ pubsubClient = Transport.newPubsubClient(options).build();
+ }
+ if (executeNullIfNotFound(pubsubClient.projects().topics().get(topic)) != null) {
+ pubsubClient.projects().topics().delete(topic).execute();
+ }
+ }
+
+ /**
+ * If this is an unbounded (streaming) pipeline, and both the inputFile and the Pub/Sub topic
+ * are defined, starts an 'injector' pipeline that publishes the contents of the file to the
+ * given topic (the topic itself is created beforehand by setupPubsubTopic(), if necessary).
+ */
+ public void startInjectorIfNeeded(String inputFile) {
+ ExamplePubsubTopicOptions pubsubTopicOptions = options.as(ExamplePubsubTopicOptions.class);
+ if (pubsubTopicOptions.isStreaming()
+ && inputFile != null && !inputFile.isEmpty()
+ && pubsubTopicOptions.getPubsubTopic() != null
+ && !pubsubTopicOptions.getPubsubTopic().isEmpty()) {
+ runInjectorPipeline(inputFile, pubsubTopicOptions.getPubsubTopic());
+ }
+ }
+
+ /**
+ * Performs some runner setup: checks that the DirectPipelineRunner is not used in
+ * conjunction with streaming, and, if streaming is specified, switches to the
+ * DataflowPipelineRunner.
+ */
+ public void setupRunner() {
+ if (options.isStreaming()) {
+ if (options.getRunner() == DirectPipelineRunner.class) {
+ throw new IllegalArgumentException(
+ "Processing of unbounded input sources is not supported with the DirectPipelineRunner.");
+ }
+ // The DataflowPipelineRunner is forced here so that the pipelines
+ // can be canceled automatically on shutdown.
+ options.setRunner(DataflowPipelineRunner.class);
+ }
+ }
+
+ /**
+ * Runs the batch injector for the streaming pipeline.
+ *
+ * <p>The injector pipeline will read from the given text file, and inject data
+ * into the Google Cloud Pub/Sub topic.
+ */
+ public void runInjectorPipeline(String inputFile, String topic) {
+ DataflowPipelineOptions copiedOptions = options.cloneAs(DataflowPipelineOptions.class);
+ copiedOptions.setStreaming(false);
+ copiedOptions.setNumWorkers(
+ options.as(ExamplePubsubTopicOptions.class).getInjectorNumWorkers());
+ copiedOptions.setJobName(options.getJobName() + "-injector");
+ Pipeline injectorPipeline = Pipeline.create(copiedOptions);
+ injectorPipeline.apply(TextIO.Read.from(inputFile))
+ .apply(IntraBundleParallelization
+ .of(PubsubFileInjector.publish(topic))
+ .withMaxParallelism(20));
+ DataflowPipelineJob injectorJob = (DataflowPipelineJob) injectorPipeline.run();
+ jobsToCancel.add(injectorJob);
+ }
+
+ /**
+ * Runs the provided injector pipeline for the streaming pipeline.
+ */
+ public void runInjectorPipeline(Pipeline injectorPipeline) {
+ DataflowPipelineJob injectorJob = (DataflowPipelineJob) injectorPipeline.run();
+ jobsToCancel.add(injectorJob);
+ }
+
+ /**
+ * Start the auxiliary injector pipeline, then wait for this pipeline to finish.
+ */
+ public void mockUnboundedSource(String inputFile, PipelineResult result) {
+ startInjectorIfNeeded(inputFile);
+ waitToFinish(result);
+ }
+
+ /**
+ * If {@literal DataflowPipelineRunner} or {@literal BlockingDataflowPipelineRunner} is used,
+ * waits for the pipeline to finish and cancels it (and the injector) before the program exits.
+ */
+ public void waitToFinish(PipelineResult result) {
+ if (result instanceof DataflowPipelineJob) {
+ final DataflowPipelineJob job = (DataflowPipelineJob) result;
+ jobsToCancel.add(job);
+ if (!options.as(DataflowExampleOptions.class).getKeepJobsRunning()) {
+ addShutdownHook(jobsToCancel);
+ }
+ try {
+ job.waitToFinish(-1, TimeUnit.SECONDS, new MonitoringUtil.PrintHandler(System.out));
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to wait for job to finish: " + job.getJobId());
+ }
+ } else {
+ // Do nothing if the given PipelineResult doesn't support waitToFinish(),
+ // such as EvaluationResults returned by DirectPipelineRunner.
+ }
+ }
+
+ private void addShutdownHook(final Collection<DataflowPipelineJob> jobs) {
+ if (dataflowClient == null) {
+ dataflowClient = options.getDataflowClient();
+ }
+
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ @Override
+ public void run() {
+ tearDown();
+ printPendingMessages();
+ for (DataflowPipelineJob job : jobs) {
+ System.out.println("Canceling example pipeline: " + job.getJobId());
+ try {
+ job.cancel();
+ } catch (IOException e) {
+ System.out.println("Failed to cancel the job,"
+ + " please go to the Developers Console to cancel it manually");
+ System.out.println(
+ MonitoringUtil.getJobMonitoringPageURL(job.getProjectId(), job.getJobId()));
+ }
+ }
+
+ for (DataflowPipelineJob job : jobs) {
+ boolean cancellationVerified = false;
+ for (int retryAttempts = 6; retryAttempts > 0; retryAttempts--) {
+ if (job.getState().isTerminal()) {
+ cancellationVerified = true;
+ System.out.println("Canceled example pipeline: " + job.getJobId());
+ break;
+ } else {
+ System.out.println(
+ "The example pipeline is still running. Verifying the cancellation.");
+ }
+ try {
+ Thread.sleep(10000);
+ } catch (InterruptedException e) {
+ // Ignore
+ }
+ }
+ if (!cancellationVerified) {
+ System.out.println("Failed to verify the cancellation for job: " + job.getJobId());
+ System.out.println("Please go to the Developers Console to verify manually:");
+ System.out.println(
+ MonitoringUtil.getJobMonitoringPageURL(job.getProjectId(), job.getJobId()));
+ }
+ }
+ }
+ });
+ }
+
+ private void printPendingMessages() {
+ System.out.println();
+ System.out.println("***********************************************************");
+ System.out.println("***********************************************************");
+ for (String message : pendingMessages) {
+ System.out.println(message);
+ }
+ System.out.println("***********************************************************");
+ System.out.println("***********************************************************");
+ }
+
+ private static <T> T executeNullIfNotFound(
+ AbstractGoogleClientRequest<T> request) throws IOException {
+ try {
+ return request.execute();
+ } catch (GoogleJsonResponseException e) {
+ if (e.getStatusCode() == HttpServletResponse.SC_NOT_FOUND) {
+ return null;
+ } else {
+ throw e;
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/257a7a6b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExampleBigQueryTableOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExampleBigQueryTableOptions.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExampleBigQueryTableOptions.java
new file mode 100644
index 0000000..bef5bfd
--- /dev/null
+++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExampleBigQueryTableOptions.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package ${package}.common;
+
+import com.google.api.services.bigquery.model.TableSchema;
+import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
+import com.google.cloud.dataflow.sdk.options.Default;
+import com.google.cloud.dataflow.sdk.options.DefaultValueFactory;
+import com.google.cloud.dataflow.sdk.options.Description;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+
+/**
+ * Options that can be used to configure BigQuery tables in Dataflow examples.
+ * The project defaults to the project being used to run the example.
+ */
+public interface ExampleBigQueryTableOptions extends DataflowPipelineOptions {
+ @Description("BigQuery dataset name")
+ @Default.String("dataflow_examples")
+ String getBigQueryDataset();
+ void setBigQueryDataset(String dataset);
+
+ @Description("BigQuery table name")
+ @Default.InstanceFactory(BigQueryTableFactory.class)
+ String getBigQueryTable();
+ void setBigQueryTable(String table);
+
+ @Description("BigQuery table schema")
+ TableSchema getBigQuerySchema();
+ void setBigQuerySchema(TableSchema schema);
+
+ /**
+ * Returns the job name as the default BigQuery table name.
+ */
+ static class BigQueryTableFactory implements DefaultValueFactory<String> {
+ @Override
+ public String create(PipelineOptions options) {
+ return options.as(DataflowPipelineOptions.class).getJobName()
+ .replace('-', '_');
+ }
+ }
+}