You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2016/10/09 21:35:43 UTC

[2/6] incubator-streams-examples git commit: flink example

flink example


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/commit/4491cfe1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/tree/4491cfe1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/diff/4491cfe1

Branch: refs/heads/master
Commit: 4491cfe1d0bf7324073537e89e7e8b6ed8ab43d5
Parents: b3429dd
Author: Steve Blackmon @steveblackmon <sb...@apache.org>
Authored: Mon Sep 26 12:43:22 2016 -0500
Committer: Steve Blackmon @steveblackmon <sb...@apache.org>
Committed: Mon Sep 26 12:43:22 2016 -0500

----------------------------------------------------------------------
 flink/flink-twitter-collection/README.md        |    8 +
 flink/flink-twitter-collection/pom.xml          |  420 ++++++++
 .../jsonschema/FlinkBatchConfiguration.json     |   12 +
 .../jsonschema/FlinkStreamingConfiguration.json |   40 +
 .../jsonschema/StreamsFlinkConfiguration.json   |   48 +
 .../TwitterFollowingPipelineConfiguration.json  |   29 +
 .../TwitterPostsPipelineConfiguration.json      |   29 +
 ...terUserInformationPipelineConfiguration.json |   29 +
 .../streams/examples/flink/FlinkBase.scala      |  200 ++++
 .../FlinkTwitterFollowingPipeline.scala         |  149 +++
 .../collection/FlinkTwitterPostsPipeline.scala  |  165 +++
 .../FlinkTwitterUserInformationPipeline.scala   |  163 +++
 .../markdown/FlinkTwitterFollowingPipeline.md   |   41 +
 .../site/markdown/FlinkTwitterPostsPipeline.md  |   41 +
 .../FlinkTwitterUserInformationPipeline.md      |   41 +
 .../src/site/markdown/index.md                  |   32 +
 .../site/resources/FlinkBatchConfiguration.json |   12 +
 .../resources/FlinkStreamingConfiguration.json  |   40 +
 .../resources/StreamsFlinkConfiguration.json    |   48 +
 .../TwitterFollowingBatchConfiguration.json     |   23 +
 .../TwitterFollowingPipelineConfiguration.json  |   29 +
 .../TwitterPostsBatchConfiguration.json         |   23 +
 .../TwitterPostsPipelineConfiguration.json      |   29 +
 ...witterUserInformationBatchConfiguration.json |   23 +
 ...terUserInformationPipelineConfiguration.json |   29 +
 .../src/test/resources/1000twitterids.txt       | 1000 ++++++++++++++++++
 .../FlinkTwitterFollowingPipeline.conf          |   10 +
 .../resources/FlinkTwitterPostsPipeline.conf    |   10 +
 .../FlinkTwitterUserInformationPipeline.conf    |   10 +
 .../src/test/resources/asf.txt                  |    1 +
 .../test/FlinkTwitterFollowingPipelineIT.scala  |   81 ++
 .../test/FlinkTwitterPostsPipelineIT.scala      |   55 +
 .../FlinkTwitterUserInformationPipelineIT.scala |   56 +
 flink/pom.xml                                   |   47 +
 pom.xml                                         |   29 +-
 35 files changed, 2988 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/README.md
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/README.md b/flink/flink-twitter-collection/README.md
new file mode 100644
index 0000000..f9fe687
--- /dev/null
+++ b/flink/flink-twitter-collection/README.md
@@ -0,0 +1,8 @@
+Apache Streams (incubating)
+Licensed under Apache License 2.0 - http://www.apache.org/licenses/LICENSE-2.0
+--------------------------------------------------------------------------------
+
+org.apache.streams:flink-twitter-collection
+===========================================
+
+[README.md](src/site/markdown/index.md "README")

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/pom.xml
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/pom.xml b/flink/flink-twitter-collection/pom.xml
new file mode 100644
index 0000000..33b05fe
--- /dev/null
+++ b/flink/flink-twitter-collection/pom.xml
@@ -0,0 +1,420 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <groupId>org.apache.streams</groupId>
+        <artifactId>streams-examples-flink</artifactId>
+        <version>0.4-incubating-SNAPSHOT</version>
+        <relativePath>..</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>flink-twitter-collection</artifactId>
+    <name>flink-twitter-collection</name>
+
+    <description>Collects twitter documents using flink.</description>
+
+    <properties>
+        <docker.repo>apachestreams</docker.repo>
+        <hdfs.version>2.7.0</hdfs.version>
+        <flink.version>1.1.2</flink.version>
+        <scala.suffix>2.10</scala.suffix>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.hamcrest</groupId>
+            <artifactId>hamcrest-all</artifactId>
+            <version>1.3</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-core</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.typesafe</groupId>
+            <artifactId>config</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-config</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-util</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-pojo</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-provider-twitter</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-persist-hdfs</artifactId>
+            <version>${project.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>commons-logging</groupId>
+                    <artifactId>commons-logging</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-hdfs</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-hdfs</artifactId>
+            <version>${hdfs.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>commons-logging</groupId>
+                    <artifactId>commons-logging</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-java</artifactId>
+            <version>${flink.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>commons-logging</groupId>
+                    <artifactId>commons-logging</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.zookeeper</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-clients_${scala.suffix}</artifactId>
+            <version>${flink.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>commons-logging</groupId>
+                    <artifactId>commons-logging</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.zookeeper</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-scala_${scala.suffix}</artifactId>
+            <version>${flink.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>commons-logging</groupId>
+                    <artifactId>commons-logging</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.zookeeper</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-filesystem_${scala.suffix}</artifactId>
+            <version>${flink.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>commons-logging</groupId>
+                    <artifactId>commons-logging</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-scala_${scala.suffix}</artifactId>
+            <version>${flink.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>commons-logging</groupId>
+                    <artifactId>commons-logging</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-metrics-core</artifactId>
+            <version>${flink.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>commons-logging</groupId>
+                    <artifactId>commons-logging</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.zookeeper</groupId>
+            <artifactId>zookeeper</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>commons-logging</groupId>
+                    <artifactId>commons-logging</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>log4j-over-slf4j</artifactId>
+            <version>${slf4j.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>jcl-over-slf4j</artifactId>
+            <version>${slf4j.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>jul-to-slf4j</artifactId>
+            <version>${slf4j.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-classic</artifactId>
+            <version>${logback.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-core</artifactId>
+            <version>${logback.version}</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <sourceDirectory>src/main/scala</sourceDirectory>
+        <testSourceDirectory>src/test/scala</testSourceDirectory>
+        <resources>
+            <resource>
+                <directory>src/main/resources</directory>
+            </resource>
+        </resources>
+        <testResources>
+            <testResource>
+                <directory>src/test/resources</directory>
+            </testResource>
+        </testResources>
+        <plugins>
+            <plugin>
+                <artifactId>maven-clean-plugin</artifactId>
+                <configuration>
+                    <filesets>
+                        <fileset>
+                            <directory>data</directory>
+                            <followSymlinks>false</followSymlinks>
+                        </fileset>
+                    </filesets>
+                </configuration>
+            </plugin>
+            <!-- This binary runs with logback -->
+            <!-- Keep log4j out -->
+            <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-enforcer-plugin</artifactId>
+            <version>1.3.1</version>
+            <executions>
+                <execution>
+                    <id>enforce-banned-dependencies</id>
+                    <goals>
+                        <goal>enforce</goal>
+                    </goals>
+                    <configuration>
+                        <rules>
+                            <bannedDependencies>
+                                <excludes>
+                                    <exclude>org.slf4j:slf4j-log4j12</exclude>
+                                    <exclude>org.slf4j:slf4j-jcl</exclude>
+                                    <exclude>org.slf4j:slf4j-jdk14</exclude>
+                                    <exclude>log4j:log4j</exclude>
+                                    <exclude>commons-logging:commons-logging</exclude>
+                                </excludes>
+                            </bannedDependencies>
+                        </rules>
+                        <fail>true</fail>
+                    </configuration>
+                </execution>
+            </executions>
+        </plugin>
+        <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-shade-plugin</artifactId>
+        </plugin>
+        <plugin>
+            <groupId>org.jsonschema2pojo</groupId>
+            <artifactId>jsonschema2pojo-maven-plugin</artifactId>
+            <version>0.4.1</version>
+            <configuration>
+                <addCompileSourceRoot>true</addCompileSourceRoot>
+                <generateBuilders>true</generateBuilders>
+                <sourcePaths>
+                    <sourcePath>src/main/jsonschema</sourcePath>
+                </sourcePaths>
+                <outputDirectory>target/generated-sources/jsonschema2pojo</outputDirectory>
+                <targetPackage>org.apache.streams.examples.flink.twitter</targetPackage>
+                <useJodaDates>false</useJodaDates>
+            </configuration>
+            <executions>
+                <execution>
+                    <goals>
+                        <goal>generate</goal>
+                    </goals>
+                </execution>
+            </executions>
+        </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>build-helper-maven-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>add-source</id>
+                        <phase>generate-sources</phase>
+                        <goals>
+                            <goal>add-source</goal>
+                        </goals>
+                        <configuration>
+                            <sources>
+                                <source>target/generated-sources/jsonschema2pojo</source>
+                            </sources>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-dependency-plugin</artifactId>
+                <configuration>
+                    <includes>**/*.json</includes>
+                    <outputDirectory>${project.build.directory}/test-classes</outputDirectory>
+                    <includeGroupIds>org.apache.streams</includeGroupIds>
+                    <includeTypes>test-jar</includeTypes>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>test-resource-dependencies</id>
+                        <phase>process-test-resources</phase>
+                        <goals>
+                            <goal>unpack-dependencies</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-failsafe-plugin</artifactId>
+                <version>2.12.4</version>
+                <executions>
+                    <execution>
+                        <id>integration-tests</id>
+                        <goals>
+                            <goal>integration-test</goal>
+                            <goal>verify</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>io.fabric8</groupId>
+                <artifactId>docker-maven-plugin</artifactId>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/main/jsonschema/FlinkBatchConfiguration.json
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/main/jsonschema/FlinkBatchConfiguration.json b/flink/flink-twitter-collection/src/main/jsonschema/FlinkBatchConfiguration.json
new file mode 100644
index 0000000..30a2942
--- /dev/null
+++ b/flink/flink-twitter-collection/src/main/jsonschema/FlinkBatchConfiguration.json
@@ -0,0 +1,12 @@
+{
+  "$schema": "http://json-schema.org/draft-03/schema",
+  "$license": [
+    "http://www.apache.org/licenses/LICENSE-2.0"
+  ],
+  "type": "object",
+  "javaType" : "org.apache.streams.flink.FlinkBatchConfiguration",
+  "javaInterfaces": ["java.io.Serializable"],
+  "extends": {
+    "$ref": "StreamsFlinkConfiguration.json"
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/main/jsonschema/FlinkStreamingConfiguration.json
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/main/jsonschema/FlinkStreamingConfiguration.json b/flink/flink-twitter-collection/src/main/jsonschema/FlinkStreamingConfiguration.json
new file mode 100644
index 0000000..0d63f4e
--- /dev/null
+++ b/flink/flink-twitter-collection/src/main/jsonschema/FlinkStreamingConfiguration.json
@@ -0,0 +1,40 @@
+{
+  "$schema": "http://json-schema.org/draft-03/schema",
+  "$license": [
+    "http://www.apache.org/licenses/LICENSE-2.0"
+  ],
+  "type": "object",
+  "javaType" : "org.apache.streams.flink.FlinkStreamingConfiguration",
+  "javaInterfaces": ["java.io.Serializable"],
+  "extends": {
+    "$ref": "StreamsFlinkConfiguration.json"
+  },
+  "properties": {
+    "parallel": {
+      "type": "integer",
+      "default": 1
+    },
+    "providerWaitMs": {
+      "type": "integer",
+      "default": 1000
+    },
+    "checkpointIntervalMs": {
+      "type": "integer",
+      "default": 300000
+    },
+    "checkpointTimeoutMs": {
+      "type": "integer",
+      "default": 30000
+    },
+    "restartAttempts": {
+      "type": "integer",
+      "description": "number of restart attempts",
+      "default": 3
+    },
+    "restartDelayMs": {
+      "type": "integer",
+      "description": "delay in milliseconds",
+      "default": 10000
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/main/jsonschema/StreamsFlinkConfiguration.json
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/main/jsonschema/StreamsFlinkConfiguration.json b/flink/flink-twitter-collection/src/main/jsonschema/StreamsFlinkConfiguration.json
new file mode 100644
index 0000000..ef78357
--- /dev/null
+++ b/flink/flink-twitter-collection/src/main/jsonschema/StreamsFlinkConfiguration.json
@@ -0,0 +1,48 @@
+{
+  "$schema": "http://json-schema.org/draft-03/schema",
+  "$license": [
+    "http://www.apache.org/licenses/LICENSE-2.0"
+  ],
+  "type": "object",
+  "javaType" : "org.apache.streams.flink.StreamsFlinkConfiguration",
+  "javaInterfaces": ["java.io.Serializable"],
+  "extends": {
+    "$ref": "http://streams.incubator.apache.org/site/0.4-incubating-SNAPSHOT/streams-config/StreamsConfiguration.json"
+  },
+  "properties": {
+    "parallel": {
+      "type": "integer",
+      "default": 1
+    },
+    "providerWaitMs": {
+      "type": "integer",
+      "default": 1000
+    },
+    "checkpointIntervalMs": {
+      "type": "integer",
+      "default": 300000
+    },
+    "checkpointTimeoutMs": {
+      "type": "integer",
+      "default": 30000
+    },
+    "test": {
+      "type": "boolean",
+      "default": false
+    },
+    "local": {
+      "type": "boolean",
+      "default": true
+    },
+    "restartAttempts": {
+      "type": "integer",
+      "description": "number of restart attempts",
+      "default": 3
+    },
+    "restartDelayMs": {
+      "type": "integer",
+      "description": "delay in milliseconds",
+      "default": 10000
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/main/jsonschema/TwitterFollowingPipelineConfiguration.json
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/main/jsonschema/TwitterFollowingPipelineConfiguration.json b/flink/flink-twitter-collection/src/main/jsonschema/TwitterFollowingPipelineConfiguration.json
new file mode 100644
index 0000000..de4f9bb
--- /dev/null
+++ b/flink/flink-twitter-collection/src/main/jsonschema/TwitterFollowingPipelineConfiguration.json
@@ -0,0 +1,29 @@
+{
+  "$schema": "http://json-schema.org/draft-03/schema",
+  "$license": [
+    "http://www.apache.org/licenses/LICENSE-2.0"
+  ],
+  "type": "object",
+  "javaType" : "org.apache.streams.examples.flink.twitter.TwitterFollowingPipelineConfiguration",
+  "javaInterfaces": ["java.io.Serializable"],
+  "extends": {
+    "$ref": "FlinkStreamingConfiguration.json"
+  },
+  "properties": {
+    "twitter": {
+      "type": "object",
+      "javaType": "org.apache.streams.twitter.TwitterFollowingConfiguration"
+    },
+    "source": {
+      "type": "object",
+      "javaType": "org.apache.streams.hdfs.HdfsReaderConfiguration"
+    },
+    "destination": {
+      "type": "object",
+      "javaType": "org.apache.streams.hdfs.HdfsWriterConfiguration"
+    },
+    "providerWaitMs": {
+      "type": "integer"
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/main/jsonschema/TwitterPostsPipelineConfiguration.json
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/main/jsonschema/TwitterPostsPipelineConfiguration.json b/flink/flink-twitter-collection/src/main/jsonschema/TwitterPostsPipelineConfiguration.json
new file mode 100644
index 0000000..628d7ee
--- /dev/null
+++ b/flink/flink-twitter-collection/src/main/jsonschema/TwitterPostsPipelineConfiguration.json
@@ -0,0 +1,29 @@
+{
+  "$schema": "http://json-schema.org/draft-03/schema",
+  "$license": [
+    "http://www.apache.org/licenses/LICENSE-2.0"
+  ],
+  "type": "object",
+  "javaType" : "org.apache.streams.examples.flink.twitter.TwitterPostsPipelineConfiguration",
+  "javaInterfaces": ["java.io.Serializable"],
+  "extends": {
+    "$ref": "FlinkStreamingConfiguration.json"
+  },
+  "properties": {
+    "twitter": {
+      "type": "object",
+      "javaType": "org.apache.streams.twitter.TwitterUserInformationConfiguration"
+    },
+    "source": {
+      "type": "object",
+      "javaType": "org.apache.streams.hdfs.HdfsReaderConfiguration"
+    },
+    "destination": {
+      "type": "object",
+      "javaType": "org.apache.streams.hdfs.HdfsWriterConfiguration"
+    },
+    "providerWaitMs": {
+      "type": "integer"
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/main/jsonschema/TwitterUserInformationPipelineConfiguration.json
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/main/jsonschema/TwitterUserInformationPipelineConfiguration.json b/flink/flink-twitter-collection/src/main/jsonschema/TwitterUserInformationPipelineConfiguration.json
new file mode 100644
index 0000000..5261748
--- /dev/null
+++ b/flink/flink-twitter-collection/src/main/jsonschema/TwitterUserInformationPipelineConfiguration.json
@@ -0,0 +1,29 @@
+{
+  "$schema": "http://json-schema.org/draft-03/schema",
+  "$license": [
+    "http://www.apache.org/licenses/LICENSE-2.0"
+  ],
+  "type": "object",
+  "javaType" : "org.apache.streams.examples.flink.twitter.TwitterUserInformationPipelineConfiguration",
+  "javaInterfaces": ["java.io.Serializable"],
+  "extends": {
+    "$ref": "FlinkStreamingConfiguration.json"
+  },
+  "properties": {
+    "twitter": {
+      "type": "object",
+      "javaType": "org.apache.streams.twitter.TwitterUserInformationConfiguration"
+    },
+    "source": {
+      "type": "object",
+      "javaType": "org.apache.streams.hdfs.HdfsReaderConfiguration"
+    },
+    "destination": {
+      "type": "object",
+      "javaType": "org.apache.streams.hdfs.HdfsWriterConfiguration"
+    },
+    "providerWaitMs": {
+      "type": "integer"
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/FlinkBase.scala
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/FlinkBase.scala b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/FlinkBase.scala
new file mode 100644
index 0000000..1f1ed6d
--- /dev/null
+++ b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/FlinkBase.scala
@@ -0,0 +1,200 @@
+package org.apache.streams.examples.flink
+
+import java.net.MalformedURLException
+
+import com.google.common.base.Strings
+import com.typesafe.config.Config
+import org.apache.flink.api.common.restartstrategy.RestartStrategies
+import org.apache.flink.api.scala.ExecutionEnvironment
+import org.apache.flink.streaming.api.CheckpointingMode
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.streams.config.{ComponentConfigurator, StreamsConfigurator}
+import org.apache.streams.flink.{FlinkBatchConfiguration, FlinkStreamingConfiguration, StreamsFlinkConfiguration}
+import org.apache.streams.hdfs.{HdfsConfiguration, HdfsReaderConfiguration, HdfsWriterConfiguration}
+import org.apache.streams.jackson.StreamsJacksonMapper
+import org.slf4j.LoggerFactory
+
+trait FlinkBase {
+
+  private val BASELOGGER = LoggerFactory.getLogger("FlinkBase")
+  private val MAPPER = StreamsJacksonMapper.getInstance()
+
+  var configUrl : String = _
+  var typesafe : Config = _
+  var streamsConfig = StreamsConfigurator.detectConfiguration(StreamsConfigurator.getConfig)
+  var streamsFlinkConfiguration: StreamsFlinkConfiguration = _
+
+  var executionEnvironment: ExecutionEnvironment = _
+  var streamExecutionEnvironment: StreamExecutionEnvironment = _
+
+  /*
+   Basic stuff for every flink job
+   */
+  def main(args: Array[String]): Unit = {
+    // if only one argument, use it as the config URL
+    if( args.size > 0 ) {
+      BASELOGGER.info("Args: {}", args)
+      configUrl = args(0)
+      setup(configUrl)
+    }
+
+  }
+
+  def setup(configUrl : String): Boolean =  {
+    BASELOGGER.info("StreamsConfigurator.config: {}", StreamsConfigurator.config)
+    if( !Strings.isNullOrEmpty(configUrl)) {
+      BASELOGGER.info("StreamsConfigurator.resolveConfig(configUrl): {}", StreamsConfigurator.resolveConfig(configUrl))
+      try {
+        typesafe = StreamsConfigurator.resolveConfig(configUrl).withFallback(StreamsConfigurator.config).resolve()
+      } catch {
+        case mue: MalformedURLException => {
+          BASELOGGER.error("Invalid Configuration URL: ", mue)
+          return false
+        }
+        case e: Exception => {
+          BASELOGGER.error("Invalid Configuration URL: ", e)
+          return false
+        }
+      }
+    }
+    else {
+      typesafe = StreamsConfigurator.getConfig
+    }
+
+    return setup(typesafe)
+
+  }
+
+  def setup(typesafe : Config): Boolean =  {
+    this.typesafe = typesafe
+
+    BASELOGGER.info("Typesafe Config: {}", typesafe)
+
+    if( this.typesafe.getString("mode").equals("streaming")) {
+      val streamingConfiguration: FlinkStreamingConfiguration =
+        new ComponentConfigurator[FlinkStreamingConfiguration](classOf[FlinkStreamingConfiguration]).detectConfiguration(typesafe)
+      return setupStreaming(streamingConfiguration)
+    } else if( this.typesafe.getString("mode").equals("batch")) {
+      val batchConfiguration: FlinkBatchConfiguration =
+        new ComponentConfigurator[FlinkBatchConfiguration](classOf[FlinkBatchConfiguration]).detectConfiguration(typesafe)
+      return setupBatch(batchConfiguration)
+    } else {
+      return false;
+    }
+  }
+
+//  def setup(typesafe: Config): Boolean =  {
+//
+//    val streamsConfig = StreamsConfigurator.detectConfiguration(typesafe)
+//
+//    this.streamsConfig = streamsConfig
+//
+//    BASELOGGER.info("Streams Config: " + streamsConfig)
+//
+//    setup(streamsConfig)
+//  }
+
+  def setupStreaming(streamingConfiguration: FlinkStreamingConfiguration): Boolean = {
+
+    BASELOGGER.info("FsStreamingFlinkConfiguration: " + streamingConfiguration)
+
+    this.streamsFlinkConfiguration = streamingConfiguration
+
+    if( streamsFlinkConfiguration == null) return false
+
+    if( streamExecutionEnvironment == null )
+      streamExecutionEnvironment = streamEnvironment(streamingConfiguration)
+
+    return false
+
+  }
+
+  def setupBatch(batchConfiguration: FlinkBatchConfiguration): Boolean =  {
+
+    BASELOGGER.info("FsBatchFlinkConfiguration: " + batchConfiguration)
+
+    this.streamsFlinkConfiguration = batchConfiguration
+
+    if( streamsFlinkConfiguration == null) return false
+
+    if( executionEnvironment == null )
+      executionEnvironment = batchEnvironment(batchConfiguration)
+
+    return true
+
+  }
+
+  def batchEnvironment(config: FlinkBatchConfiguration = new FlinkBatchConfiguration()) : ExecutionEnvironment = {
+    if (config.getTest == false && config.getLocal == false) {
+      val env = ExecutionEnvironment.getExecutionEnvironment
+      return env
+    } else {
+      val env = ExecutionEnvironment.createLocalEnvironment(config.getParallel.toInt)
+      return env
+    }
+  }
+
+  def streamEnvironment(config: FlinkStreamingConfiguration = new FlinkStreamingConfiguration()) : StreamExecutionEnvironment = {
+    if( config.getTest == false && config.getLocal == false) {
+      val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+      env.setRestartStrategy(RestartStrategies.noRestart());
+
+      // start a checkpoint every hour
+      env.enableCheckpointing(config.getCheckpointIntervalMs)
+
+      env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
+
+      // checkpoints have to complete within five minutes, or are discarded
+      env.getCheckpointConfig.setCheckpointTimeout(config.getCheckpointTimeoutMs)
+
+      // allow only one checkpoint to be in progress at the same time
+      env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
+
+      return env
+    }
+
+    else return StreamExecutionEnvironment.createLocalEnvironment(config.getParallel.toInt)
+  }
+
+  def buildReaderPath(configObject: HdfsReaderConfiguration) : String = {
+    var inPathBuilder : String = ""
+    if (configObject.getScheme.equals(HdfsConfiguration.Scheme.FILE)) {
+      inPathBuilder = configObject.getPath + "/" + configObject.getReaderPath
+    }
+    else if (configObject.getScheme.equals(HdfsConfiguration.Scheme.HDFS)) {
+      inPathBuilder = configObject.getScheme + "://" + configObject.getHost + ":" + configObject.getPort + "/" + configObject.getPath + "/" + configObject.getReaderPath
+    }
+    else if (configObject.getScheme.toString.equals("s3")) {
+      inPathBuilder = configObject.getScheme + "://" + configObject.getPath + "/" + configObject.getReaderPath
+    } else {
+      throw new Exception("scheme not recognized: " + configObject.getScheme)
+    }
+    return inPathBuilder
+  }
+
+  def buildWriterPath(configObject: HdfsWriterConfiguration) : String = {
+    var outPathBuilder : String = ""
+    if( configObject.getScheme.equals(HdfsConfiguration.Scheme.FILE)) {
+      outPathBuilder = configObject.getPath + "/" + configObject.getWriterPath
+    }
+    else if( configObject.getScheme.equals(HdfsConfiguration.Scheme.HDFS)) {
+      outPathBuilder = configObject.getScheme + "://" + configObject.getHost + ":" + configObject.getPort + "/" + configObject.getPath + "/" + configObject.getWriterPath
+    }
+    else if( configObject.getScheme.toString.equals("s3")) {
+      outPathBuilder = configObject.getScheme + "://" + configObject.getPath + "/" + configObject.getWriterPath
+    } else {
+      throw new Exception("output scheme not recognized: " + configObject.getScheme)
+    }
+    return outPathBuilder
+  }
+
+  def toProviderId(input : String) : String = {
+    if( input.startsWith("@") )
+      return input.substring(1)
+    if( input.contains(':'))
+      return input.substring(input.lastIndexOf(':')+1)
+    else return input
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala
new file mode 100644
index 0000000..2ac7d32
--- /dev/null
+++ b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala
@@ -0,0 +1,149 @@
+package org.apache.streams.examples.flink.twitter.collection
+
+import java.util.concurrent.TimeUnit
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.google.common.util.concurrent.Uninterruptibles
+import org.apache.flink.api.common.functions.RichFlatMapFunction
+import org.apache.flink.core.fs.FileSystem
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment}
+import org.apache.flink.streaming.connectors.fs.RollingSink
+import org.apache.flink.util.Collector
+import org.apache.streams.config.{ComponentConfigurator, StreamsConfigurator}
+import org.apache.streams.core.StreamsDatum
+import org.apache.streams.jackson.StreamsJacksonMapper
+import org.apache.streams.twitter.TwitterFollowingConfiguration
+import org.apache.streams.twitter.pojo.Follow
+import org.apache.streams.twitter.provider.TwitterFollowingProvider
+import org.slf4j.{Logger, LoggerFactory}
+import org.apache.flink.api.scala._
+import org.apache.streams.examples.flink.FlinkBase
+import org.apache.streams.examples.flink.twitter.TwitterFollowingPipelineConfiguration
+import org.apache.streams.flink.{FlinkStreamingConfiguration, StreamsFlinkConfiguration}
+
+import scala.collection.JavaConversions._
+
+/**
+ * Created by sblackmon on 4/20/16.
+ */
+/**
+ * Created by sblackmon on 3/15/16.
+ */
+object FlinkTwitterFollowingPipeline extends FlinkBase {
+
+    val STREAMS_ID: String = "FlinkTwitterFollowingPipeline"
+
+    private val LOGGER: Logger = LoggerFactory.getLogger(classOf[FlinkTwitterUserInformationPipeline])
+    private val MAPPER: ObjectMapper = StreamsJacksonMapper.getInstance()
+
+    override def main(args: Array[String]) = {
+    super.main(args)
+    val jobConfig = new ComponentConfigurator[TwitterFollowingPipelineConfiguration](classOf[TwitterFollowingPipelineConfiguration]).detectConfiguration(typesafe)
+    if( setup(jobConfig) == false ) System.exit(1)
+    val pipeline: FlinkTwitterFollowingPipeline = new FlinkTwitterFollowingPipeline(jobConfig)
+    val thread = new Thread(pipeline)
+    thread.start()
+    thread.join()
+    }
+
+    def setup(jobConfig: TwitterFollowingPipelineConfiguration): Boolean =  {
+
+        LOGGER.info("TwitterFollowingPipelineConfiguration: " + jobConfig)
+
+        if( jobConfig == null ) {
+            LOGGER.error("jobConfig is null!")
+            System.err.println("jobConfig is null!")
+            return false
+        }
+
+        if( jobConfig.getSource == null ) {
+            LOGGER.error("jobConfig.getSource is null!")
+            System.err.println("jobConfig.getSource is null!")
+            return false
+        }
+
+        if( jobConfig.getDestination == null ) {
+            LOGGER.error("jobConfig.getDestination is null!")
+            System.err.println("jobConfig.getDestination is null!")
+            return false
+        }
+
+        if( jobConfig.getTwitter == null ) {
+            LOGGER.error("jobConfig.getTwitter is null!")
+            System.err.println("jobConfig.getTwitter is null!")
+            return false
+        }
+
+        return true
+
+    }
+
+}
+
+class FlinkTwitterFollowingPipeline(config: TwitterFollowingPipelineConfiguration = new ComponentConfigurator[TwitterFollowingPipelineConfiguration](classOf[TwitterFollowingPipelineConfiguration]).detectConfiguration(StreamsConfigurator.getConfig)) extends Runnable with java.io.Serializable {
+
+    import FlinkTwitterFollowingPipeline._
+
+    override def run(): Unit = {
+
+        val env: StreamExecutionEnvironment = streamEnvironment(MAPPER.convertValue(config, classOf[FlinkStreamingConfiguration]))
+
+        env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+        env.setNumberOfExecutionRetries(0)
+
+        val inPath = buildReaderPath(config.getSource)
+
+        val outPath = buildWriterPath(config.getDestination)
+
+        val keyed_ids: KeyedStream[String, Int] = env.readTextFile(inPath).setParallelism(10).keyBy( id => (id.hashCode % 100).abs )
+
+        // these datums contain 'Follow' objects
+        val followDatums: DataStream[StreamsDatum] =
+            keyed_ids.flatMap(new FollowingCollectorFlatMapFunction(config.getTwitter)).setParallelism(10)
+
+        val follows: DataStream[Follow] = followDatums
+          .map(datum => datum.getDocument.asInstanceOf[Follow])
+
+        val jsons: DataStream[String] = follows
+          .map(follow => {
+              val MAPPER = StreamsJacksonMapper.getInstance
+              MAPPER.writeValueAsString(follow)
+          })
+
+        if( config.getTest == false )
+            jsons.addSink(new RollingSink[String](outPath)).setParallelism(3)
+        else
+            jsons.writeAsText(outPath,FileSystem.WriteMode.OVERWRITE)
+              .setParallelism(env.getParallelism);
+
+        // if( test == true ) jsons.print();
+
+        env.execute("FlinkTwitterFollowingPipeline")
+    }
+
+    class FollowingCollectorFlatMapFunction(
+                                             twitterConfiguration : TwitterFollowingConfiguration = new ComponentConfigurator[TwitterFollowingConfiguration](classOf[TwitterFollowingConfiguration]).detectConfiguration(StreamsConfigurator.getConfig.getConfig("twitter")),
+                                             flinkConfiguration : StreamsFlinkConfiguration = new ComponentConfigurator[StreamsFlinkConfiguration](classOf[StreamsFlinkConfiguration]).detectConfiguration(StreamsConfigurator.getConfig)
+                                           ) extends RichFlatMapFunction[String, StreamsDatum] with Serializable {
+
+        override def flatMap(input: String, out: Collector[StreamsDatum]): Unit = {
+            collectConnections(input, out)
+        }
+
+        def collectConnections(id : String, out : Collector[StreamsDatum]) = {
+            val twitProvider: TwitterFollowingProvider =
+                new TwitterFollowingProvider(
+                    twitterConfiguration.withIdsOnly(true).withInfo(List(FlinkUtil.toProviderId(id))).withMaxItems(5000l).asInstanceOf[TwitterFollowingConfiguration]
+                )
+            twitProvider.prepare(twitProvider)
+            twitProvider.startStream()
+            var iterator: Iterator[StreamsDatum] = null
+            do {
+                Uninterruptibles.sleepUninterruptibly(flinkConfiguration.getProviderWaitMs, TimeUnit.MILLISECONDS)
+                twitProvider.readCurrent().iterator().toList.map(out.collect(_))
+            } while( twitProvider.isRunning )
+        }
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala
new file mode 100644
index 0000000..f8e221c
--- /dev/null
+++ b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala
@@ -0,0 +1,165 @@
+package org.apache.streams.examples.flink.twitter.collection
+
+import java.util.concurrent.TimeUnit
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.google.common.util.concurrent.Uninterruptibles
+import com.peoplepattern.streams.pdb.pipelines.FlinkStreamingConfiguration
+import com.peoplepattern.streams.pdb.flink.{FlinkBase, FlinkUtil}
+import com.peoplepattern.streams.pipelines.pdb.TwitterPostsPipelineConfiguration
+import com.peoplepattern.streams.twitter.collection.FlinkTwitterPostsPipeline.LOGGER
+import org.apache.flink.api.common.functions.{FlatMapFunction, RichFlatMapFunction}
+import org.apache.flink.api.common.restartstrategy.RestartStrategies
+import org.apache.flink.api.scala.{ExecutionEnvironment, _}
+import org.apache.flink.core.fs.FileSystem
+import org.apache.flink.runtime.state.filesystem.FsStateBackend
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.scala.function.AllWindowFunction
+import org.apache.flink.streaming.api.scala.{AllWindowedStream, DataStream, KeyedStream, StreamExecutionEnvironment}
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger
+import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
+import org.apache.flink.streaming.connectors.fs.RollingSink
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema
+import org.apache.flink.util.Collector
+import org.apache.streams.config.{ComponentConfigurator, StreamsConfigurator}
+import org.apache.streams.core.StreamsDatum
+import org.apache.streams.examples.flink.FlinkBase
+import org.apache.streams.examples.flink.twitter.TwitterPostsPipelineConfiguration
+import org.apache.streams.flink.FlinkStreamingConfiguration
+import org.apache.streams.hdfs.HdfsConfiguration
+import org.apache.streams.jackson.StreamsJacksonMapper
+import org.apache.streams.twitter.TwitterUserInformationConfiguration
+import org.apache.streams.twitter.pojo.{Tweet, User}
+import org.apache.streams.twitter.provider.{TwitterTimelineProvider, TwitterUserInformationProvider}
+import org.slf4j.{Logger, LoggerFactory}
+
+import scala.collection.JavaConversions._
+
+/**
+  * Created by sblackmon on 7/29/15.
+  */
+object FlinkTwitterPostsPipeline extends FlinkBase {
+
+  val STREAMS_ID: String = "FlinkTwitterPostsPipeline"
+
+  private val LOGGER: Logger = LoggerFactory.getLogger(classOf[FlinkTwitterPostsPipeline])
+  private val MAPPER: ObjectMapper = StreamsJacksonMapper.getInstance()
+
+  override def main(args: Array[String]) = {
+    super.main(args)
+    val jobConfig = new ComponentConfigurator[TwitterPostsPipelineConfiguration](classOf[TwitterPostsPipelineConfiguration]).detectConfiguration(typesafe)
+    if( setup(jobConfig) == false ) System.exit(1)
+    val pipeline: FlinkTwitterPostsPipeline = new FlinkTwitterPostsPipeline(jobConfig)
+    val thread = new Thread(pipeline)
+    thread.start()
+    thread.join()
+  }
+
+  def setup(jobConfig: TwitterPostsPipelineConfiguration): Boolean =  {
+
+    LOGGER.info("TwitterPostsPipelineConfiguration: " + jobConfig)
+
+    if( jobConfig == null ) {
+      LOGGER.error("jobConfig is null!")
+      System.err.println("jobConfig is null!")
+      return false
+    }
+
+    if( jobConfig.getSource == null ) {
+      LOGGER.error("jobConfig.getSource is null!")
+      System.err.println("jobConfig.getSource is null!")
+      return false
+    }
+
+    if( jobConfig.getDestination == null ) {
+      LOGGER.error("jobConfig.getDestination is null!")
+      System.err.println("jobConfig.getDestination is null!")
+      return false
+    }
+
+    if( jobConfig.getTwitter == null ) {
+      LOGGER.error("jobConfig.getTwitter is null!")
+      System.err.println("jobConfig.getTwitter is null!")
+      return false
+    }
+
+    return true
+
+  }
+
+}
+
+class FlinkTwitterPostsPipeline(config: TwitterPostsPipelineConfiguration = new ComponentConfigurator[TwitterPostsPipelineConfiguration](classOf[TwitterPostsPipelineConfiguration]).detectConfiguration(StreamsConfigurator.getConfig)) extends Runnable with java.io.Serializable {
+
+  import FlinkTwitterPostsPipeline._
+
+  override def run(): Unit = {
+
+    val env: StreamExecutionEnvironment = streamEnvironment(MAPPER.convertValue(config, classOf[FlinkStreamingConfiguration]))
+
+    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+    env.setNumberOfExecutionRetries(0)
+
+    val inPath = buildReaderPath(config.getSource)
+
+    val outPath = buildWriterPath(config.getDestination)
+
+    //val inProps = buildKafkaProps(config.getSourceTopic)
+
+    val ids: DataStream[String] = env.readTextFile(inPath).setParallelism(10).name("ids")
+
+    //val idTopicIn = new KafkaSink()
+
+//    val idTopicOut : DataStream[String] = env.addSource[String](
+//      new org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09(config.getSourceTopic.getTopic, new SimpleStringSchema(),
+//        inProps));
+
+    val keyed_ids: KeyedStream[String, Int] = env.readTextFile(inPath).setParallelism(10).name("keyed_ids").keyBy( id => (id.hashCode % 100).abs )
+
+    // these datums contain 'Tweet' objects
+    val tweetDatums: DataStream[StreamsDatum] =
+      keyed_ids.flatMap(new postCollectorFlatMapFunction).setParallelism(10).name("tweetDatums")
+
+    val tweets: DataStream[Tweet] = tweetDatums
+      .map(datum => datum.getDocument.asInstanceOf[Tweet]).name("tweets")
+
+    val jsons: DataStream[String] = tweets
+      .map(tweet => {
+        val MAPPER = StreamsJacksonMapper.getInstance
+        MAPPER.writeValueAsString(tweet)
+      }).name("json")
+
+    if( config.getTest == false )
+      jsons.addSink(new RollingSink[String](outPath)).setParallelism(3).name("hdfs")
+    else
+      jsons.writeAsText(outPath,FileSystem.WriteMode.OVERWRITE)
+        .setParallelism(env.getParallelism);
+
+    // if( test == true ) jsons.print();
+
+    env.execute("FlinkTwitterPostsPipeline")
+  }
+
+  class postCollectorFlatMapFunction extends RichFlatMapFunction[String, StreamsDatum] with Serializable {
+    override def flatMap(input: String, out: Collector[StreamsDatum]): Unit = {
+        collectPosts(input, out)
+    }
+    def collectPosts(id : String, out : Collector[StreamsDatum]) = {
+      val twitterConfiguration = config.getTwitter
+      val twitProvider: TwitterTimelineProvider =
+        new TwitterTimelineProvider(
+          twitterConfiguration.withInfo(List(FlinkUtil.toProviderId(id))).withMaxItems(200l)
+        )
+      twitProvider.prepare(twitProvider)
+      twitProvider.startStream()
+      var iterator: Iterator[StreamsDatum] = null
+      do {
+        Uninterruptibles.sleepUninterruptibly(config.getProviderWaitMs, TimeUnit.MILLISECONDS)
+        twitProvider.readCurrent().iterator().toList.map(out.collect(_))
+      } while( twitProvider.isRunning )
+    }
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala
new file mode 100644
index 0000000..a081c74
--- /dev/null
+++ b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala
@@ -0,0 +1,163 @@
+package org.apache.streams.examples.flink.twitter.collection
+
+import java.lang
+import java.util.concurrent.TimeUnit
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import org.apache.flink.core.fs.FileSystem
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.scala.function.{AllWindowFunction, WindowFunction}
+import org.apache.flink.streaming.api.windowing.assigners.{GlobalWindows, TumblingEventTimeWindows}
+
+import scala.collection.JavaConversions._
+import com.peoplepattern.streams.twitter.collection.FlinkTwitterUserInformationPipeline.LOGGER
+import com.google.common.util.concurrent.Uninterruptibles
+import org.apache.streams.examples.flink.FlinkBase
+import org.apache.flink.api.common.functions.RichFlatMapFunction
+import org.apache.flink.api.common.restartstrategy.RestartStrategies
+import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.streaming.api.scala.{AllWindowedStream, DataStream, KeyedStream, StreamExecutionEnvironment, WindowedStream}
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.streaming.api.windowing.triggers._
+import org.apache.flink.streaming.api.windowing.windows.{GlobalWindow, TimeWindow, Window}
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.{ExecutionEnvironment, _}
+import org.apache.flink.runtime.state.filesystem.FsStateBackend
+import org.apache.flink.streaming.connectors.fs.RollingSink
+import org.apache.flink.util.Collector
+import org.apache.streams.config.{ComponentConfigurator, StreamsConfigurator}
+import org.apache.streams.core.StreamsDatum
+import org.apache.streams.examples.flink.FlinkBase
+import org.apache.streams.examples.flink.twitter.TwitterUserInformationPipelineConfiguration
+import org.apache.streams.flink.FlinkStreamingConfiguration
+import org.apache.streams.hdfs.HdfsConfiguration
+import org.apache.streams.jackson.StreamsJacksonMapper
+import org.apache.streams.twitter.TwitterUserInformationConfiguration
+import org.apache.streams.twitter.pojo.{Tweet, User}
+import org.apache.streams.twitter.provider.{TwitterTimelineProvider, TwitterUserInformationProvider}
+import org.slf4j.{Logger, LoggerFactory}
+
+/**
+  * Created by sblackmon on 3/15/16.
+  */
+object FlinkTwitterUserInformationPipeline extends FlinkBase {
+
+  val STREAMS_ID: String = "FlinkTwitterUserInformationPipeline"
+
+  private val LOGGER: Logger = LoggerFactory.getLogger(classOf[FlinkTwitterUserInformationPipeline])
+  private val MAPPER: ObjectMapper = StreamsJacksonMapper.getInstance()
+
+  override def main(args: Array[String]) = {
+    super.main(args)
+    val jobConfig = new ComponentConfigurator[TwitterUserInformationPipelineConfiguration](classOf[TwitterUserInformationPipelineConfiguration]).detectConfiguration(typesafe)
+    if( setup(jobConfig) == false ) System.exit(1)
+    val pipeline: FlinkTwitterUserInformationPipeline = new FlinkTwitterUserInformationPipeline(jobConfig)
+    val thread = new Thread(pipeline)
+    thread.start()
+    thread.join()
+  }
+
+  def setup(jobConfig: TwitterUserInformationPipelineConfiguration): Boolean =  {
+
+    LOGGER.info("TwitterFollowingPipelineConfiguration: " + jobConfig)
+
+    if( jobConfig == null ) {
+      LOGGER.error("jobConfig is null!")
+      System.err.println("jobConfig is null!")
+      return false
+    }
+
+    if( jobConfig.getSource == null ) {
+      LOGGER.error("jobConfig.getSource is null!")
+      System.err.println("jobConfig.getSource is null!")
+      return false
+    }
+
+    if( jobConfig.getDestination == null ) {
+      LOGGER.error("jobConfig.getDestination is null!")
+      System.err.println("jobConfig.getDestination is null!")
+      return false
+    }
+
+    if( jobConfig.getTwitter == null ) {
+      LOGGER.error("jobConfig.getTwitter is null!")
+      System.err.println("jobConfig.getTwitter is null!")
+      return false
+    }
+
+    return true
+
+  }
+
+}
+
+class FlinkTwitterUserInformationPipeline(config: TwitterUserInformationPipelineConfiguration = new ComponentConfigurator[TwitterUserInformationPipelineConfiguration](classOf[TwitterUserInformationPipelineConfiguration]).detectConfiguration(StreamsConfigurator.getConfig)) extends Runnable with java.io.Serializable {
+
+  import FlinkTwitterUserInformationPipeline._
+
+  override def run(): Unit = {
+
+    val env: StreamExecutionEnvironment = streamEnvironment(MAPPER.convertValue(config, classOf[FlinkStreamingConfiguration]))
+
+    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+    env.setNumberOfExecutionRetries(0)
+
+    val inPath = buildReaderPath(config.getSource)
+
+    val outPath = buildWriterPath(config.getDestination)
+
+    val ids: DataStream[String] = env.readTextFile(inPath).setParallelism(10).name("ids")
+
+    val keyed_ids: KeyedStream[String, Int] = ids.name("keyed_ids").keyBy( id => (id.hashCode % 100).abs )
+
+    val idWindows: WindowedStream[String, Int, GlobalWindow] = keyed_ids.countWindow(100)
+
+    val idLists: DataStream[List[String]] = idWindows.apply[List[String]] (new idListWindowFunction()).name("idLists")
+
+    val userDatums: DataStream[StreamsDatum] = idLists.flatMap(new profileCollectorFlatMapFunction).setParallelism(10).name("userDatums")
+
+    val user: DataStream[User] = userDatums.map(datum => datum.getDocument.asInstanceOf[User]).name("users")
+
+    val jsons: DataStream[String] = user
+      .map(user => {
+        val MAPPER = StreamsJacksonMapper.getInstance
+        MAPPER.writeValueAsString(user)
+      }).name("jsons")
+
+    if( config.getTest == false )
+      jsons.addSink(new RollingSink[String](outPath)).setParallelism(3).name("hdfs")
+    else
+      jsons.writeAsText(outPath,FileSystem.WriteMode.OVERWRITE)
+        .setParallelism(env.getParallelism);
+
+    LOGGER.info("StreamExecutionEnvironment: {}", env.toString )
+
+    env.execute("FlinkTwitterUserInformationPipeline")
+  }
+
+  class idListWindowFunction extends WindowFunction[String, List[String], Int, GlobalWindow] {
+    override def apply(key: Int, window: GlobalWindow, input: Iterable[String], out: Collector[List[String]]): Unit = {if( input.size > 0 )
+        out.collect(input.map(id => FlinkUtil.toProviderId(id)).toList)
+    }
+  }
+
+  class profileCollectorFlatMapFunction extends RichFlatMapFunction[List[String], StreamsDatum] with Serializable {
+    override def flatMap(input: List[String], out: Collector[StreamsDatum]): Unit = {
+      collectProfiles(input, out)
+    }
+    def collectProfiles(ids : List[String], out : Collector[StreamsDatum]) = {
+      val twitterConfiguration = config.getTwitter
+      val twitProvider: TwitterUserInformationProvider =
+        new TwitterUserInformationProvider(
+          twitterConfiguration.withInfo(ids)
+        )
+      twitProvider.prepare(twitProvider)
+      twitProvider.startStream()
+      var iterator: Iterator[StreamsDatum] = null
+      do {
+        Uninterruptibles.sleepUninterruptibly(config.getProviderWaitMs, TimeUnit.MILLISECONDS)
+        twitProvider.readCurrent().iterator().toList.map(out.collect(_))
+      } while( twitProvider.isRunning )
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterFollowingPipeline.md
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterFollowingPipeline.md b/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterFollowingPipeline.md
new file mode 100644
index 0000000..22f30f5
--- /dev/null
+++ b/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterFollowingPipeline.md
@@ -0,0 +1,41 @@
+FlinkTwitterFollowingPipeline
+=============================
+
+Description:
+-----------------
+
+Collects twitter friends or followers with flink.
+
+Specification:
+-----------------
+
+[FlinkTwitterFollowingPipeline.dot](FlinkTwitterFollowingPipeline.dot "FlinkTwitterFollowingPipeline.dot" )
+
+Diagram:
+-----------------
+
+![FlinkTwitterFollowingPipeline.dot.svg](./FlinkTwitterFollowingPipeline.dot.svg)
+
+Example Configuration:
+----------------------
+
+[FlinkTwitterFollowingPipeline.json](FlinkTwitterFollowingPipeline.json "FlinkTwitterFollowingPipeline.json" )
+
+Run (Local):
+------------
+
+    java -cp dist/flink-twitter-collection-jar-with-dependencies.jar -Dconfig.file=file://<location_of_config_file> org.apache.streams.examples.flink.twitter.collection.FlinkTwitterFollowingPipeline
+
+Run (Flink):
+------------
+
+    flink-run.sh dist/flink-twitter-collection-jar-with-dependencies.jar org.apache.streams.examples.flink.twitter.collection.FlinkTwitterFollowingPipeline http://<location_of_config_file> 
+
+Run (YARN):
+-----------
+
+    flink-run.sh yarn dist/flink-twitter-collection-jar-with-dependencies.jar org.apache.streams.examples.flink.twitter.collection.FlinkTwitterFollowingPipeline http://<location_of_config_file> 
+
+[JavaDocs](apidocs/index.html "JavaDocs")
+
+###### Licensed under Apache License 2.0 - http://www.apache.org/licenses/LICENSE-2.0
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterPostsPipeline.md
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterPostsPipeline.md b/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterPostsPipeline.md
new file mode 100644
index 0000000..5f77994
--- /dev/null
+++ b/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterPostsPipeline.md
@@ -0,0 +1,41 @@
+FlinkTwitterPostsPipeline
+=========================
+
+Description:
+-----------------
+
+Collects Twitter posts with Flink.
+
+Specification:
+-----------------
+
+[FlinkTwitterPostsPipeline.dot](FlinkTwitterPostsPipeline.dot "FlinkTwitterPostsPipeline.dot" )
+
+Diagram:
+-----------------
+
+![FlinkTwitterPostsPipeline.dot.svg](./FlinkTwitterPostsPipeline.dot.svg)
+
+Example Configuration:
+----------------------
+
+[FlinkTwitterPostsPipeline.json](FlinkTwitterPostsPipeline.json "FlinkTwitterPostsPipeline.json" )
+
+Run (Local):
+------------
+
+    java -cp dist/flink-twitter-collection-jar-with-dependencies.jar -Dconfig.file=file://<location_of_config_file> org.apache.streams.examples.flink.twitter.collection.FlinkTwitterPostsPipeline
+
+Run (Flink):
+------------
+
+    flink-run.sh dist/flink-twitter-collection-jar-with-dependencies.jar org.apache.streams.examples.flink.twitter.collection.FlinkTwitterPostsPipeline http://<location_of_config_file> 
+
+Run (YARN):
+-----------
+
+    flink-run.sh yarn dist/flink-twitter-collection-jar-with-dependencies.jar org.apache.streams.examples.flink.twitter.collection.FlinkTwitterPostsPipeline http://<location_of_config_file> 
+
+[JavaDocs](apidocs/index.html "JavaDocs")
+
+###### Licensed under Apache License 2.0 - http://www.apache.org/licenses/LICENSE-2.0
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterUserInformationPipeline.md
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterUserInformationPipeline.md b/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterUserInformationPipeline.md
new file mode 100644
index 0000000..5e0d1fe
--- /dev/null
+++ b/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterUserInformationPipeline.md
@@ -0,0 +1,41 @@
+FlinkTwitterUserInformationPipeline
+===================================
+
+Description:
+-----------------
+
+Collects Twitter users with Flink.
+
+Specification:
+-----------------
+
+[FlinkTwitterUserInformationPipeline.dot](FlinkTwitterUserInformationPipeline.dot "FlinkTwitterUserInformationPipeline.dot" )
+
+Diagram:
+-----------------
+
+![FlinkTwitterUserInformationPipeline.dot.svg](./FlinkTwitterUserInformationPipeline.dot.svg)
+
+Example Configuration:
+----------------------
+
+[FlinkTwitterUserInformationPipeline.json](FlinkTwitterUserInformationPipeline.json "FlinkTwitterUserInformationPipeline.json" )
+
+Run (Local):
+------------
+
+    java -cp dist/flink-twitter-collection-jar-with-dependencies.jar -Dconfig.file=file://<location_of_config_file> org.apache.streams.examples.flink.twitter.collection.FlinkTwitterUserInformationPipeline
+
+Run (Flink):
+------------
+
+    flink-run.sh dist/flink-twitter-collection-jar-with-dependencies.jar org.apache.streams.examples.flink.twitter.collection.FlinkTwitterUserInformationPipeline http://<location_of_config_file> 
+
+Run (YARN):
+-----------
+
+    flink-run.sh yarn dist/flink-twitter-collection-jar-with-dependencies.jar org.apache.streams.examples.flink.twitter.collection.FlinkTwitterUserInformationPipeline http://<location_of_config_file> 
+
+[JavaDocs](apidocs/index.html "JavaDocs")
+
+###### Licensed under Apache License 2.0 - http://www.apache.org/licenses/LICENSE-2.0
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/site/markdown/index.md
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/site/markdown/index.md b/flink/flink-twitter-collection/src/site/markdown/index.md
new file mode 100644
index 0000000..19e44cf
--- /dev/null
+++ b/flink/flink-twitter-collection/src/site/markdown/index.md
@@ -0,0 +1,32 @@
+Apache Streams (incubating)
+Licensed under Apache License 2.0 - http://www.apache.org/licenses/LICENSE-2.0
+--------------------------------------------------------------------------------
+
+flink-twitter-collection
+========================
+
+Requirements:
+-------------
+ - Authorized Twitter API credentials
+
+Description:
+------------
+Collects large batches of documents from api.twitter.com from a seed set of ids.
+
+Streams:
+--------
+
+<a href="FlinkTwitterUserInformationPipeline.html" target="_self">FlinkTwitterUserInformationPipeline</a>
+
+<a href="FlinkTwitterPostsPipeline.html" target="_self">FlinkTwitterPostsPipeline</a>
+
+<a href="FlinkTwitterFollowingPipeline.html" target="_self">FlinkTwitterFollowingPipeline</a>
+
+Build:
+---------
+
+    mvn clean install verify
+
+[JavaDocs](apidocs/index.html "JavaDocs")
+
+###### Licensed under Apache License 2.0 - http://www.apache.org/licenses/LICENSE-2.0

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/site/resources/FlinkBatchConfiguration.json
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/site/resources/FlinkBatchConfiguration.json b/flink/flink-twitter-collection/src/site/resources/FlinkBatchConfiguration.json
new file mode 100644
index 0000000..30a2942
--- /dev/null
+++ b/flink/flink-twitter-collection/src/site/resources/FlinkBatchConfiguration.json
@@ -0,0 +1,12 @@
+{
+  "$schema": "http://json-schema.org/draft-03/schema",
+  "$license": [
+    "http://www.apache.org/licenses/LICENSE-2.0"
+  ],
+  "type": "object",
+  "javaType" : "org.apache.streams.flink.FlinkBatchConfiguration",
+  "javaInterfaces": ["java.io.Serializable"],
+  "extends": {
+    "$ref": "StreamsFlinkConfiguration.json"
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/site/resources/FlinkStreamingConfiguration.json
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/site/resources/FlinkStreamingConfiguration.json b/flink/flink-twitter-collection/src/site/resources/FlinkStreamingConfiguration.json
new file mode 100644
index 0000000..0d63f4e
--- /dev/null
+++ b/flink/flink-twitter-collection/src/site/resources/FlinkStreamingConfiguration.json
@@ -0,0 +1,40 @@
+{
+  "$schema": "http://json-schema.org/draft-03/schema",
+  "$license": [
+    "http://www.apache.org/licenses/LICENSE-2.0"
+  ],
+  "type": "object",
+  "javaType" : "org.apache.streams.flink.FlinkStreamingConfiguration",
+  "javaInterfaces": ["java.io.Serializable"],
+  "extends": {
+    "$ref": "StreamsFlinkConfiguration.json"
+  },
+  "properties": {
+    "parallel": {
+      "type": "integer",
+      "default": 1
+    },
+    "providerWaitMs": {
+      "type": "integer",
+      "default": 1000
+    },
+    "checkpointIntervalMs": {
+      "type": "integer",
+      "default": 300000
+    },
+    "checkpointTimeoutMs": {
+      "type": "integer",
+      "default": 30000
+    },
+    "restartAttempts": {
+      "type": "integer",
+      "description": "number of restart attempts",
+      "default": 3
+    },
+    "restartDelayMs": {
+      "type": "integer",
+      "description": "delay in milliseconds",
+      "default": 10000
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/site/resources/StreamsFlinkConfiguration.json
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/site/resources/StreamsFlinkConfiguration.json b/flink/flink-twitter-collection/src/site/resources/StreamsFlinkConfiguration.json
new file mode 100644
index 0000000..ef78357
--- /dev/null
+++ b/flink/flink-twitter-collection/src/site/resources/StreamsFlinkConfiguration.json
@@ -0,0 +1,48 @@
+{
+  "$schema": "http://json-schema.org/draft-03/schema",
+  "$license": [
+    "http://www.apache.org/licenses/LICENSE-2.0"
+  ],
+  "type": "object",
+  "javaType" : "org.apache.streams.flink.StreamsFlinkConfiguration",
+  "javaInterfaces": ["java.io.Serializable"],
+  "extends": {
    "$ref": "http://streams.incubator.apache.org/site/latest/streams-config/StreamsConfiguration.json"
+  },
+  "properties": {
+    "parallel": {
+      "type": "integer",
+      "default": 1
+    },
+    "providerWaitMs": {
+      "type": "integer",
+      "default": 1000
+    },
+    "checkpointIntervalMs": {
+      "type": "integer",
+      "default": 300000
+    },
+    "checkpointTimeoutMs": {
+      "type": "integer",
+      "default": 30000
+    },
+    "test": {
+      "type": "boolean",
+      "default": false
+    },
+    "local": {
+      "type": "boolean",
+      "default": true
+    },
+    "restartAttempts": {
+      "type": "integer",
+      "description": "number of restart attempts",
+      "default": 3
+    },
+    "restartDelayMs": {
+      "type": "integer",
+      "description": "delay in milliseconds",
+      "default": 10000
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/site/resources/TwitterFollowingBatchConfiguration.json
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/site/resources/TwitterFollowingBatchConfiguration.json b/flink/flink-twitter-collection/src/site/resources/TwitterFollowingBatchConfiguration.json
new file mode 100644
index 0000000..33afb29
--- /dev/null
+++ b/flink/flink-twitter-collection/src/site/resources/TwitterFollowingBatchConfiguration.json
@@ -0,0 +1,23 @@
+{
+  "$schema": "http://json-schema.org/draft-03/schema",
+  "$license": [
+    "http://www.apache.org/licenses/LICENSE-2.0"
+  ],
+  "type": "object",
+  "javaType" : "org.apache.streams.examples.flink.twitter.TwitterFollowingBatchConfiguration",
+  "javaInterfaces": ["java.io.Serializable"],
+  "properties": {
+    "twitter": {
+      "type": "object",
+      "javaType": "org.apache.streams.twitter.TwitterFollowingConfiguration"
+    },
+    "hdfs": {
+      "type": "object",
+      "javaType": "org.apache.streams.hdfs.HdfsWriterConfiguration"
+    },
+    "s3": {
+      "type": "object",
+      "javaType": "org.apache.streams.s3.S3WriterConfiguration"
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/site/resources/TwitterFollowingPipelineConfiguration.json
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/site/resources/TwitterFollowingPipelineConfiguration.json b/flink/flink-twitter-collection/src/site/resources/TwitterFollowingPipelineConfiguration.json
new file mode 100644
index 0000000..de4f9bb
--- /dev/null
+++ b/flink/flink-twitter-collection/src/site/resources/TwitterFollowingPipelineConfiguration.json
@@ -0,0 +1,29 @@
+{
+  "$schema": "http://json-schema.org/draft-03/schema",
+  "$license": [
+    "http://www.apache.org/licenses/LICENSE-2.0"
+  ],
+  "type": "object",
+  "javaType" : "org.apache.streams.examples.flink.twitter.TwitterFollowingPipelineConfiguration",
+  "javaInterfaces": ["java.io.Serializable"],
+  "extends": {
+    "$ref": "FlinkStreamingConfiguration.json"
+  },
+  "properties": {
+    "twitter": {
+      "type": "object",
+      "javaType": "org.apache.streams.twitter.TwitterFollowingConfiguration"
+    },
+    "source": {
+      "type": "object",
+      "javaType": "org.apache.streams.hdfs.HdfsReaderConfiguration"
+    },
+    "destination": {
+      "type": "object",
+      "javaType": "org.apache.streams.hdfs.HdfsWriterConfiguration"
+    },
+    "providerWaitMs": {
+      "type": "integer"
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/site/resources/TwitterPostsBatchConfiguration.json
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/site/resources/TwitterPostsBatchConfiguration.json b/flink/flink-twitter-collection/src/site/resources/TwitterPostsBatchConfiguration.json
new file mode 100644
index 0000000..376bb4d
--- /dev/null
+++ b/flink/flink-twitter-collection/src/site/resources/TwitterPostsBatchConfiguration.json
@@ -0,0 +1,23 @@
+{
+  "$schema": "http://json-schema.org/draft-03/schema",
+  "$license": [
+    "http://www.apache.org/licenses/LICENSE-2.0"
+  ],
+  "type": "object",
+  "javaType" : "org.apache.streams.examples.flink.twitter.TwitterPostsBatchConfiguration",
+  "javaInterfaces": ["java.io.Serializable"],
+  "properties": {
+    "twitter": {
+      "type": "object",
+      "javaType": "org.apache.streams.twitter.TwitterUserInformationConfiguration"
+    },
+    "hdfs": {
+      "type": "object",
+      "javaType": "org.apache.streams.hdfs.HdfsWriterConfiguration"
+    },
+    "s3": {
+      "type": "object",
+      "javaType": "org.apache.streams.s3.S3WriterConfiguration"
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/site/resources/TwitterPostsPipelineConfiguration.json
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/site/resources/TwitterPostsPipelineConfiguration.json b/flink/flink-twitter-collection/src/site/resources/TwitterPostsPipelineConfiguration.json
new file mode 100644
index 0000000..628d7ee
--- /dev/null
+++ b/flink/flink-twitter-collection/src/site/resources/TwitterPostsPipelineConfiguration.json
@@ -0,0 +1,29 @@
+{
+  "$schema": "http://json-schema.org/draft-03/schema",
+  "$license": [
+    "http://www.apache.org/licenses/LICENSE-2.0"
+  ],
+  "type": "object",
+  "javaType" : "org.apache.streams.examples.flink.twitter.TwitterPostsPipelineConfiguration",
+  "javaInterfaces": ["java.io.Serializable"],
+  "extends": {
+    "$ref": "FlinkStreamingConfiguration.json"
+  },
+  "properties": {
+    "twitter": {
+      "type": "object",
+      "javaType": "org.apache.streams.twitter.TwitterUserInformationConfiguration"
+    },
+    "source": {
+      "type": "object",
+      "javaType": "org.apache.streams.hdfs.HdfsReaderConfiguration"
+    },
+    "destination": {
+      "type": "object",
+      "javaType": "org.apache.streams.hdfs.HdfsWriterConfiguration"
+    },
+    "providerWaitMs": {
+      "type": "integer"
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/site/resources/TwitterUserInformationBatchConfiguration.json
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/site/resources/TwitterUserInformationBatchConfiguration.json b/flink/flink-twitter-collection/src/site/resources/TwitterUserInformationBatchConfiguration.json
new file mode 100644
index 0000000..55f9fbd
--- /dev/null
+++ b/flink/flink-twitter-collection/src/site/resources/TwitterUserInformationBatchConfiguration.json
@@ -0,0 +1,23 @@
+{
+  "$schema": "http://json-schema.org/draft-03/schema",
+  "$license": [
+    "http://www.apache.org/licenses/LICENSE-2.0"
+  ],
+  "type": "object",
+  "javaType" : "org.apache.streams.examples.flink.twitter.TwitterUserInformationBatchConfiguration",
+  "javaInterfaces": ["java.io.Serializable"],
+  "properties": {
+    "twitter": {
+      "type": "object",
+      "javaType": "org.apache.streams.twitter.TwitterUserInformationConfiguration"
+    },
+    "hdfs": {
+      "type": "object",
+      "javaType": "org.apache.streams.hdfs.HdfsWriterConfiguration"
+    },
+    "s3": {
+      "type": "object",
+      "javaType": "org.apache.streams.s3.S3WriterConfiguration"
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/site/resources/TwitterUserInformationPipelineConfiguration.json
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/site/resources/TwitterUserInformationPipelineConfiguration.json b/flink/flink-twitter-collection/src/site/resources/TwitterUserInformationPipelineConfiguration.json
new file mode 100644
index 0000000..5261748
--- /dev/null
+++ b/flink/flink-twitter-collection/src/site/resources/TwitterUserInformationPipelineConfiguration.json
@@ -0,0 +1,29 @@
+{
+  "$schema": "http://json-schema.org/draft-03/schema",
+  "$license": [
+    "http://www.apache.org/licenses/LICENSE-2.0"
+  ],
+  "type": "object",
+  "javaType" : "org.apache.streams.examples.flink.twitter.TwitterUserInformationPipelineConfiguration",
+  "javaInterfaces": ["java.io.Serializable"],
+  "extends": {
+    "$ref": "FlinkStreamingConfiguration.json"
+  },
+  "properties": {
+    "twitter": {
+      "type": "object",
+      "javaType": "org.apache.streams.twitter.TwitterUserInformationConfiguration"
+    },
+    "source": {
+      "type": "object",
+      "javaType": "org.apache.streams.hdfs.HdfsReaderConfiguration"
+    },
+    "destination": {
+      "type": "object",
+      "javaType": "org.apache.streams.hdfs.HdfsWriterConfiguration"
+    },
+    "providerWaitMs": {
+      "type": "integer"
+    }
+  }
+}
\ No newline at end of file