You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2016/10/09 21:35:43 UTC
[2/6] incubator-streams-examples git commit: flink example
flink example
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/commit/4491cfe1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/tree/4491cfe1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/diff/4491cfe1
Branch: refs/heads/master
Commit: 4491cfe1d0bf7324073537e89e7e8b6ed8ab43d5
Parents: b3429dd
Author: Steve Blackmon @steveblackmon <sb...@apache.org>
Authored: Mon Sep 26 12:43:22 2016 -0500
Committer: Steve Blackmon @steveblackmon <sb...@apache.org>
Committed: Mon Sep 26 12:43:22 2016 -0500
----------------------------------------------------------------------
flink/flink-twitter-collection/README.md | 8 +
flink/flink-twitter-collection/pom.xml | 420 ++++++++
.../jsonschema/FlinkBatchConfiguration.json | 12 +
.../jsonschema/FlinkStreamingConfiguration.json | 40 +
.../jsonschema/StreamsFlinkConfiguration.json | 48 +
.../TwitterFollowingPipelineConfiguration.json | 29 +
.../TwitterPostsPipelineConfiguration.json | 29 +
...terUserInformationPipelineConfiguration.json | 29 +
.../streams/examples/flink/FlinkBase.scala | 200 ++++
.../FlinkTwitterFollowingPipeline.scala | 149 +++
.../collection/FlinkTwitterPostsPipeline.scala | 165 +++
.../FlinkTwitterUserInformationPipeline.scala | 163 +++
.../markdown/FlinkTwitterFollowingPipeline.md | 41 +
.../site/markdown/FlinkTwitterPostsPipeline.md | 41 +
.../FlinkTwitterUserInformationPipeline.md | 41 +
.../src/site/markdown/index.md | 32 +
.../site/resources/FlinkBatchConfiguration.json | 12 +
.../resources/FlinkStreamingConfiguration.json | 40 +
.../resources/StreamsFlinkConfiguration.json | 48 +
.../TwitterFollowingBatchConfiguration.json | 23 +
.../TwitterFollowingPipelineConfiguration.json | 29 +
.../TwitterPostsBatchConfiguration.json | 23 +
.../TwitterPostsPipelineConfiguration.json | 29 +
...witterUserInformationBatchConfiguration.json | 23 +
...terUserInformationPipelineConfiguration.json | 29 +
.../src/test/resources/1000twitterids.txt | 1000 ++++++++++++++++++
.../FlinkTwitterFollowingPipeline.conf | 10 +
.../resources/FlinkTwitterPostsPipeline.conf | 10 +
.../FlinkTwitterUserInformationPipeline.conf | 10 +
.../src/test/resources/asf.txt | 1 +
.../test/FlinkTwitterFollowingPipelineIT.scala | 81 ++
.../test/FlinkTwitterPostsPipelineIT.scala | 55 +
.../FlinkTwitterUserInformationPipelineIT.scala | 56 +
flink/pom.xml | 47 +
pom.xml | 29 +-
35 files changed, 2988 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/README.md
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/README.md b/flink/flink-twitter-collection/README.md
new file mode 100644
index 0000000..f9fe687
--- /dev/null
+++ b/flink/flink-twitter-collection/README.md
@@ -0,0 +1,8 @@
+Apache Streams (incubating)
+Licensed under Apache License 2.0 - http://www.apache.org/licenses/LICENSE-2.0
+--------------------------------------------------------------------------------
+
+org.apache.streams:flink-twitter-collection
+===========================================
+
+[README.md](src/site/markdown/index.md "README")
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/pom.xml
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/pom.xml b/flink/flink-twitter-collection/pom.xml
new file mode 100644
index 0000000..33b05fe
--- /dev/null
+++ b/flink/flink-twitter-collection/pom.xml
@@ -0,0 +1,420 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-examples-flink</artifactId>
+ <version>0.4-incubating-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>flink-twitter-collection</artifactId>
+ <name>flink-twitter-collection</name>
+
+ <description>Collects twitter documents using flink.</description>
+
+ <properties>
+ <docker.repo>apachestreams</docker.repo>
+ <hdfs.version>2.7.0</hdfs.version>
+ <flink.version>1.1.2</flink.version>
+ <scala.suffix>2.10</scala.suffix>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-all</artifactId>
+ <version>1.3</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.typesafe</groupId>
+ <artifactId>config</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-config</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-util</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-pojo</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-provider-twitter</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-persist-hdfs</artifactId>
+ <version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ <version>${hdfs.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-java</artifactId>
+ <version>${flink.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-clients_${scala.suffix}</artifactId>
+ <version>${flink.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-scala_${scala.suffix}</artifactId>
+ <version>${flink.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-filesystem_${scala.suffix}</artifactId>
+ <version>${flink.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-scala_${scala.suffix}</artifactId>
+ <version>${flink.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-metrics-core</artifactId>
+ <version>${flink.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>zookeeper</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>log4j-over-slf4j</artifactId>
+ <version>${slf4j.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>jcl-over-slf4j</artifactId>
+ <version>${slf4j.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>jul-to-slf4j</artifactId>
+ <version>${slf4j.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-classic</artifactId>
+ <version>${logback.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-core</artifactId>
+ <version>${logback.version}</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <sourceDirectory>src/main/scala</sourceDirectory>
+ <testSourceDirectory>src/test/scala</testSourceDirectory>
+ <resources>
+ <resource>
+ <directory>src/main/resources</directory>
+ </resource>
+ </resources>
+ <testResources>
+ <testResource>
+ <directory>src/test/resources</directory>
+ </testResource>
+ </testResources>
+ <plugins>
+ <plugin>
+ <artifactId>maven-clean-plugin</artifactId>
+ <configuration>
+ <filesets>
+ <fileset>
+ <directory>data</directory>
+ <followSymlinks>false</followSymlinks>
+ </fileset>
+ </filesets>
+ </configuration>
+ </plugin>
+ <!-- This binary runs with logback -->
+ <!-- Keep log4j out -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-enforcer-plugin</artifactId>
+ <version>1.3.1</version>
+ <executions>
+ <execution>
+ <id>enforce-banned-dependencies</id>
+ <goals>
+ <goal>enforce</goal>
+ </goals>
+ <configuration>
+ <rules>
+ <bannedDependencies>
+ <excludes>
+ <exclude>org.slf4j:slf4j-log4j12</exclude>
+ <exclude>org.slf4j:slf4j-jcl</exclude>
+ <exclude>org.slf4j:slf4j-jdk14</exclude>
+ <exclude>log4j:log4j</exclude>
+ <exclude>commons-logging:commons-logging</exclude>
+ </excludes>
+ </bannedDependencies>
+ </rules>
+ <fail>true</fail>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>org.jsonschema2pojo</groupId>
+ <artifactId>jsonschema2pojo-maven-plugin</artifactId>
+ <version>0.4.1</version>
+ <configuration>
+ <addCompileSourceRoot>true</addCompileSourceRoot>
+ <generateBuilders>true</generateBuilders>
+ <sourcePaths>
+ <sourcePath>src/main/jsonschema</sourcePath>
+ </sourcePaths>
+ <outputDirectory>target/generated-sources/jsonschema2pojo</outputDirectory>
+ <targetPackage>org.apache.streams.examples.flink.twitter</targetPackage>
+ <useJodaDates>false</useJodaDates>
+ </configuration>
+ <executions>
+ <execution>
+ <goals>
+ <goal>generate</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>add-source</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>target/generated-sources/jsonschema2pojo</source>
+ </sources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <configuration>
+ <includes>**/*.json</includes>
+ <outputDirectory>${project.build.directory}/test-classes</outputDirectory>
+ <includeGroupIds>org.apache.streams</includeGroupIds>
+ <includeTypes>test-jar</includeTypes>
+ </configuration>
+ <executions>
+ <execution>
+ <id>test-resource-dependencies</id>
+ <phase>process-test-resources</phase>
+ <goals>
+ <goal>unpack-dependencies</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-failsafe-plugin</artifactId>
+ <version>2.12.4</version>
+ <executions>
+ <execution>
+ <id>integration-tests</id>
+ <goals>
+ <goal>integration-test</goal>
+ <goal>verify</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>io.fabric8</groupId>
+ <artifactId>docker-maven-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/main/jsonschema/FlinkBatchConfiguration.json
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/main/jsonschema/FlinkBatchConfiguration.json b/flink/flink-twitter-collection/src/main/jsonschema/FlinkBatchConfiguration.json
new file mode 100644
index 0000000..30a2942
--- /dev/null
+++ b/flink/flink-twitter-collection/src/main/jsonschema/FlinkBatchConfiguration.json
@@ -0,0 +1,12 @@
+{
+ "$schema": "http://json-schema.org/draft-03/schema",
+ "$license": [
+ "http://www.apache.org/licenses/LICENSE-2.0"
+ ],
+ "type": "object",
+ "javaType" : "org.apache.streams.flink.FlinkBatchConfiguration",
+ "javaInterfaces": ["java.io.Serializable"],
+ "extends": {
+ "$ref": "StreamsFlinkConfiguration.json"
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/main/jsonschema/FlinkStreamingConfiguration.json
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/main/jsonschema/FlinkStreamingConfiguration.json b/flink/flink-twitter-collection/src/main/jsonschema/FlinkStreamingConfiguration.json
new file mode 100644
index 0000000..0d63f4e
--- /dev/null
+++ b/flink/flink-twitter-collection/src/main/jsonschema/FlinkStreamingConfiguration.json
@@ -0,0 +1,40 @@
+{
+ "$schema": "http://json-schema.org/draft-03/schema",
+ "$license": [
+ "http://www.apache.org/licenses/LICENSE-2.0"
+ ],
+ "type": "object",
+ "javaType" : "org.apache.streams.flink.FlinkStreamingConfiguration",
+ "javaInterfaces": ["java.io.Serializable"],
+ "extends": {
+ "$ref": "StreamsFlinkConfiguration.json"
+ },
+ "properties": {
+ "parallel": {
+ "type": "integer",
+ "default": 1
+ },
+ "providerWaitMs": {
+ "type": "integer",
+ "default": 1000
+ },
+ "checkpointIntervalMs": {
+ "type": "integer",
+ "default": 300000
+ },
+ "checkpointTimeoutMs": {
+ "type": "integer",
+ "default": 30000
+ },
+ "restartAttempts": {
+ "type": "integer",
+ "description": "number of restart attempts",
+ "default": 3
+ },
+ "restartDelayMs": {
+ "type": "integer",
+ "description": "delay in milliseconds",
+ "default": 10000
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/main/jsonschema/StreamsFlinkConfiguration.json
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/main/jsonschema/StreamsFlinkConfiguration.json b/flink/flink-twitter-collection/src/main/jsonschema/StreamsFlinkConfiguration.json
new file mode 100644
index 0000000..ef78357
--- /dev/null
+++ b/flink/flink-twitter-collection/src/main/jsonschema/StreamsFlinkConfiguration.json
@@ -0,0 +1,48 @@
+{
+ "$schema": "http://json-schema.org/draft-03/schema",
+ "$license": [
+ "http://www.apache.org/licenses/LICENSE-2.0"
+ ],
+ "type": "object",
+ "javaType" : "org.apache.streams.flink.StreamsFlinkConfiguration",
+ "javaInterfaces": ["java.io.Serializable"],
+ "extends": {
+ "$ref": "http://streams.peoplepattern.com/incubator-streams/0.3.9-PP-SNAPSHOT/streams-config/StreamsConfiguration.json"
+ },
+ "properties": {
+ "parallel": {
+ "type": "integer",
+ "default": 1
+ },
+ "providerWaitMs": {
+ "type": "integer",
+ "default": 1000
+ },
+ "checkpointIntervalMs": {
+ "type": "integer",
+ "default": 300000
+ },
+ "checkpointTimeoutMs": {
+ "type": "integer",
+ "default": 30000
+ },
+ "test": {
+ "type": "boolean",
+ "default": false
+ },
+ "local": {
+ "type": "boolean",
+ "default": true
+ },
+ "restartAttempts": {
+ "type": "integer",
+ "description": "number of restart attempts",
+ "default": 3
+ },
+ "restartDelayMs": {
+ "type": "integer",
+ "description": "delay in milliseconds",
+ "default": 10000
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/main/jsonschema/TwitterFollowingPipelineConfiguration.json
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/main/jsonschema/TwitterFollowingPipelineConfiguration.json b/flink/flink-twitter-collection/src/main/jsonschema/TwitterFollowingPipelineConfiguration.json
new file mode 100644
index 0000000..de4f9bb
--- /dev/null
+++ b/flink/flink-twitter-collection/src/main/jsonschema/TwitterFollowingPipelineConfiguration.json
@@ -0,0 +1,29 @@
+{
+ "$schema": "http://json-schema.org/draft-03/schema",
+ "$license": [
+ "http://www.apache.org/licenses/LICENSE-2.0"
+ ],
+ "type": "object",
+ "javaType" : "org.apache.streams.examples.flink.twitter.TwitterFollowingPipelineConfiguration",
+ "javaInterfaces": ["java.io.Serializable"],
+ "extends": {
+ "$ref": "FlinkStreamingConfiguration.json"
+ },
+ "properties": {
+ "twitter": {
+ "type": "object",
+ "javaType": "org.apache.streams.twitter.TwitterFollowingConfiguration"
+ },
+ "source": {
+ "type": "object",
+ "javaType": "org.apache.streams.hdfs.HdfsReaderConfiguration"
+ },
+ "destination": {
+ "type": "object",
+ "javaType": "org.apache.streams.hdfs.HdfsWriterConfiguration"
+ },
+ "providerWaitMs": {
+ "type": "integer"
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/main/jsonschema/TwitterPostsPipelineConfiguration.json
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/main/jsonschema/TwitterPostsPipelineConfiguration.json b/flink/flink-twitter-collection/src/main/jsonschema/TwitterPostsPipelineConfiguration.json
new file mode 100644
index 0000000..628d7ee
--- /dev/null
+++ b/flink/flink-twitter-collection/src/main/jsonschema/TwitterPostsPipelineConfiguration.json
@@ -0,0 +1,29 @@
+{
+ "$schema": "http://json-schema.org/draft-03/schema",
+ "$license": [
+ "http://www.apache.org/licenses/LICENSE-2.0"
+ ],
+ "type": "object",
+ "javaType" : "org.apache.streams.examples.flink.twitter.TwitterPostsPipelineConfiguration",
+ "javaInterfaces": ["java.io.Serializable"],
+ "extends": {
+ "$ref": "FlinkStreamingConfiguration.json"
+ },
+ "properties": {
+ "twitter": {
+ "type": "object",
+ "javaType": "org.apache.streams.twitter.TwitterUserInformationConfiguration"
+ },
+ "source": {
+ "type": "object",
+ "javaType": "org.apache.streams.hdfs.HdfsReaderConfiguration"
+ },
+ "destination": {
+ "type": "object",
+ "javaType": "org.apache.streams.hdfs.HdfsWriterConfiguration"
+ },
+ "providerWaitMs": {
+ "type": "integer"
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/main/jsonschema/TwitterUserInformationPipelineConfiguration.json
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/main/jsonschema/TwitterUserInformationPipelineConfiguration.json b/flink/flink-twitter-collection/src/main/jsonschema/TwitterUserInformationPipelineConfiguration.json
new file mode 100644
index 0000000..5261748
--- /dev/null
+++ b/flink/flink-twitter-collection/src/main/jsonschema/TwitterUserInformationPipelineConfiguration.json
@@ -0,0 +1,29 @@
+{
+ "$schema": "http://json-schema.org/draft-03/schema",
+ "$license": [
+ "http://www.apache.org/licenses/LICENSE-2.0"
+ ],
+ "type": "object",
+ "javaType" : "org.apache.streams.examples.flink.twitter.TwitterUserInformationPipelineConfiguration",
+ "javaInterfaces": ["java.io.Serializable"],
+ "extends": {
+ "$ref": "FlinkStreamingConfiguration.json"
+ },
+ "properties": {
+ "twitter": {
+ "type": "object",
+ "javaType": "org.apache.streams.twitter.TwitterUserInformationConfiguration"
+ },
+ "source": {
+ "type": "object",
+ "javaType": "org.apache.streams.hdfs.HdfsReaderConfiguration"
+ },
+ "destination": {
+ "type": "object",
+ "javaType": "org.apache.streams.hdfs.HdfsWriterConfiguration"
+ },
+ "providerWaitMs": {
+ "type": "integer"
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/FlinkBase.scala
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/FlinkBase.scala b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/FlinkBase.scala
new file mode 100644
index 0000000..1f1ed6d
--- /dev/null
+++ b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/FlinkBase.scala
@@ -0,0 +1,200 @@
+package org.apache.streams.examples.flink
+
+import java.net.MalformedURLException
+
+import com.google.common.base.Strings
+import com.typesafe.config.Config
+import org.apache.flink.api.common.restartstrategy.RestartStrategies
+import org.apache.flink.api.scala.ExecutionEnvironment
+import org.apache.flink.streaming.api.CheckpointingMode
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.streams.config.{ComponentConfigurator, StreamsConfigurator}
+import org.apache.streams.flink.{FlinkBatchConfiguration, FlinkStreamingConfiguration, StreamsFlinkConfiguration}
+import org.apache.streams.hdfs.{HdfsConfiguration, HdfsReaderConfiguration, HdfsWriterConfiguration}
+import org.apache.streams.jackson.StreamsJacksonMapper
+import org.slf4j.LoggerFactory
+
+trait FlinkBase {
+
+ // Logger registered under the fixed name "FlinkBase".
+ private val BASELOGGER = LoggerFactory.getLogger("FlinkBase")
+ // Jackson mapper obtained from StreamsJacksonMapper.
+ // NOTE(review): not referenced by any method of this trait shown in this diff - confirm it is needed.
+ private val MAPPER = StreamsJacksonMapper.getInstance()
+
+ // First command-line argument, assigned by main().
+ var configUrl : String = _
+ // Typesafe config assigned by the setup(...) overloads.
+ var typesafe : Config = _
+ // Detected from StreamsConfigurator.getConfig when the trait is initialised.
+ var streamsConfig = StreamsConfigurator.detectConfiguration(StreamsConfigurator.getConfig)
+ // Assigned by setupStreaming() or setupBatch(), whichever runs last.
+ var streamsFlinkConfiguration: StreamsFlinkConfiguration = _
+
+ // Batch environment; created by setupBatch() only while still null.
+ var executionEnvironment: ExecutionEnvironment = _
+ // Streaming environment; created by setupStreaming() only while still null.
+ var streamExecutionEnvironment: StreamExecutionEnvironment = _
+
+ /**
+  * Shared entry point for every flink job.
+  *
+  * When at least one argument is present, the first argument is stored as
+  * the configuration URL and passed to setup(configUrl). With no arguments
+  * this method does nothing. The result of setup is not inspected here.
+  *
+  * @param args command-line arguments; args(0), if present, is the config URL
+  */
+ def main(args: Array[String]): Unit = {
+   // nothing to configure without a config URL argument
+   if( args.isEmpty ) return
+
+   BASELOGGER.info("Args: {}", args)
+   configUrl = args(0)
+   setup(configUrl)
+ }
+
+ /**
+  * Loads the job configuration from a URL and continues with setup(Config).
+  *
+  * A null or empty configUrl falls back to StreamsConfigurator.getConfig.
+  * Otherwise the URL is resolved and layered over StreamsConfigurator.config.
+  *
+  * @param configUrl location of the configuration; may be null or empty
+  * @return false when the URL cannot be resolved, otherwise the result of setup(Config)
+  */
+ def setup(configUrl : String): Boolean = {
+   BASELOGGER.info("StreamsConfigurator.config: {}", StreamsConfigurator.config)
+   if( Strings.isNullOrEmpty(configUrl) ) {
+     typesafe = StreamsConfigurator.getConfig
+   }
+   else {
+     try {
+       // Resolve exactly once and inside the try block: previously the URL was also
+       // resolved in the log statement outside the try, so a bad URL escaped the handlers below.
+       val resolved = StreamsConfigurator.resolveConfig(configUrl)
+       BASELOGGER.info("StreamsConfigurator.resolveConfig(configUrl): {}", resolved)
+       typesafe = resolved.withFallback(StreamsConfigurator.config).resolve()
+     } catch {
+       case mue: MalformedURLException => {
+         BASELOGGER.error("Invalid Configuration URL: ", mue)
+         return false
+       }
+       case e: Exception => {
+         BASELOGGER.error("Invalid Configuration URL: ", e)
+         return false
+       }
+     }
+   }
+
+   return setup(typesafe)
+
+ }
+
+ /**
+  * Stores the supplied config and prepares the environment selected by its "mode" entry.
+  *
+  * "streaming" builds a FlinkStreamingConfiguration and delegates to setupStreaming;
+  * "batch" builds a FlinkBatchConfiguration and delegates to setupBatch.
+  *
+  * @param typesafe configuration to use for this job
+  * @return the result of the delegated setup call, or false when "mode" is absent or not recognised
+  */
+ def setup(typesafe : Config): Boolean = {
+   this.typesafe = typesafe
+
+   BASELOGGER.info("Typesafe Config: {}", typesafe)
+
+   // getString throws when the path is missing; report that as a failed setup
+   // like every other failure path instead of letting the exception escape.
+   if( !typesafe.hasPath("mode") ) {
+     BASELOGGER.error("Configuration does not define 'mode'; expected 'streaming' or 'batch'")
+     return false
+   }
+
+   typesafe.getString("mode") match {
+     case "streaming" =>
+       val streamingConfiguration: FlinkStreamingConfiguration =
+         new ComponentConfigurator[FlinkStreamingConfiguration](classOf[FlinkStreamingConfiguration]).detectConfiguration(typesafe)
+       setupStreaming(streamingConfiguration)
+     case "batch" =>
+       val batchConfiguration: FlinkBatchConfiguration =
+         new ComponentConfigurator[FlinkBatchConfiguration](classOf[FlinkBatchConfiguration]).detectConfiguration(typesafe)
+       setupBatch(batchConfiguration)
+     case other =>
+       // make the silent failure visible in the log
+       BASELOGGER.error("Unrecognized mode: {}", other)
+       false
+   }
+ }
+
+// def setup(typesafe: Config): Boolean = {
+//
+// val streamsConfig = StreamsConfigurator.detectConfiguration(typesafe)
+//
+// this.streamsConfig = streamsConfig
+//
+// BASELOGGER.info("Streams Config: " + streamsConfig)
+//
+// setup(streamsConfig)
+// }
+
+ /**
+  * Stores the streaming configuration and creates the streaming environment if none exists yet.
+  *
+  * @param streamingConfiguration configuration for the streaming job
+  * @return false when the configuration is null, true once the environment is in place
+  */
+ def setupStreaming(streamingConfiguration: FlinkStreamingConfiguration): Boolean = {
+
+   BASELOGGER.info("FsStreamingFlinkConfiguration: " + streamingConfiguration)
+
+   this.streamsFlinkConfiguration = streamingConfiguration
+
+   if( streamsFlinkConfiguration == null) return false
+
+   // keep an environment that was assigned earlier; only create one when missing
+   if( streamExecutionEnvironment == null )
+     streamExecutionEnvironment = streamEnvironment(streamingConfiguration)
+
+   // Success: previously this returned false, so callers could not tell a
+   // successful streaming setup from a failed one (setupBatch returns true here).
+   return true
+
+ }
+
+ /**
+  * Stores the batch configuration and creates the batch environment if none exists yet.
+  *
+  * @param batchConfiguration configuration for the batch job
+  * @return false when the configuration is null, true otherwise
+  */
+ def setupBatch(batchConfiguration: FlinkBatchConfiguration): Boolean = {
+   BASELOGGER.info("FsBatchFlinkConfiguration: " + batchConfiguration)
+
+   streamsFlinkConfiguration = batchConfiguration
+
+   if( streamsFlinkConfiguration != null ) {
+     // keep an environment that was assigned earlier; only create one when missing
+     if( executionEnvironment == null ) {
+       executionEnvironment = batchEnvironment(batchConfiguration)
+     }
+     true
+   } else {
+     false
+   }
+ }
+
+ /**
+  * Chooses the batch ExecutionEnvironment for the given configuration.
+  *
+  * When both the test and local flags are false, the environment comes from
+  * ExecutionEnvironment.getExecutionEnvironment. Otherwise a local environment
+  * is created with the configured parallelism.
+  *
+  * @param config batch configuration; defaults to a fresh FlinkBatchConfiguration
+  * @return the environment to run the batch job in
+  */
+ def batchEnvironment(config: FlinkBatchConfiguration = new FlinkBatchConfiguration()) : ExecutionEnvironment = {
+   val useContextEnvironment = config.getTest == false && config.getLocal == false
+   if( useContextEnvironment )
+     ExecutionEnvironment.getExecutionEnvironment
+   else
+     ExecutionEnvironment.createLocalEnvironment(config.getParallel.toInt)
+ }
+
+ /**
+  * Chooses and configures the StreamExecutionEnvironment for the given configuration.
+  *
+  * When either the test or the local flag is not false, a local environment with the
+  * configured parallelism is returned without further configuration. Otherwise the
+  * environment from StreamExecutionEnvironment.getExecutionEnvironment is returned with
+  * restarts disabled and exactly-once checkpointing enabled.
+  *
+  * @param config streaming configuration; defaults to a fresh FlinkStreamingConfiguration
+  * @return the environment to run the streaming job in
+  */
+ def streamEnvironment(config: FlinkStreamingConfiguration = new FlinkStreamingConfiguration()) : StreamExecutionEnvironment = {
+   val useContextEnvironment = config.getTest == false && config.getLocal == false
+   if( !useContextEnvironment ) {
+     return StreamExecutionEnvironment.createLocalEnvironment(config.getParallel.toInt)
+   }
+
+   val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+   // restarts are switched off; restartAttempts / restartDelayMs are not consulted here
+   env.setRestartStrategy(RestartStrategies.noRestart())
+
+   // start a checkpoint every checkpointIntervalMs milliseconds
+   env.enableCheckpointing(config.getCheckpointIntervalMs)
+
+   val checkpoints = env.getCheckpointConfig
+   checkpoints.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
+   // the checkpoint timeout is checkpointTimeoutMs milliseconds
+   checkpoints.setCheckpointTimeout(config.getCheckpointTimeoutMs)
+   // allow only one checkpoint to be in progress at the same time
+   checkpoints.setMaxConcurrentCheckpoints(1)
+
+   env
+ }
+
+ /**
+  * Builds the input path string for a reader configuration.
+  *
+  * FILE scheme: "path/readerPath". HDFS scheme: "scheme://host:port/path/readerPath".
+  * A scheme whose string form is "s3": "scheme://path/readerPath".
+  *
+  * @param configObject reader configuration supplying scheme, host, port, path and readerPath
+  * @return the assembled input path
+  * @throws Exception when the scheme is none of the above
+  */
+ def buildReaderPath(configObject: HdfsReaderConfiguration) : String = {
+   val scheme = configObject.getScheme
+   val tail = s"${configObject.getPath}/${configObject.getReaderPath}"
+   if( scheme.equals(HdfsConfiguration.Scheme.FILE) )
+     tail
+   else if( scheme.equals(HdfsConfiguration.Scheme.HDFS) )
+     s"$scheme://${configObject.getHost}:${configObject.getPort}/$tail"
+   else if( scheme.toString.equals("s3") )
+     s"$scheme://$tail"
+   else
+     throw new Exception("scheme not recognized: " + scheme)
+ }
+
+ /**
+  * Builds the output path string for a writer configuration.
+  *
+  * FILE scheme: "path/writerPath". HDFS scheme: "scheme://host:port/path/writerPath".
+  * A scheme whose string form is "s3": "scheme://path/writerPath".
+  *
+  * @param configObject writer configuration supplying scheme, host, port, path and writerPath
+  * @return the assembled output path
+  * @throws Exception when the scheme is none of the above
+  */
+ def buildWriterPath(configObject: HdfsWriterConfiguration) : String = {
+   val scheme = configObject.getScheme
+   val tail = s"${configObject.getPath}/${configObject.getWriterPath}"
+   if( scheme.equals(HdfsConfiguration.Scheme.FILE) )
+     tail
+   else if( scheme.equals(HdfsConfiguration.Scheme.HDFS) )
+     s"$scheme://${configObject.getHost}:${configObject.getPort}/$tail"
+   else if( scheme.toString.equals("s3") )
+     s"$scheme://$tail"
+   else
+     throw new Exception("output scheme not recognized: " + scheme)
+ }
+
+ /**
+  * Reduces an identifier to the form passed to the provider.
+  *
+  * A leading "@" is stripped and the remainder returned as is. Otherwise, when the
+  * input contains ':', everything after the last ':' is returned. Any other input is
+  * returned unchanged.
+  *
+  * @param input raw identifier, e.g. "@name" or "prefix:id"
+  * @return the identifier without the "@" or the colon-delimited prefix
+  */
+ def toProviderId(input : String) : String = {
+   if( input.startsWith("@") ) {
+     input.substring(1)
+   } else {
+     val lastColon = input.lastIndexOf(':')
+     if( lastColon >= 0 ) input.substring(lastColon + 1) else input
+   }
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala
new file mode 100644
index 0000000..2ac7d32
--- /dev/null
+++ b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala
@@ -0,0 +1,149 @@
+package org.apache.streams.examples.flink.twitter.collection
+
+import java.util.concurrent.TimeUnit
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.google.common.util.concurrent.Uninterruptibles
+import org.apache.flink.api.common.functions.RichFlatMapFunction
+import org.apache.flink.core.fs.FileSystem
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment}
+import org.apache.flink.streaming.connectors.fs.RollingSink
+import org.apache.flink.util.Collector
+import org.apache.streams.config.{ComponentConfigurator, StreamsConfigurator}
+import org.apache.streams.core.StreamsDatum
+import org.apache.streams.jackson.StreamsJacksonMapper
+import org.apache.streams.twitter.TwitterFollowingConfiguration
+import org.apache.streams.twitter.pojo.Follow
+import org.apache.streams.twitter.provider.TwitterFollowingProvider
+import org.slf4j.{Logger, LoggerFactory}
+import org.apache.flink.api.scala._
+import org.apache.streams.examples.flink.FlinkBase
+import org.apache.streams.examples.flink.twitter.TwitterFollowingPipelineConfiguration
+import org.apache.streams.flink.{FlinkStreamingConfiguration, StreamsFlinkConfiguration}
+
+import scala.collection.JavaConversions._
+
+/**
+ * Created by sblackmon on 4/20/16.
+ */
+/**
+ * Created by sblackmon on 3/15/16.
+ */
+/**
+ * Launcher and configuration checks for the Twitter following collection job.
+ *
+ * `main` resolves a TwitterFollowingPipelineConfiguration from `typesafe`,
+ * validates it with `setup`, then runs a FlinkTwitterFollowingPipeline on its
+ * own thread and waits for that thread to finish.
+ */
+object FlinkTwitterFollowingPipeline extends FlinkBase {
+
+  val STREAMS_ID: String = "FlinkTwitterFollowingPipeline"
+
+  // Log under this pipeline's own class so messages are attributed to the right job.
+  private val LOGGER: Logger = LoggerFactory.getLogger(classOf[FlinkTwitterFollowingPipeline])
+  private val MAPPER: ObjectMapper = StreamsJacksonMapper.getInstance()
+
+  /**
+   * Resolves the job configuration, exits the JVM with status 1 when `setup`
+   * rejects it, and otherwise runs the pipeline on a new thread and joins it.
+   *
+   * @param args command line arguments, handed to `super.main` unchanged
+   */
+  override def main(args: Array[String]) = {
+    super.main(args)
+    val jobConfig = new ComponentConfigurator[TwitterFollowingPipelineConfiguration](classOf[TwitterFollowingPipelineConfiguration]).detectConfiguration(typesafe)
+    if( !setup(jobConfig) ) System.exit(1)
+    val pipeline: FlinkTwitterFollowingPipeline = new FlinkTwitterFollowingPipeline(jobConfig)
+    val thread = new Thread(pipeline)
+    thread.start()
+    thread.join()
+  }
+
+  /**
+   * Logs the configuration and checks that it, and its source, destination
+   * and twitter sections, are all non-null.
+   *
+   * @param jobConfig configuration to validate; may be null
+   * @return true when every required part is present; false after reporting
+   *         the first missing part to both the logger and stderr
+   */
+  def setup(jobConfig: TwitterFollowingPipelineConfiguration): Boolean = {
+
+    LOGGER.info("TwitterFollowingPipelineConfiguration: " + jobConfig)
+
+    if( jobConfig == null ) {
+      LOGGER.error("jobConfig is null!")
+      System.err.println("jobConfig is null!")
+      return false
+    }
+
+    if( jobConfig.getSource == null ) {
+      LOGGER.error("jobConfig.getSource is null!")
+      System.err.println("jobConfig.getSource is null!")
+      return false
+    }
+
+    if( jobConfig.getDestination == null ) {
+      LOGGER.error("jobConfig.getDestination is null!")
+      System.err.println("jobConfig.getDestination is null!")
+      return false
+    }
+
+    if( jobConfig.getTwitter == null ) {
+      LOGGER.error("jobConfig.getTwitter is null!")
+      System.err.println("jobConfig.getTwitter is null!")
+      return false
+    }
+
+    true
+
+  }
+
+}
+
+/**
+ * Flink streaming job that reads ids from the configured source, collects
+ * 'Follow' documents for each id through a TwitterFollowingProvider, and
+ * writes them as JSON strings to the configured destination.
+ *
+ * @param config job configuration; when omitted it is detected from
+ *               StreamsConfigurator.getConfig
+ */
+class FlinkTwitterFollowingPipeline(config: TwitterFollowingPipelineConfiguration = new ComponentConfigurator[TwitterFollowingPipelineConfiguration](classOf[TwitterFollowingPipelineConfiguration]).detectConfiguration(StreamsConfigurator.getConfig)) extends Runnable with java.io.Serializable {
+
+  import FlinkTwitterFollowingPipeline._
+
+  /**
+   * Builds the stream graph (read ids, key them, collect follows, serialize,
+   * sink) and executes it under the job name "FlinkTwitterFollowingPipeline".
+   */
+  override def run(): Unit = {
+
+    val env: StreamExecutionEnvironment = streamEnvironment(MAPPER.convertValue(config, classOf[FlinkStreamingConfiguration]))
+
+    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+    env.setNumberOfExecutionRetries(0)
+
+    val inPath = buildReaderPath(config.getSource)
+
+    val outPath = buildWriterPath(config.getDestination)
+
+    // key each id into one of 100 buckets derived from its hash code
+    val keyed_ids: KeyedStream[String, Int] = env.readTextFile(inPath).setParallelism(10).keyBy( id => (id.hashCode % 100).abs )
+
+    // these datums contain 'Follow' objects
+    val followDatums: DataStream[StreamsDatum] =
+      keyed_ids.flatMap(new FollowingCollectorFlatMapFunction(config.getTwitter)).setParallelism(10)
+
+    val follows: DataStream[Follow] = followDatums
+      .map(datum => datum.getDocument.asInstanceOf[Follow])
+
+    val jsons: DataStream[String] = follows
+      .map(follow => {
+        // the mapper is obtained inside the function instead of capturing the companion's MAPPER
+        val mapper = StreamsJacksonMapper.getInstance
+        mapper.writeValueAsString(follow)
+      })
+
+    // Kept as an equality test rather than a negation: if getTest is a nullable
+    // boxed Boolean, null falls through to the else branch instead of throwing - TODO confirm.
+    if( config.getTest == false )
+      jsons.addSink(new RollingSink[String](outPath)).setParallelism(3)
+    else
+      jsons.writeAsText(outPath,FileSystem.WriteMode.OVERWRITE)
+        .setParallelism(env.getParallelism)
+
+    // if( test == true ) jsons.print();
+
+    env.execute("FlinkTwitterFollowingPipeline")
+  }
+
+  /**
+   * Flat-map function that turns one id into the StreamsDatum items produced
+   * by a TwitterFollowingProvider run for that id.
+   *
+   * @param twitterConfiguration provider configuration; detected from the
+   *                             "twitter" config section when omitted
+   * @param flinkConfiguration   supplies providerWaitMs, the pause between polls
+   */
+  class FollowingCollectorFlatMapFunction(
+                                           twitterConfiguration : TwitterFollowingConfiguration = new ComponentConfigurator[TwitterFollowingConfiguration](classOf[TwitterFollowingConfiguration]).detectConfiguration(StreamsConfigurator.getConfig.getConfig("twitter")),
+                                           flinkConfiguration : StreamsFlinkConfiguration = new ComponentConfigurator[StreamsFlinkConfiguration](classOf[StreamsFlinkConfiguration]).detectConfiguration(StreamsConfigurator.getConfig)
+                                         ) extends RichFlatMapFunction[String, StreamsDatum] with Serializable {
+
+    override def flatMap(input: String, out: Collector[StreamsDatum]): Unit = {
+      collectConnections(input, out)
+    }
+
+    /**
+     * Starts a provider for the given id and forwards everything it yields.
+     *
+     * The provider is polled at least once, and again after each
+     * providerWaitMs pause for as long as it reports isRunning.
+     *
+     * @param id  id as read from the source; normalized with FlinkUtil.toProviderId
+     * @param out collector receiving every datum read from the provider
+     */
+    def collectConnections(id : String, out : Collector[StreamsDatum]) = {
+      val twitProvider: TwitterFollowingProvider =
+        new TwitterFollowingProvider(
+          twitterConfiguration.withIdsOnly(true).withInfo(List(FlinkUtil.toProviderId(id))).withMaxItems(5000L).asInstanceOf[TwitterFollowingConfiguration]
+        )
+      twitProvider.prepare(twitProvider)
+      twitProvider.startStream()
+      do {
+        Uninterruptibles.sleepUninterruptibly(flinkConfiguration.getProviderWaitMs, TimeUnit.MILLISECONDS)
+        // materialize the current batch first, then emit it
+        twitProvider.readCurrent().iterator().toList.foreach(out.collect(_))
+      } while( twitProvider.isRunning )
+    }
+  }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala
new file mode 100644
index 0000000..f8e221c
--- /dev/null
+++ b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala
@@ -0,0 +1,165 @@
+package org.apache.streams.examples.flink.twitter.collection
+
+import java.util.concurrent.TimeUnit
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.google.common.util.concurrent.Uninterruptibles
+import com.peoplepattern.streams.pdb.pipelines.FlinkStreamingConfiguration
+import com.peoplepattern.streams.pdb.flink.{FlinkBase, FlinkUtil}
+import com.peoplepattern.streams.pipelines.pdb.TwitterPostsPipelineConfiguration
+import com.peoplepattern.streams.twitter.collection.FlinkTwitterPostsPipeline.LOGGER
+import org.apache.flink.api.common.functions.{FlatMapFunction, RichFlatMapFunction}
+import org.apache.flink.api.common.restartstrategy.RestartStrategies
+import org.apache.flink.api.scala.{ExecutionEnvironment, _}
+import org.apache.flink.core.fs.FileSystem
+import org.apache.flink.runtime.state.filesystem.FsStateBackend
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.scala.function.AllWindowFunction
+import org.apache.flink.streaming.api.scala.{AllWindowedStream, DataStream, KeyedStream, StreamExecutionEnvironment}
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger
+import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
+import org.apache.flink.streaming.connectors.fs.RollingSink
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema
+import org.apache.flink.util.Collector
+import org.apache.streams.config.{ComponentConfigurator, StreamsConfigurator}
+import org.apache.streams.core.StreamsDatum
+import org.apache.streams.examples.flink.FlinkBase
+import org.apache.streams.examples.flink.twitter.TwitterPostsPipelineConfiguration
+import org.apache.streams.flink.FlinkStreamingConfiguration
+import org.apache.streams.hdfs.HdfsConfiguration
+import org.apache.streams.jackson.StreamsJacksonMapper
+import org.apache.streams.twitter.TwitterUserInformationConfiguration
+import org.apache.streams.twitter.pojo.{Tweet, User}
+import org.apache.streams.twitter.provider.{TwitterTimelineProvider, TwitterUserInformationProvider}
+import org.slf4j.{Logger, LoggerFactory}
+
+import scala.collection.JavaConversions._
+
+/**
+ * Created by sblackmon on 7/29/15.
+ */
+/**
+ * Launcher and configuration checks for the Twitter posts collection job.
+ */
+object FlinkTwitterPostsPipeline extends FlinkBase {
+
+  val STREAMS_ID: String = "FlinkTwitterPostsPipeline"
+
+  private val LOGGER: Logger = LoggerFactory.getLogger(classOf[FlinkTwitterPostsPipeline])
+  private val MAPPER: ObjectMapper = StreamsJacksonMapper.getInstance()
+
+  /**
+   * Detects the job configuration from `typesafe`, exits with status 1 if
+   * `setup` rejects it, otherwise runs the pipeline on a separate thread and
+   * blocks until that thread ends.
+   *
+   * @param args command line arguments, passed through to `super.main`
+   */
+  override def main(args: Array[String]) = {
+    super.main(args)
+    val configurator = new ComponentConfigurator[TwitterPostsPipelineConfiguration](classOf[TwitterPostsPipelineConfiguration])
+    val jobConfig = configurator.detectConfiguration(typesafe)
+    if( !setup(jobConfig) ) System.exit(1)
+    val runner = new Thread(new FlinkTwitterPostsPipeline(jobConfig))
+    runner.start()
+    runner.join()
+  }
+
+  /**
+   * Logs the configuration, then looks for the first required part that is
+   * null: the configuration itself, its source, its destination, its twitter
+   * section (checked in that order).
+   *
+   * @param jobConfig configuration to validate; may be null
+   * @return true when nothing is missing; false after the first missing part
+   *         has been reported to the logger and to stderr
+   */
+  def setup(jobConfig: TwitterPostsPipelineConfiguration): Boolean = {
+
+    LOGGER.info("TwitterPostsPipelineConfiguration: " + jobConfig)
+
+    // checks are chained so later getters are never called on a null config
+    val firstProblem: Option[String] =
+      if( jobConfig == null ) Some("jobConfig is null!")
+      else if( jobConfig.getSource == null ) Some("jobConfig.getSource is null!")
+      else if( jobConfig.getDestination == null ) Some("jobConfig.getDestination is null!")
+      else if( jobConfig.getTwitter == null ) Some("jobConfig.getTwitter is null!")
+      else None
+
+    firstProblem.foreach { message =>
+      LOGGER.error(message)
+      System.err.println(message)
+    }
+
+    firstProblem.isEmpty
+
+  }
+
+}
+
+/**
+ * Flink streaming job that reads ids from the configured source, collects
+ * 'Tweet' documents for each id through a TwitterTimelineProvider, and writes
+ * them as JSON strings to the configured destination.
+ *
+ * @param config job configuration; when omitted it is detected from
+ *               StreamsConfigurator.getConfig
+ */
+class FlinkTwitterPostsPipeline(config: TwitterPostsPipelineConfiguration = new ComponentConfigurator[TwitterPostsPipelineConfiguration](classOf[TwitterPostsPipelineConfiguration]).detectConfiguration(StreamsConfigurator.getConfig)) extends Runnable with java.io.Serializable {
+
+  import FlinkTwitterPostsPipeline._
+
+  /**
+   * Builds the stream graph (read ids, key them, collect tweets, serialize,
+   * sink) and executes it under the job name "FlinkTwitterPostsPipeline".
+   */
+  override def run(): Unit = {
+
+    val env: StreamExecutionEnvironment = streamEnvironment(MAPPER.convertValue(config, classOf[FlinkStreamingConfiguration]))
+
+    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+    env.setNumberOfExecutionRetries(0)
+
+    val inPath = buildReaderPath(config.getSource)
+
+    val outPath = buildWriterPath(config.getDestination)
+
+    //val inProps = buildKafkaProps(config.getSourceTopic)
+
+    val ids: DataStream[String] = env.readTextFile(inPath).setParallelism(10).name("ids")
+
+    //val idTopicIn = new KafkaSink()
+
+//    val idTopicOut : DataStream[String] = env.addSource[String](
+//      new org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09(config.getSourceTopic.getTopic, new SimpleStringSchema(),
+//        inProps));
+
+    // key the ids stream itself (100 hash buckets) rather than declaring a second read of the same file
+    val keyed_ids: KeyedStream[String, Int] = ids.keyBy( id => (id.hashCode % 100).abs )
+
+    // these datums contain 'Tweet' objects
+    val tweetDatums: DataStream[StreamsDatum] =
+      keyed_ids.flatMap(new postCollectorFlatMapFunction).setParallelism(10).name("tweetDatums")
+
+    val tweets: DataStream[Tweet] = tweetDatums
+      .map(datum => datum.getDocument.asInstanceOf[Tweet]).name("tweets")
+
+    val jsons: DataStream[String] = tweets
+      .map(tweet => {
+        // the mapper is obtained inside the function instead of capturing the companion's MAPPER
+        val mapper = StreamsJacksonMapper.getInstance
+        mapper.writeValueAsString(tweet)
+      }).name("json")
+
+    // Kept as an equality test rather than a negation: if getTest is a nullable
+    // boxed Boolean, null falls through to the else branch instead of throwing - TODO confirm.
+    if( config.getTest == false )
+      jsons.addSink(new RollingSink[String](outPath)).setParallelism(3).name("hdfs")
+    else
+      jsons.writeAsText(outPath,FileSystem.WriteMode.OVERWRITE)
+        .setParallelism(env.getParallelism)
+
+    // if( test == true ) jsons.print();
+
+    env.execute("FlinkTwitterPostsPipeline")
+  }
+
+  /**
+   * Flat-map function that turns one id into the StreamsDatum items produced
+   * by a TwitterTimelineProvider run for that id.
+   */
+  class postCollectorFlatMapFunction extends RichFlatMapFunction[String, StreamsDatum] with Serializable {
+    override def flatMap(input: String, out: Collector[StreamsDatum]): Unit = {
+      collectPosts(input, out)
+    }
+
+    /**
+     * Starts a provider for the given id and forwards everything it yields.
+     *
+     * The provider is polled at least once, and again after each
+     * providerWaitMs pause for as long as it reports isRunning.
+     *
+     * @param id  id as read from the source; normalized with FlinkUtil.toProviderId
+     * @param out collector receiving every datum read from the provider
+     */
+    def collectPosts(id : String, out : Collector[StreamsDatum]) = {
+      val twitterConfiguration = config.getTwitter
+      val twitProvider: TwitterTimelineProvider =
+        new TwitterTimelineProvider(
+          twitterConfiguration.withInfo(List(FlinkUtil.toProviderId(id))).withMaxItems(200L)
+        )
+      twitProvider.prepare(twitProvider)
+      twitProvider.startStream()
+      do {
+        Uninterruptibles.sleepUninterruptibly(config.getProviderWaitMs, TimeUnit.MILLISECONDS)
+        // materialize the current batch first, then emit it
+        twitProvider.readCurrent().iterator().toList.foreach(out.collect(_))
+      } while( twitProvider.isRunning )
+    }
+  }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala
new file mode 100644
index 0000000..a081c74
--- /dev/null
+++ b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala
@@ -0,0 +1,163 @@
+package org.apache.streams.examples.flink.twitter.collection
+
+import java.lang
+import java.util.concurrent.TimeUnit
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import org.apache.flink.core.fs.FileSystem
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.scala.function.{AllWindowFunction, WindowFunction}
+import org.apache.flink.streaming.api.windowing.assigners.{GlobalWindows, TumblingEventTimeWindows}
+
+import scala.collection.JavaConversions._
+import com.peoplepattern.streams.twitter.collection.FlinkTwitterUserInformationPipeline.LOGGER
+import com.google.common.util.concurrent.Uninterruptibles
+import org.apache.streams.examples.flink.FlinkBase
+import org.apache.flink.api.common.functions.RichFlatMapFunction
+import org.apache.flink.api.common.restartstrategy.RestartStrategies
+import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.streaming.api.scala.{AllWindowedStream, DataStream, KeyedStream, StreamExecutionEnvironment, WindowedStream}
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.streaming.api.windowing.triggers._
+import org.apache.flink.streaming.api.windowing.windows.{GlobalWindow, TimeWindow, Window}
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.{ExecutionEnvironment, _}
+import org.apache.flink.runtime.state.filesystem.FsStateBackend
+import org.apache.flink.streaming.connectors.fs.RollingSink
+import org.apache.flink.util.Collector
+import org.apache.streams.config.{ComponentConfigurator, StreamsConfigurator}
+import org.apache.streams.core.StreamsDatum
+import org.apache.streams.examples.flink.FlinkBase
+import org.apache.streams.examples.flink.twitter.TwitterUserInformationPipelineConfiguration
+import org.apache.streams.flink.FlinkStreamingConfiguration
+import org.apache.streams.hdfs.HdfsConfiguration
+import org.apache.streams.jackson.StreamsJacksonMapper
+import org.apache.streams.twitter.TwitterUserInformationConfiguration
+import org.apache.streams.twitter.pojo.{Tweet, User}
+import org.apache.streams.twitter.provider.{TwitterTimelineProvider, TwitterUserInformationProvider}
+import org.slf4j.{Logger, LoggerFactory}
+
+/**
+ * Created by sblackmon on 3/15/16.
+ */
+/**
+ * Launcher and configuration checks for the Twitter user information
+ * collection job.
+ *
+ * `main` resolves a TwitterUserInformationPipelineConfiguration from
+ * `typesafe`, validates it with `setup`, then runs a
+ * FlinkTwitterUserInformationPipeline on its own thread and waits for it.
+ */
+object FlinkTwitterUserInformationPipeline extends FlinkBase {
+
+  val STREAMS_ID: String = "FlinkTwitterUserInformationPipeline"
+
+  private val LOGGER: Logger = LoggerFactory.getLogger(classOf[FlinkTwitterUserInformationPipeline])
+  private val MAPPER: ObjectMapper = StreamsJacksonMapper.getInstance()
+
+  /**
+   * Resolves the job configuration, exits the JVM with status 1 when `setup`
+   * rejects it, and otherwise runs the pipeline on a new thread and joins it.
+   *
+   * @param args command line arguments, handed to `super.main` unchanged
+   */
+  override def main(args: Array[String]) = {
+    super.main(args)
+    val jobConfig = new ComponentConfigurator[TwitterUserInformationPipelineConfiguration](classOf[TwitterUserInformationPipelineConfiguration]).detectConfiguration(typesafe)
+    if( !setup(jobConfig) ) System.exit(1)
+    val pipeline: FlinkTwitterUserInformationPipeline = new FlinkTwitterUserInformationPipeline(jobConfig)
+    val thread = new Thread(pipeline)
+    thread.start()
+    thread.join()
+  }
+
+  /**
+   * Logs the configuration and checks that it, and its source, destination
+   * and twitter sections, are all non-null.
+   *
+   * @param jobConfig configuration to validate; may be null
+   * @return true when every required part is present; false after reporting
+   *         the first missing part to both the logger and stderr
+   */
+  def setup(jobConfig: TwitterUserInformationPipelineConfiguration): Boolean = {
+
+    // label names the configuration type this job actually uses
+    LOGGER.info("TwitterUserInformationPipelineConfiguration: " + jobConfig)
+
+    if( jobConfig == null ) {
+      LOGGER.error("jobConfig is null!")
+      System.err.println("jobConfig is null!")
+      return false
+    }
+
+    if( jobConfig.getSource == null ) {
+      LOGGER.error("jobConfig.getSource is null!")
+      System.err.println("jobConfig.getSource is null!")
+      return false
+    }
+
+    if( jobConfig.getDestination == null ) {
+      LOGGER.error("jobConfig.getDestination is null!")
+      System.err.println("jobConfig.getDestination is null!")
+      return false
+    }
+
+    if( jobConfig.getTwitter == null ) {
+      LOGGER.error("jobConfig.getTwitter is null!")
+      System.err.println("jobConfig.getTwitter is null!")
+      return false
+    }
+
+    true
+
+  }
+
+}
+
+/**
+ * Flink streaming job that reads ids from the configured source, groups them
+ * into lists, collects 'User' documents for each list through a
+ * TwitterUserInformationProvider, and writes them as JSON strings to the
+ * configured destination.
+ *
+ * @param config job configuration; when omitted it is detected from
+ *               StreamsConfigurator.getConfig
+ */
+class FlinkTwitterUserInformationPipeline(config: TwitterUserInformationPipelineConfiguration = new ComponentConfigurator[TwitterUserInformationPipelineConfiguration](classOf[TwitterUserInformationPipelineConfiguration]).detectConfiguration(StreamsConfigurator.getConfig)) extends Runnable with java.io.Serializable {
+
+  import FlinkTwitterUserInformationPipeline._
+
+  /**
+   * Builds the stream graph (read ids, key and window them, collect users,
+   * serialize, sink) and executes it under the job name
+   * "FlinkTwitterUserInformationPipeline".
+   */
+  override def run(): Unit = {
+
+    val env: StreamExecutionEnvironment = streamEnvironment(MAPPER.convertValue(config, classOf[FlinkStreamingConfiguration]))
+
+    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+    env.setNumberOfExecutionRetries(0)
+
+    val inPath = buildReaderPath(config.getSource)
+
+    val outPath = buildWriterPath(config.getDestination)
+
+    val ids: DataStream[String] = env.readTextFile(inPath).setParallelism(10).name("ids")
+
+    // key each id into one of 100 buckets derived from its hash code
+    val keyed_ids: KeyedStream[String, Int] = ids.name("keyed_ids").keyBy( id => (id.hashCode % 100).abs )
+
+    // NOTE(review): ids are handed on in count windows of 100 per key; confirm how a
+    // trailing group of fewer than 100 ids for a key is expected to be flushed.
+    val idWindows: WindowedStream[String, Int, GlobalWindow] = keyed_ids.countWindow(100)
+
+    val idLists: DataStream[List[String]] = idWindows.apply[List[String]] (new idListWindowFunction()).name("idLists")
+
+    val userDatums: DataStream[StreamsDatum] = idLists.flatMap(new profileCollectorFlatMapFunction).setParallelism(10).name("userDatums")
+
+    val user: DataStream[User] = userDatums.map(datum => datum.getDocument.asInstanceOf[User]).name("users")
+
+    val jsons: DataStream[String] = user
+      .map(user => {
+        // the mapper is obtained inside the function instead of capturing the companion's MAPPER
+        val mapper = StreamsJacksonMapper.getInstance
+        mapper.writeValueAsString(user)
+      }).name("jsons")
+
+    // Kept as an equality test rather than a negation: if getTest is a nullable
+    // boxed Boolean, null falls through to the else branch instead of throwing - TODO confirm.
+    if( config.getTest == false )
+      jsons.addSink(new RollingSink[String](outPath)).setParallelism(3).name("hdfs")
+    else
+      jsons.writeAsText(outPath,FileSystem.WriteMode.OVERWRITE)
+        .setParallelism(env.getParallelism)
+
+    LOGGER.info("StreamExecutionEnvironment: {}", env.toString )
+
+    env.execute("FlinkTwitterUserInformationPipeline")
+  }
+
+  /**
+   * Window function that emits the ids of one window as a single list, each
+   * id normalized with FlinkUtil.toProviderId. Emits nothing for an empty window.
+   */
+  class idListWindowFunction extends WindowFunction[String, List[String], Int, GlobalWindow] {
+    override def apply(key: Int, window: GlobalWindow, input: Iterable[String], out: Collector[List[String]]): Unit = {
+      if( input.nonEmpty )
+        out.collect(input.map(id => FlinkUtil.toProviderId(id)).toList)
+    }
+  }
+
+  /**
+   * Flat-map function that turns a list of ids into the StreamsDatum items
+   * produced by a TwitterUserInformationProvider run for that list.
+   */
+  class profileCollectorFlatMapFunction extends RichFlatMapFunction[List[String], StreamsDatum] with Serializable {
+    override def flatMap(input: List[String], out: Collector[StreamsDatum]): Unit = {
+      collectProfiles(input, out)
+    }
+
+    /**
+     * Starts a provider for the given ids and forwards everything it yields.
+     *
+     * The provider is polled at least once, and again after each
+     * providerWaitMs pause for as long as it reports isRunning.
+     *
+     * @param ids ids to look up, passed to the provider configuration as-is
+     * @param out collector receiving every datum read from the provider
+     */
+    def collectProfiles(ids : List[String], out : Collector[StreamsDatum]) = {
+      val twitterConfiguration = config.getTwitter
+      val twitProvider: TwitterUserInformationProvider =
+        new TwitterUserInformationProvider(
+          twitterConfiguration.withInfo(ids)
+        )
+      twitProvider.prepare(twitProvider)
+      twitProvider.startStream()
+      do {
+        Uninterruptibles.sleepUninterruptibly(config.getProviderWaitMs, TimeUnit.MILLISECONDS)
+        // materialize the current batch first, then emit it
+        twitProvider.readCurrent().iterator().toList.foreach(out.collect(_))
+      } while( twitProvider.isRunning )
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterFollowingPipeline.md
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterFollowingPipeline.md b/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterFollowingPipeline.md
new file mode 100644
index 0000000..22f30f5
--- /dev/null
+++ b/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterFollowingPipeline.md
@@ -0,0 +1,41 @@
+FlinkTwitterFollowingPipeline
+=============================
+
+Description:
+-----------------
+
+Collects twitter friends or followers with flink.
+
+Specification:
+-----------------
+
+[FlinkTwitterFollowingPipeline.dot](FlinkTwitterFollowingPipeline.dot "FlinkTwitterFollowingPipeline.dot" )
+
+Diagram:
+-----------------
+
+![FlinkTwitterFollowingPipeline.dot.svg](./FlinkTwitterFollowingPipeline.dot.svg)
+
+Example Configuration:
+----------------------
+
+[FlinkTwitterFollowingPipeline.json](FlinkTwitterFollowingPipeline.json "FlinkTwitterFollowingPipeline.json" )
+
+Run (Local):
+------------
+
+ java -cp dist/flink-twitter-collection-jar-with-dependencies.jar -Dconfig.file=file://<location_of_config_file> org.apache.streams.examples.flink.twitter.collection.FlinkTwitterFollowingPipeline
+
+Run (Flink):
+------------
+
+ flink-run.sh dist/flink-twitter-collection-jar-with-dependencies.jar org.apache.streams.examples.flink.twitter.collection.FlinkTwitterFollowingPipeline http://<location_of_config_file>
+
+Run (YARN):
+-----------
+
+ flink-run.sh yarn dist/flink-twitter-collection-jar-with-dependencies.jar org.apache.streams.examples.flink.twitter.collection.FlinkTwitterFollowingPipeline http://<location_of_config_file>
+
+[JavaDocs](apidocs/index.html "JavaDocs")
+
+###### Licensed under Apache License 2.0 - http://www.apache.org/licenses/LICENSE-2.0
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterPostsPipeline.md
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterPostsPipeline.md b/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterPostsPipeline.md
new file mode 100644
index 0000000..5f77994
--- /dev/null
+++ b/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterPostsPipeline.md
@@ -0,0 +1,41 @@
+FlinkTwitterPostsPipeline
+=========================
+
+Description:
+-----------------
+
+Collects twitter posts with flink.
+
+Specification:
+-----------------
+
+[FlinkTwitterPostsPipeline.dot](FlinkTwitterPostsPipeline.dot "FlinkTwitterPostsPipeline.dot" )
+
+Diagram:
+-----------------
+
+![FlinkTwitterPostsPipeline.dot.svg](./FlinkTwitterPostsPipeline.dot.svg)
+
+Example Configuration:
+----------------------
+
+[FlinkTwitterPostsPipeline.json](FlinkTwitterPostsPipeline.json "FlinkTwitterPostsPipeline.json" )
+
+Run (Local):
+------------
+
+ java -cp dist/flink-twitter-collection-jar-with-dependencies.jar -Dconfig.file=file://<location_of_config_file> org.apache.streams.examples.flink.twitter.collection.FlinkTwitterPostsPipeline
+
+Run (Flink):
+------------
+
+ flink-run.sh dist/flink-twitter-collection-jar-with-dependencies.jar org.apache.streams.examples.flink.twitter.collection.FlinkTwitterPostsPipeline http://<location_of_config_file>
+
+Run (YARN):
+-----------
+
+ flink-run.sh yarn dist/flink-twitter-collection-jar-with-dependencies.jar org.apache.streams.examples.flink.twitter.collection.FlinkTwitterPostsPipeline http://<location_of_config_file>
+
+[JavaDocs](apidocs/index.html "JavaDocs")
+
+###### Licensed under Apache License 2.0 - http://www.apache.org/licenses/LICENSE-2.0
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterUserInformationPipeline.md
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterUserInformationPipeline.md b/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterUserInformationPipeline.md
new file mode 100644
index 0000000..5e0d1fe
--- /dev/null
+++ b/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterUserInformationPipeline.md
@@ -0,0 +1,41 @@
+FlinkTwitterUserInformationPipeline
+===================================
+
+Description:
+-----------------
+
+Collects twitter users with flink.
+
+Specification:
+-----------------
+
+[FlinkTwitterUserInformationPipeline.dot](FlinkTwitterUserInformationPipeline.dot "FlinkTwitterUserInformationPipeline.dot" )
+
+Diagram:
+-----------------
+
+![FlinkTwitterUserInformationPipeline.dot.svg](./FlinkTwitterUserInformationPipeline.dot.svg)
+
+Example Configuration:
+----------------------
+
+[FlinkTwitterUserInformationPipeline.json](FlinkTwitterUserInformationPipeline.json "FlinkTwitterUserInformationPipeline.json" )
+
+Run (Local):
+------------
+
+ java -cp dist/flink-twitter-collection-jar-with-dependencies.jar -Dconfig.file=file://<location_of_config_file> org.apache.streams.examples.flink.twitter.collection.FlinkTwitterUserInformationPipeline
+
+Run (Flink):
+------------
+
+ flink-run.sh dist/flink-twitter-collection-jar-with-dependencies.jar org.apache.streams.examples.flink.twitter.collection.FlinkTwitterUserInformationPipeline http://<location_of_config_file>
+
+Run (YARN):
+-----------
+
+ flink-run.sh yarn dist/flink-twitter-collection-jar-with-dependencies.jar org.apache.streams.examples.flink.twitter.collection.FlinkTwitterUserInformationPipeline http://<location_of_config_file>
+
+[JavaDocs](apidocs/index.html "JavaDocs")
+
+###### Licensed under Apache License 2.0 - http://www.apache.org/licenses/LICENSE-2.0
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/site/markdown/index.md
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/site/markdown/index.md b/flink/flink-twitter-collection/src/site/markdown/index.md
new file mode 100644
index 0000000..19e44cf
--- /dev/null
+++ b/flink/flink-twitter-collection/src/site/markdown/index.md
@@ -0,0 +1,32 @@
+Apache Streams (incubating)
+Licensed under Apache License 2.0 - http://www.apache.org/licenses/LICENSE-2.0
+--------------------------------------------------------------------------------
+
+flink-twitter-collection
+========================
+
+Requirements:
+-------------
+ - Authorized Twitter API credentials
+
+Description:
+------------
+Collects large batches of documents from api.twitter.com from a seed set of ids.
+
+Streams:
+--------
+
+<a href="FlinkTwitterUserInformationPipeline.html" target="_self">FlinkTwitterUserInformationPipeline</a>
+
+<a href="FlinkTwitterPostsPipeline.html" target="_self">FlinkTwitterPostsPipeline</a>
+
+<a href="FlinkTwitterFollowingPipeline.html" target="_self">FlinkTwitterFollowingPipeline</a>
+
+Build:
+---------
+
+ mvn clean install verify
+
+[JavaDocs](apidocs/index.html "JavaDocs")
+
+###### Licensed under Apache License 2.0 - http://www.apache.org/licenses/LICENSE-2.0
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/site/resources/FlinkBatchConfiguration.json
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/site/resources/FlinkBatchConfiguration.json b/flink/flink-twitter-collection/src/site/resources/FlinkBatchConfiguration.json
new file mode 100644
index 0000000..30a2942
--- /dev/null
+++ b/flink/flink-twitter-collection/src/site/resources/FlinkBatchConfiguration.json
@@ -0,0 +1,12 @@
+{
+ "$schema": "http://json-schema.org/draft-03/schema",
+ "$license": [
+ "http://www.apache.org/licenses/LICENSE-2.0"
+ ],
+ "type": "object",
+ "javaType" : "org.apache.streams.flink.FlinkBatchConfiguration",
+ "javaInterfaces": ["java.io.Serializable"],
+ "extends": {
+ "$ref": "StreamsFlinkConfiguration.json"
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/site/resources/FlinkStreamingConfiguration.json
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/site/resources/FlinkStreamingConfiguration.json b/flink/flink-twitter-collection/src/site/resources/FlinkStreamingConfiguration.json
new file mode 100644
index 0000000..0d63f4e
--- /dev/null
+++ b/flink/flink-twitter-collection/src/site/resources/FlinkStreamingConfiguration.json
@@ -0,0 +1,40 @@
+{
+ "$schema": "http://json-schema.org/draft-03/schema",
+ "$license": [
+ "http://www.apache.org/licenses/LICENSE-2.0"
+ ],
+ "type": "object",
+ "javaType" : "org.apache.streams.flink.FlinkStreamingConfiguration",
+ "javaInterfaces": ["java.io.Serializable"],
+ "extends": {
+ "$ref": "StreamsFlinkConfiguration.json"
+ },
+ "properties": {
+ "parallel": {
+ "type": "integer",
+ "default": 1
+ },
+ "providerWaitMs": {
+ "type": "integer",
+ "default": 1000
+ },
+ "checkpointIntervalMs": {
+ "type": "integer",
+ "default": 300000
+ },
+ "checkpointTimeoutMs": {
+ "type": "integer",
+ "default": 30000
+ },
+ "restartAttempts": {
+ "type": "integer",
+ "description": "number of restart attempts",
+ "default": 3
+ },
+ "restartDelayMs": {
+ "type": "integer",
+ "description": "delay in milliseconds",
+ "default": 10000
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/site/resources/StreamsFlinkConfiguration.json
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/site/resources/StreamsFlinkConfiguration.json b/flink/flink-twitter-collection/src/site/resources/StreamsFlinkConfiguration.json
new file mode 100644
index 0000000..ef78357
--- /dev/null
+++ b/flink/flink-twitter-collection/src/site/resources/StreamsFlinkConfiguration.json
@@ -0,0 +1,48 @@
+{
+ "$schema": "http://json-schema.org/draft-03/schema",
+ "$license": [
+ "http://www.apache.org/licenses/LICENSE-2.0"
+ ],
+ "type": "object",
+ "javaType" : "org.apache.streams.flink.StreamsFlinkConfiguration",
+ "javaInterfaces": ["java.io.Serializable"],
+ "extends": {
+ "$ref": "http://streams.peoplepattern.com/incubator-streams/0.3.9-PP-SNAPSHOT/streams-config/StreamsConfiguration.json"
+ },
+ "properties": {
+ "parallel": {
+ "type": "integer",
+ "default": 1
+ },
+ "providerWaitMs": {
+ "type": "integer",
+ "default": 1000
+ },
+ "checkpointIntervalMs": {
+ "type": "integer",
+ "default": 300000
+ },
+ "checkpointTimeoutMs": {
+ "type": "integer",
+ "default": 30000
+ },
+ "test": {
+ "type": "boolean",
+ "default": false
+ },
+ "local": {
+ "type": "boolean",
+ "default": true
+ },
+ "restartAttempts": {
+ "type": "integer",
+ "description": "number of restart attempts",
+ "default": 3
+ },
+ "restartDelayMs": {
+ "type": "integer",
+ "description": "delay in milliseconds",
+ "default": 10000
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/site/resources/TwitterFollowingBatchConfiguration.json
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/site/resources/TwitterFollowingBatchConfiguration.json b/flink/flink-twitter-collection/src/site/resources/TwitterFollowingBatchConfiguration.json
new file mode 100644
index 0000000..33afb29
--- /dev/null
+++ b/flink/flink-twitter-collection/src/site/resources/TwitterFollowingBatchConfiguration.json
@@ -0,0 +1,23 @@
+{
+ "$schema": "http://json-schema.org/draft-03/schema",
+ "$license": [
+ "http://www.apache.org/licenses/LICENSE-2.0"
+ ],
+ "type": "object",
+ "javaType" : "com.peoplepattern.streams.pipelines.pdb.TwitterFollowingBatchConfiguration",
+ "javaInterfaces": ["java.io.Serializable"],
+ "properties": {
+ "twitter": {
+ "type": "object",
+ "javaType": "org.apache.streams.twitter.TwitterFollowingConfiguration"
+ },
+ "hdfs": {
+ "type": "object",
+ "javaType": "org.apache.streams.hdfs.HdfsWriterConfiguration"
+ },
+ "s3": {
+ "type": "object",
+ "javaType": "org.apache.streams.s3.S3WriterConfiguration"
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/site/resources/TwitterFollowingPipelineConfiguration.json
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/site/resources/TwitterFollowingPipelineConfiguration.json b/flink/flink-twitter-collection/src/site/resources/TwitterFollowingPipelineConfiguration.json
new file mode 100644
index 0000000..de4f9bb
--- /dev/null
+++ b/flink/flink-twitter-collection/src/site/resources/TwitterFollowingPipelineConfiguration.json
@@ -0,0 +1,29 @@
+{
+ "$schema": "http://json-schema.org/draft-03/schema",
+ "$license": [
+ "http://www.apache.org/licenses/LICENSE-2.0"
+ ],
+ "type": "object",
+ "javaType" : "org.apache.streams.examples.flink.twitter.TwitterFollowingPipelineConfiguration",
+ "javaInterfaces": ["java.io.Serializable"],
+ "extends": {
+ "$ref": "FlinkStreamingConfiguration.json"
+ },
+ "properties": {
+ "twitter": {
+ "type": "object",
+ "javaType": "org.apache.streams.twitter.TwitterFollowingConfiguration"
+ },
+ "source": {
+ "type": "object",
+ "javaType": "org.apache.streams.hdfs.HdfsReaderConfiguration"
+ },
+ "destination": {
+ "type": "object",
+ "javaType": "org.apache.streams.hdfs.HdfsWriterConfiguration"
+ },
+ "providerWaitMs": {
+ "type": "integer"
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/site/resources/TwitterPostsBatchConfiguration.json
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/site/resources/TwitterPostsBatchConfiguration.json b/flink/flink-twitter-collection/src/site/resources/TwitterPostsBatchConfiguration.json
new file mode 100644
index 0000000..376bb4d
--- /dev/null
+++ b/flink/flink-twitter-collection/src/site/resources/TwitterPostsBatchConfiguration.json
@@ -0,0 +1,23 @@
+{
+ "$schema": "http://json-schema.org/draft-03/schema",
+ "$license": [
+ "http://www.apache.org/licenses/LICENSE-2.0"
+ ],
+ "type": "object",
+ "javaType" : "com.peoplepattern.streams.pipelines.pdb.TwitterPostsBatchConfiguration",
+ "javaInterfaces": ["java.io.Serializable"],
+ "properties": {
+ "twitter": {
+ "type": "object",
+ "javaType": "org.apache.streams.twitter.TwitterUserInformationConfiguration"
+ },
+ "hdfs": {
+ "type": "object",
+ "javaType": "org.apache.streams.hdfs.HdfsWriterConfiguration"
+ },
+ "s3": {
+ "type": "object",
+ "javaType": "org.apache.streams.s3.S3WriterConfiguration"
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/site/resources/TwitterPostsPipelineConfiguration.json
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/site/resources/TwitterPostsPipelineConfiguration.json b/flink/flink-twitter-collection/src/site/resources/TwitterPostsPipelineConfiguration.json
new file mode 100644
index 0000000..628d7ee
--- /dev/null
+++ b/flink/flink-twitter-collection/src/site/resources/TwitterPostsPipelineConfiguration.json
@@ -0,0 +1,29 @@
+{
+ "$schema": "http://json-schema.org/draft-03/schema",
+ "$license": [
+ "http://www.apache.org/licenses/LICENSE-2.0"
+ ],
+ "type": "object",
+ "javaType" : "org.apache.streams.examples.flink.twitter.TwitterPostsPipelineConfiguration",
+ "javaInterfaces": ["java.io.Serializable"],
+ "extends": {
+ "$ref": "FlinkStreamingConfiguration.json"
+ },
+ "properties": {
+ "twitter": {
+ "type": "object",
+ "javaType": "org.apache.streams.twitter.TwitterUserInformationConfiguration"
+ },
+ "source": {
+ "type": "object",
+ "javaType": "org.apache.streams.hdfs.HdfsReaderConfiguration"
+ },
+ "destination": {
+ "type": "object",
+ "javaType": "org.apache.streams.hdfs.HdfsWriterConfiguration"
+ },
+ "providerWaitMs": {
+ "type": "integer"
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/site/resources/TwitterUserInformationBatchConfiguration.json
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/site/resources/TwitterUserInformationBatchConfiguration.json b/flink/flink-twitter-collection/src/site/resources/TwitterUserInformationBatchConfiguration.json
new file mode 100644
index 0000000..55f9fbd
--- /dev/null
+++ b/flink/flink-twitter-collection/src/site/resources/TwitterUserInformationBatchConfiguration.json
@@ -0,0 +1,23 @@
+{
+ "$schema": "http://json-schema.org/draft-03/schema",
+ "$license": [
+ "http://www.apache.org/licenses/LICENSE-2.0"
+ ],
+ "type": "object",
+ "javaType" : "com.peoplepattern.streams.pipelines.pdb.TwitterUserInformationBatchConfiguration",
+ "javaInterfaces": ["java.io.Serializable"],
+ "properties": {
+ "twitter": {
+ "type": "object",
+ "javaType": "org.apache.streams.twitter.TwitterUserInformationConfiguration"
+ },
+ "hdfs": {
+ "type": "object",
+ "javaType": "org.apache.streams.hdfs.HdfsWriterConfiguration"
+ },
+ "s3": {
+ "type": "object",
+ "javaType": "org.apache.streams.s3.S3WriterConfiguration"
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/site/resources/TwitterUserInformationPipelineConfiguration.json
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/site/resources/TwitterUserInformationPipelineConfiguration.json b/flink/flink-twitter-collection/src/site/resources/TwitterUserInformationPipelineConfiguration.json
new file mode 100644
index 0000000..5261748
--- /dev/null
+++ b/flink/flink-twitter-collection/src/site/resources/TwitterUserInformationPipelineConfiguration.json
@@ -0,0 +1,29 @@
+{
+ "$schema": "http://json-schema.org/draft-03/schema",
+ "$license": [
+ "http://www.apache.org/licenses/LICENSE-2.0"
+ ],
+ "type": "object",
+ "javaType" : "org.apache.streams.examples.flink.twitter.TwitterUserInformationPipelineConfiguration",
+ "javaInterfaces": ["java.io.Serializable"],
+ "extends": {
+ "$ref": "FlinkStreamingConfiguration.json"
+ },
+ "properties": {
+ "twitter": {
+ "type": "object",
+ "javaType": "org.apache.streams.twitter.TwitterUserInformationConfiguration"
+ },
+ "source": {
+ "type": "object",
+ "javaType": "org.apache.streams.hdfs.HdfsReaderConfiguration"
+ },
+ "destination": {
+ "type": "object",
+ "javaType": "org.apache.streams.hdfs.HdfsWriterConfiguration"
+ },
+ "providerWaitMs": {
+ "type": "integer"
+ }
+ }
+}
\ No newline at end of file