You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2016/10/09 21:35:44 UTC

[3/6] incubator-streams-examples git commit: flink examples building and running

flink examples building and running


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/commit/0112a838
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/tree/0112a838
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/diff/0112a838

Branch: refs/heads/master
Commit: 0112a83874bb7f896a4e3964d5fde75e5967afe6
Parents: 4491cfe
Author: Steve Blackmon @steveblackmon <sb...@apache.org>
Authored: Thu Sep 29 19:15:39 2016 -0500
Committer: Steve Blackmon @steveblackmon <sb...@apache.org>
Committed: Thu Sep 29 19:15:39 2016 -0500

----------------------------------------------------------------------
 flink/flink-twitter-collection/pom.xml          | 127 +++++++++++------
 .../TwitterSpritzerPipelineConfiguration.json   |  29 ++++
 .../FlinkTwitterFollowingPipeline.scala         |  11 +-
 .../collection/FlinkTwitterPostsPipeline.scala  |  22 ++-
 .../FlinkTwitterSpritzerPipeline.scala          | 138 +++++++++++++++++++
 .../FlinkTwitterUserInformationPipeline.scala   |  28 ++--
 .../TwitterSpritzerPipelineConfiguration.json   |  29 ++++
 .../FlinkTwitterFollowingPipeline.conf          |  10 --
 ...linkTwitterFollowingPipelineFollowersIT.conf |  16 +++
 .../FlinkTwitterFollowingPipelineFriendsIT.conf |  16 +++
 .../resources/FlinkTwitterPostsPipeline.conf    |  10 --
 .../resources/FlinkTwitterPostsPipelineIT.conf  |  15 ++
 .../FlinkTwitterUserInformationPipeline.conf    |  10 --
 .../FlinkTwitterUserInformationPipelineIT.conf  |  15 ++
 .../test/FlinkTwitterFollowingPipelineIT.scala  |  71 +++++-----
 .../test/FlinkTwitterPostsPipelineIT.scala      |  38 ++---
 .../test/FlinkTwitterSpritzerPipelineIT.scala   |  57 ++++++++
 .../FlinkTwitterUserInformationPipelineIT.scala |  33 +++--
 flink/pom.xml                                   |   3 -
 local/elasticsearch-hdfs/pom.xml                |  14 +-
 local/elasticsearch-reindex/pom.xml             |   2 +-
 local/mongo-elasticsearch-sync/pom.xml          |  12 +-
 local/twitter-follow-graph/pom.xml              |  10 +-
 local/twitter-history-elasticsearch/pom.xml     |  14 +-
 local/twitter-userstream-elasticsearch/pom.xml  |  14 +-
 25 files changed, 543 insertions(+), 201 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/flink/flink-twitter-collection/pom.xml
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/pom.xml b/flink/flink-twitter-collection/pom.xml
index 33b05fe..2d35035 100644
--- a/flink/flink-twitter-collection/pom.xml
+++ b/flink/flink-twitter-collection/pom.xml
@@ -34,17 +34,38 @@
     <description>Collects twitter documents using flink.</description>
 
     <properties>
-        <docker.repo>apachestreams</docker.repo>
+        <testng.version>6.9.10</testng.version>
         <hdfs.version>2.7.0</hdfs.version>
         <flink.version>1.1.2</flink.version>
+        <scala.version>2.10.6</scala.version>
+        <scalatest.version>2.2.5</scalatest.version>
         <scala.suffix>2.10</scala.suffix>
+        <scala-maven.plugin.version>3.2.2</scala-maven.plugin.version>
     </properties>
 
     <dependencies>
         <dependency>
-            <groupId>org.hamcrest</groupId>
-            <artifactId>hamcrest-all</artifactId>
-            <version>1.3</version>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-compiler</artifactId>
+            <version>${scala.version}</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-library</artifactId>
+            <version>${scala.version}</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-reflect</artifactId>
+            <version>${scala.version}</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.scalatest</groupId>
+            <artifactId>scalatest_${scala.suffix}</artifactId>
+            <version>${scalatest.version}</version>
             <scope>test</scope>
         </dependency>
         <dependency>
@@ -65,13 +86,11 @@
             <groupId>org.apache.streams</groupId>
             <artifactId>streams-util</artifactId>
             <version>${project.version}</version>
-            <type>test-jar</type>
         </dependency>
         <dependency>
             <groupId>org.apache.streams</groupId>
             <artifactId>streams-pojo</artifactId>
             <version>${project.version}</version>
-            <type>test-jar</type>
         </dependency>
         <dependency>
             <groupId>org.apache.streams</groupId>
@@ -277,6 +296,19 @@
             <artifactId>logback-core</artifactId>
             <version>${logback.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.testng</groupId>
+            <artifactId>testng</artifactId>
+            <version>${testng.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.hamcrest</groupId>
+            <artifactId>hamcrest-all</artifactId>
+            <version>1.3</version>
+            <scope>test</scope>
+        </dependency>
+
     </dependencies>
 
     <build>
@@ -293,17 +325,6 @@
             </testResource>
         </testResources>
         <plugins>
-            <plugin>
-                <artifactId>maven-clean-plugin</artifactId>
-                <configuration>
-                    <filesets>
-                        <fileset>
-                            <directory>data</directory>
-                            <followSymlinks>false</followSymlinks>
-                        </fileset>
-                    </filesets>
-                </configuration>
-            </plugin>
             <!-- This binary runs with logback -->
             <!-- Keep log4j out -->
             <plugin>
@@ -334,8 +355,56 @@
             </executions>
         </plugin>
         <plugin>
+            <groupId>net.alchim31.maven</groupId>
+            <artifactId>scala-maven-plugin</artifactId>
+            <version>${scala-maven.plugin.version}</version>
+            <executions>
+                <execution>
+                    <id>scala-compile-first</id>
+                    <phase>process-resources</phase>
+                    <goals>
+                        <goal>add-source</goal>
+                        <goal>compile</goal>
+                    </goals>
+                </execution>
+                <execution>
+                    <id>scala-test-compile</id>
+                    <phase>process-test-resources</phase>
+                    <goals>
+                        <goal>testCompile</goal>
+                    </goals>
+                </execution>
+            </executions>
+        </plugin>
+        <plugin>
             <groupId>org.apache.maven.plugins</groupId>
             <artifactId>maven-shade-plugin</artifactId>
+            <executions>
+                <execution>
+                    <phase>package</phase>
+                    <goals>
+                        <goal>shade</goal>
+                    </goals>
+                    <configuration>
+                        <finalName>${project.build.finalName}</finalName>
+                        <filters>
+                            <filter>
+                                <artifact>*:*</artifact>
+                                <excludes>
+                                    <exclude>META-INF/*.SF</exclude>
+                                    <exclude>META-INF/*.DSA</exclude>
+                                    <exclude>META-INF/*.RSA</exclude>
+                                    <exclude>**/logback.xml</exclude>
+                                    <exclude>**/log4j.properties</exclude>
+                                </excludes>
+                            </filter>
+                        </filters>
+                        <transformers>
+                            <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
+                        </transformers>
+                    </configuration>
+                </execution>
+            </executions>
         </plugin>
         <plugin>
             <groupId>org.jsonschema2pojo</groupId>
@@ -348,7 +417,6 @@
                     <sourcePath>src/main/jsonschema</sourcePath>
                 </sourcePaths>
                 <outputDirectory>target/generated-sources/jsonschema2pojo</outputDirectory>
-                <targetPackage>org.apache.streams.example.elasticsearch</targetPackage>
                 <useJodaDates>false</useJodaDates>
             </configuration>
             <executions>
@@ -379,25 +447,6 @@
             </plugin>
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-dependency-plugin</artifactId>
-                <configuration>
-                    <includes>**/*.json</includes>
-                    <outputDirectory>${project.build.directory}/test-classes</outputDirectory>
-                    <includeGroupIds>org.apache.streams</includeGroupIds>
-                    <includeTypes>test-jar</includeTypes>
-                </configuration>
-                <executions>
-                    <execution>
-                        <id>test-resource-dependencies</id>
-                        <phase>process-test-resources</phase>
-                        <goals>
-                            <goal>unpack-dependencies</goal>
-                        </goals>
-                    </execution>
-                </executions>
-            </plugin>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-failsafe-plugin</artifactId>
                 <version>2.12.4</version>
                 <executions>
@@ -410,10 +459,6 @@
                     </execution>
                 </executions>
             </plugin>
-            <plugin>
-                <groupId>io.fabric8</groupId>
-                <artifactId>docker-maven-plugin</artifactId>
-            </plugin>
         </plugins>
     </build>
 

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/flink/flink-twitter-collection/src/main/jsonschema/TwitterSpritzerPipelineConfiguration.json
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/main/jsonschema/TwitterSpritzerPipelineConfiguration.json b/flink/flink-twitter-collection/src/main/jsonschema/TwitterSpritzerPipelineConfiguration.json
new file mode 100644
index 0000000..49d0d1e
--- /dev/null
+++ b/flink/flink-twitter-collection/src/main/jsonschema/TwitterSpritzerPipelineConfiguration.json
@@ -0,0 +1,29 @@
+{
+  "$schema": "http://json-schema.org/draft-03/schema",
+  "$license": [
+    "http://www.apache.org/licenses/LICENSE-2.0"
+  ],
+  "type": "object",
+  "javaType" : "org.apache.streams.examples.flink.twitter.TwitterSpritzerPipelineConfiguration",
+  "javaInterfaces": ["java.io.Serializable"],
+  "extends": {
+    "$ref": "FlinkStreamingConfiguration.json"
+  },
+  "properties": {
+    "twitter": {
+      "type": "object",
+      "javaType": "org.apache.streams.twitter.TwitterStreamConfiguration"
+    },
+    "source": {
+      "type": "object",
+      "javaType": "org.apache.streams.hdfs.HdfsReaderConfiguration"
+    },
+    "destination": {
+      "type": "object",
+      "javaType": "org.apache.streams.hdfs.HdfsWriterConfiguration"
+    },
+    "providerWaitMs": {
+      "type": "integer"
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala
index 2ac7d32..2fd9336 100644
--- a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala
+++ b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala
@@ -3,6 +3,7 @@ package org.apache.streams.examples.flink.twitter.collection
 import java.util.concurrent.TimeUnit
 
 import com.fasterxml.jackson.databind.ObjectMapper
+import com.google.common.base.{Preconditions, Strings}
 import com.google.common.util.concurrent.Uninterruptibles
 import org.apache.flink.api.common.functions.RichFlatMapFunction
 import org.apache.flink.core.fs.FileSystem
@@ -17,10 +18,10 @@ import org.apache.streams.twitter.TwitterFollowingConfiguration
 import org.apache.streams.twitter.pojo.Follow
 import org.apache.streams.twitter.provider.TwitterFollowingProvider
 import org.slf4j.{Logger, LoggerFactory}
-import org.apache.flink.api.scala._
 import org.apache.streams.examples.flink.FlinkBase
 import org.apache.streams.examples.flink.twitter.TwitterFollowingPipelineConfiguration
 import org.apache.streams.flink.{FlinkStreamingConfiguration, StreamsFlinkConfiguration}
+import org.apache.flink.api.scala._
 
 import scala.collection.JavaConversions._
 
@@ -75,6 +76,12 @@ object FlinkTwitterFollowingPipeline extends FlinkBase {
             return false
         }
 
+        Preconditions.checkNotNull(jobConfig.getTwitter.getOauth)
+        Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getAccessToken))
+        Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getAccessTokenSecret))
+        Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getConsumerKey))
+        Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getConsumerSecret))
+
         return true
 
     }
@@ -134,7 +141,7 @@ class FlinkTwitterFollowingPipeline(config: TwitterFollowingPipelineConfiguratio
         def collectConnections(id : String, out : Collector[StreamsDatum]) = {
             val twitProvider: TwitterFollowingProvider =
                 new TwitterFollowingProvider(
-                    twitterConfiguration.withIdsOnly(true).withInfo(List(FlinkUtil.toProviderId(id))).withMaxItems(5000l).asInstanceOf[TwitterFollowingConfiguration]
+                    twitterConfiguration.withIdsOnly(true).withInfo(List(toProviderId(id))).withMaxItems(5000l).asInstanceOf[TwitterFollowingConfiguration]
                 )
             twitProvider.prepare(twitProvider)
             twitProvider.startStream()

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala
index f8e221c..beea973 100644
--- a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala
+++ b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala
@@ -3,11 +3,8 @@ package org.apache.streams.examples.flink.twitter.collection
 import java.util.concurrent.TimeUnit
 
 import com.fasterxml.jackson.databind.ObjectMapper
+import com.google.common.base.{Preconditions, Strings}
 import com.google.common.util.concurrent.Uninterruptibles
-import com.peoplepattern.streams.pdb.pipelines.FlinkStreamingConfiguration
-import com.peoplepattern.streams.pdb.flink.{FlinkBase, FlinkUtil}
-import com.peoplepattern.streams.pipelines.pdb.TwitterPostsPipelineConfiguration
-import com.peoplepattern.streams.twitter.collection.FlinkTwitterPostsPipeline.LOGGER
 import org.apache.flink.api.common.functions.{FlatMapFunction, RichFlatMapFunction}
 import org.apache.flink.api.common.restartstrategy.RestartStrategies
 import org.apache.flink.api.scala.{ExecutionEnvironment, _}
@@ -33,6 +30,7 @@ import org.apache.streams.twitter.TwitterUserInformationConfiguration
 import org.apache.streams.twitter.pojo.{Tweet, User}
 import org.apache.streams.twitter.provider.{TwitterTimelineProvider, TwitterUserInformationProvider}
 import org.slf4j.{Logger, LoggerFactory}
+import org.apache.flink.api.scala._
 
 import scala.collection.JavaConversions._
 
@@ -84,6 +82,12 @@ object FlinkTwitterPostsPipeline extends FlinkBase {
       return false
     }
 
+    Preconditions.checkNotNull(jobConfig.getTwitter.getOauth)
+    Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getAccessToken))
+    Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getAccessTokenSecret))
+    Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getConsumerKey))
+    Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getConsumerSecret))
+
     return true
 
   }
@@ -105,16 +109,8 @@ class FlinkTwitterPostsPipeline(config: TwitterPostsPipelineConfiguration = new
 
     val outPath = buildWriterPath(config.getDestination)
 
-    //val inProps = buildKafkaProps(config.getSourceTopic)
-
     val ids: DataStream[String] = env.readTextFile(inPath).setParallelism(10).name("ids")
 
-    //val idTopicIn = new KafkaSink()
-
-//    val idTopicOut : DataStream[String] = env.addSource[String](
-//      new org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09(config.getSourceTopic.getTopic, new SimpleStringSchema(),
-//        inProps));
-
     val keyed_ids: KeyedStream[String, Int] = env.readTextFile(inPath).setParallelism(10).name("keyed_ids").keyBy( id => (id.hashCode % 100).abs )
 
     // these datums contain 'Tweet' objects
@@ -149,7 +145,7 @@ class FlinkTwitterPostsPipeline(config: TwitterPostsPipelineConfiguration = new
       val twitterConfiguration = config.getTwitter
       val twitProvider: TwitterTimelineProvider =
         new TwitterTimelineProvider(
-          twitterConfiguration.withInfo(List(FlinkUtil.toProviderId(id))).withMaxItems(200l)
+          twitterConfiguration.withInfo(List(toProviderId(id))).withMaxItems(200l)
         )
       twitProvider.prepare(twitProvider)
       twitProvider.startStream()

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala
new file mode 100644
index 0000000..b615806
--- /dev/null
+++ b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala
@@ -0,0 +1,138 @@
+package org.apache.streams.examples.flink.twitter.collection
+
+import java.util.concurrent.TimeUnit
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.google.common.base.{Preconditions, Strings}
+import com.google.common.util.concurrent.Uninterruptibles
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.core.fs.FileSystem
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
+import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment}
+import org.apache.flink.streaming.connectors.fs.RollingSink
+import org.apache.streams.config.{ComponentConfigurator, StreamsConfigurator}
+import org.apache.streams.core.StreamsDatum
+import org.apache.streams.examples.flink.FlinkBase
+import org.apache.streams.examples.flink.twitter.TwitterSpritzerPipelineConfiguration
+import org.apache.streams.flink.FlinkStreamingConfiguration
+import org.apache.streams.jackson.StreamsJacksonMapper
+import org.apache.streams.twitter.TwitterStreamConfiguration
+import org.apache.streams.twitter.provider.TwitterStreamProvider
+import org.slf4j.{Logger, LoggerFactory}
+import org.apache.flink.api.scala._
+
+import scala.collection.JavaConversions._
+
+/**
+  * Created by sblackmon on 7/29/15.
+  */
+object FlinkTwitterSpritzerPipeline extends FlinkBase {
+  // Launcher and configuration validation for the Twitter spritzer collection job.
+  val STREAMS_ID: String = "FlinkTwitterSpritzerPipeline"
+
+  private val LOGGER: Logger = LoggerFactory.getLogger(classOf[FlinkTwitterSpritzerPipeline])
+  private val MAPPER: ObjectMapper = StreamsJacksonMapper.getInstance()
+  // Detects the job configuration from `typesafe`, validates it (exit code 1 on failure), then runs the pipeline on a thread and waits for it.
+  override def main(args: Array[String]) = {
+    super.main(args)
+    val jobConfig = new ComponentConfigurator(classOf[TwitterSpritzerPipelineConfiguration]).detectConfiguration(typesafe)
+    if( !setup(jobConfig) ) System.exit(1)
+    val pipeline: FlinkTwitterSpritzerPipeline = new FlinkTwitterSpritzerPipeline(jobConfig)
+    val thread = new Thread(pipeline)
+    thread.start()
+    thread.join()
+  }
+  // Returns false (after logging) when a required config section is null; the Preconditions checks throw when OAuth credentials are missing or empty.
+  def setup(jobConfig: TwitterSpritzerPipelineConfiguration): Boolean =  {
+
+    LOGGER.info("TwitterSpritzerPipelineConfiguration: " + jobConfig)
+
+    if( jobConfig == null ) {
+      LOGGER.error("jobConfig is null!")
+      System.err.println("jobConfig is null!")
+      return false
+    }
+
+    if( jobConfig.getDestination == null ) {
+      LOGGER.error("jobConfig.getDestination is null!")
+      System.err.println("jobConfig.getDestination is null!")
+      return false
+    }
+
+    if( jobConfig.getTwitter == null ) {
+      LOGGER.error("jobConfig.getTwitter is null!")
+      System.err.println("jobConfig.getTwitter is null!")
+      return false
+    }
+    // All four OAuth values must be present and non-empty.
+    Preconditions.checkNotNull(jobConfig.getTwitter.getOauth)
+    Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getAccessToken))
+    Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getAccessTokenSecret))
+    Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getConsumerKey))
+    Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getConsumerSecret))
+
+    return true
+
+  }
+
+}
+
+class FlinkTwitterSpritzerPipeline(config: TwitterSpritzerPipelineConfiguration = new ComponentConfigurator(classOf[TwitterSpritzerPipelineConfiguration]).detectConfiguration(StreamsConfigurator.getConfig)) extends Runnable with java.io.Serializable {
+  // Runnable Flink streaming job: reads documents from a SpritzerSource and writes them as text under the destination path.
+  import FlinkTwitterSpritzerPipeline._
+
+  override def run(): Unit = {
+    // Builds the streaming environment from the job config, wires source to sink, and submits the job.
+    val env: StreamExecutionEnvironment = streamEnvironment(MAPPER.convertValue(config, classOf[FlinkStreamingConfiguration]))
+
+    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+    env.setNumberOfExecutionRetries(0)
+
+    val outPath = buildWriterPath(config.getDestination)
+
+    val streamSource : DataStream[String] = env.addSource(new SpritzerSource(config.getTwitter))
+    // NOTE(review): kept as `== false` rather than `!` in case getTest is a nullable boxed Boolean — confirm.
+    if( config.getTest == false )
+      streamSource.addSink(new RollingSink[String](outPath)).setParallelism(3).name("hdfs")
+    else
+      streamSource.writeAsText(outPath,FileSystem.WriteMode.OVERWRITE)
+        .setParallelism(env.getParallelism)
+
+    // if( test == true ) jsons.print();
+    // Job name matches this pipeline's STREAMS_ID.
+    env.execute(STREAMS_ID)
+  }
+
+  class SpritzerSource(sourceConfig: TwitterStreamConfiguration) extends RichSourceFunction[String] with Serializable {
+    // Flink source that emits each datum's document (cast to String) read from a TwitterStreamProvider.
+    var twitProvider: TwitterStreamProvider = _
+
+    @throws[Exception]
+    override def open(parameters: Configuration): Unit = {
+      twitProvider = new TwitterStreamProvider( sourceConfig )
+      twitProvider.prepare(twitProvider)
+      twitProvider.startStream()
+    }
+    // Sleeps config.getProviderWaitMs milliseconds, drains the provider, and repeats while the provider reports it is running.
+    override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
+      // foreach rather than toList.map: collecting is a side effect, no result list is needed.
+      do {
+        Uninterruptibles.sleepUninterruptibly(config.getProviderWaitMs, TimeUnit.MILLISECONDS)
+        val datums: Iterator[StreamsDatum] = twitProvider.readCurrent().iterator()
+        datums.foreach(datum => ctx.collect(datum.getDocument.asInstanceOf[String]))
+      } while( twitProvider.isRunning )
+    }
+    // NOTE(review): cancel() and close() both call cleanUp(); assumes cleanUp() is safe to call twice and ends the run loop — confirm.
+    override def cancel(): Unit = {
+      twitProvider.cleanUp()
+    }
+
+    @throws[Exception]
+    override def close(): Unit = {
+      twitProvider.cleanUp()
+    }
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala
index a081c74..867255d 100644
--- a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala
+++ b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala
@@ -1,28 +1,18 @@
 package org.apache.streams.examples.flink.twitter.collection
 
-import java.lang
 import java.util.concurrent.TimeUnit
 
 import com.fasterxml.jackson.databind.ObjectMapper
+import com.google.common.base.{Preconditions, Strings}
 import org.apache.flink.core.fs.FileSystem
 import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.scala.function.{AllWindowFunction, WindowFunction}
 import org.apache.flink.streaming.api.windowing.assigners.{GlobalWindows, TumblingEventTimeWindows}
-
-import scala.collection.JavaConversions._
-import com.peoplepattern.streams.twitter.collection.FlinkTwitterUserInformationPipeline.LOGGER
 import com.google.common.util.concurrent.Uninterruptibles
-import org.apache.streams.examples.flink.FlinkBase
 import org.apache.flink.api.common.functions.RichFlatMapFunction
-import org.apache.flink.api.common.restartstrategy.RestartStrategies
-import org.apache.flink.api.java.tuple.Tuple
 import org.apache.flink.streaming.api.scala.{AllWindowedStream, DataStream, KeyedStream, StreamExecutionEnvironment, WindowedStream}
-import org.apache.flink.streaming.api.windowing.time.Time
-import org.apache.flink.streaming.api.windowing.triggers._
 import org.apache.flink.streaming.api.windowing.windows.{GlobalWindow, TimeWindow, Window}
 import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.{ExecutionEnvironment, _}
-import org.apache.flink.runtime.state.filesystem.FsStateBackend
 import org.apache.flink.streaming.connectors.fs.RollingSink
 import org.apache.flink.util.Collector
 import org.apache.streams.config.{ComponentConfigurator, StreamsConfigurator}
@@ -30,13 +20,13 @@ import org.apache.streams.core.StreamsDatum
 import org.apache.streams.examples.flink.FlinkBase
 import org.apache.streams.examples.flink.twitter.TwitterUserInformationPipelineConfiguration
 import org.apache.streams.flink.FlinkStreamingConfiguration
-import org.apache.streams.hdfs.HdfsConfiguration
 import org.apache.streams.jackson.StreamsJacksonMapper
-import org.apache.streams.twitter.TwitterUserInformationConfiguration
-import org.apache.streams.twitter.pojo.{Tweet, User}
-import org.apache.streams.twitter.provider.{TwitterTimelineProvider, TwitterUserInformationProvider}
+import org.apache.streams.twitter.pojo.User
+import org.apache.streams.twitter.provider.TwitterUserInformationProvider
 import org.slf4j.{Logger, LoggerFactory}
 
+import scala.collection.JavaConversions._
+
 /**
   * Created by sblackmon on 3/15/16.
   */
@@ -85,6 +75,12 @@ object FlinkTwitterUserInformationPipeline extends FlinkBase {
       return false
     }
 
+    Preconditions.checkNotNull(jobConfig.getTwitter.getOauth)
+    Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getAccessToken))
+    Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getAccessTokenSecret))
+    Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getConsumerKey))
+    Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getConsumerSecret))
+
     return true
 
   }
@@ -137,7 +133,7 @@ class FlinkTwitterUserInformationPipeline(config: TwitterUserInformationPipeline
 
   class idListWindowFunction extends WindowFunction[String, List[String], Int, GlobalWindow] {
     override def apply(key: Int, window: GlobalWindow, input: Iterable[String], out: Collector[List[String]]): Unit = {if( input.size > 0 )
-        out.collect(input.map(id => FlinkUtil.toProviderId(id)).toList)
+        out.collect(input.map(id => toProviderId(id)).toList)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/flink/flink-twitter-collection/src/site/resources/TwitterSpritzerPipelineConfiguration.json
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/site/resources/TwitterSpritzerPipelineConfiguration.json b/flink/flink-twitter-collection/src/site/resources/TwitterSpritzerPipelineConfiguration.json
new file mode 100644
index 0000000..49d0d1e
--- /dev/null
+++ b/flink/flink-twitter-collection/src/site/resources/TwitterSpritzerPipelineConfiguration.json
@@ -0,0 +1,29 @@
+{
+  "$schema": "http://json-schema.org/draft-03/schema",
+  "$license": [
+    "http://www.apache.org/licenses/LICENSE-2.0"
+  ],
+  "type": "object",
+  "javaType" : "org.apache.streams.examples.flink.twitter.TwitterSpritzerPipelineConfiguration",
+  "javaInterfaces": ["java.io.Serializable"],
+  "extends": {
+    "$ref": "FlinkStreamingConfiguration.json"
+  },
+  "properties": {
+    "twitter": {
+      "type": "object",
+      "javaType": "org.apache.streams.twitter.TwitterStreamConfiguration"
+    },
+    "source": {
+      "type": "object",
+      "javaType": "org.apache.streams.hdfs.HdfsReaderConfiguration"
+    },
+    "destination": {
+      "type": "object",
+      "javaType": "org.apache.streams.hdfs.HdfsWriterConfiguration"
+    },
+    "providerWaitMs": {
+      "type": "integer"
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipeline.conf
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipeline.conf b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipeline.conf
deleted file mode 100644
index e74f00c..0000000
--- a/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipeline.conf
+++ /dev/null
@@ -1,10 +0,0 @@
-twitter {
-  endpoint = followers
-  version = 1.1
-  oauth {
-    consumerKey = ""
-    consumerSecret = ""
-    accessToken = ""
-    accessTokenSecret = ""
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFollowersIT.conf
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFollowersIT.conf b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFollowersIT.conf
new file mode 100644
index 0000000..87057be
--- /dev/null
+++ b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFollowersIT.conf
@@ -0,0 +1,16 @@
+source {
+  fields = ["ID"]
+  scheme = file
+  path = "target/test-classes"
+  readerPath = "asf.txt"
+}
+destination {
+  fields = ["DOC"]
+  scheme = file
+  path = "target/test-classes"
+  writerPath = "FlinkTwitterFollowingPipelineFollowersIT"
+}
+twitter.endpoint = followers
+providerWaitMs = 1000
+local = true
+test = true

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFriendsIT.conf
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFriendsIT.conf b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFriendsIT.conf
new file mode 100644
index 0000000..b5212ed
--- /dev/null
+++ b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFriendsIT.conf
@@ -0,0 +1,16 @@
+source {
+  fields = ["ID"]
+  scheme = file
+  path = "target/test-classes"
+  readerPath = "asf.txt"
+}
+destination {
+  fields = ["DOC"]
+  scheme = file
+  path = "target/test-classes"
+  writerPath = "FlinkTwitterFollowingPipelineFriendsIT"
+}
+twitter.endpoint = friends
+providerWaitMs = 1000
+local = true
+test = true

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/flink/flink-twitter-collection/src/test/resources/FlinkTwitterPostsPipeline.conf
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/test/resources/FlinkTwitterPostsPipeline.conf b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterPostsPipeline.conf
deleted file mode 100644
index 63a6481..0000000
--- a/flink/flink-twitter-collection/src/test/resources/FlinkTwitterPostsPipeline.conf
+++ /dev/null
@@ -1,10 +0,0 @@
-twitter {
-  version = 1.1
-  endpoint = statuses
-  oauth {
-    consumerKey = ""
-    consumerSecret = ""
-    accessToken = ""
-    accessTokenSecret = ""
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/flink/flink-twitter-collection/src/test/resources/FlinkTwitterPostsPipelineIT.conf
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/test/resources/FlinkTwitterPostsPipelineIT.conf b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterPostsPipelineIT.conf
new file mode 100644
index 0000000..6954113
--- /dev/null
+++ b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterPostsPipelineIT.conf
@@ -0,0 +1,15 @@
+source {
+  fields = ["ID"]
+  scheme = file
+  path = "target/test-classes"
+  readerPath = "asf.txt"
+}
+destination {
+  fields = ["DOC"]
+  scheme = file
+  path = "target/test-classes"
+  writerPath = "FlinkTwitterPostsPipelineIT"
+}
+providerWaitMs = 1000
+local = true
+test = true

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/flink/flink-twitter-collection/src/test/resources/FlinkTwitterUserInformationPipeline.conf
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/test/resources/FlinkTwitterUserInformationPipeline.conf b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterUserInformationPipeline.conf
deleted file mode 100644
index 6e0a879..0000000
--- a/flink/flink-twitter-collection/src/test/resources/FlinkTwitterUserInformationPipeline.conf
+++ /dev/null
@@ -1,10 +0,0 @@
-twitter {
-  version = 1.1
-  endpoint = users
-  oauth {
-    consumerKey = ""
-    consumerSecret = ""
-    accessToken = ""
-    accessTokenSecret = ""
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/flink/flink-twitter-collection/src/test/resources/FlinkTwitterUserInformationPipelineIT.conf
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/test/resources/FlinkTwitterUserInformationPipelineIT.conf b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterUserInformationPipelineIT.conf
new file mode 100644
index 0000000..342a850
--- /dev/null
+++ b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterUserInformationPipelineIT.conf
@@ -0,0 +1,15 @@
+source {
+  fields = ["ID"]
+  scheme = file
+  path = "target/test-classes"
+  readerPath = "asf.txt"
+}
+destination {
+  fields = ["DOC"]
+  scheme = file
+  path = "target/test-classes"
+  writerPath = "FlinkTwitterUserInformationPipelineIT"
+}
+providerWaitMs = 1000
+local = true
+test = true

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineIT.scala
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineIT.scala b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineIT.scala
index aa2b1a9..b051e90 100644
--- a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineIT.scala
+++ b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineIT.scala
@@ -1,17 +1,22 @@
 package com.peoplepattern.streams.twitter.collection
 
+import java.io.File
 import java.nio.file.{Files, Paths}
 
-import com.peoplepattern.streams.pipelines.pdb.{TwitterFollowingPipelineConfiguration, TwitterPostsPipelineConfiguration}
-import org.apache.streams.config.{ComponentConfigurator, StreamsConfigurator}
+import org.apache.streams.examples.flink.twitter.TwitterFollowingPipelineConfiguration
+import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions}
+import org.apache.streams.config.{ComponentConfigurator, StreamsConfiguration, StreamsConfigurator}
+import org.apache.streams.examples.flink.twitter.collection.FlinkTwitterUserInformationPipeline._
+import org.apache.streams.examples.flink.twitter.collection.{FlinkTwitterFollowingPipeline, FlinkTwitterSpritzerPipeline}
 import org.apache.streams.hdfs.{HdfsConfiguration, HdfsReaderConfiguration, HdfsWriterConfiguration}
-import org.scalatest.FlatSpec
-import org.scalatest.concurrent.Eventually._
-import org.scalatest.time.SpanSugar._
 import org.slf4j.{Logger, LoggerFactory}
 import org.testng.annotations.Test
 
 import scala.io.Source
+import org.scalatest.FlatSpec
+import org.scalatest.concurrent.Eventually._
+import org.scalatest.time.{Seconds, Span}
+import org.scalatest.time.SpanSugar._
 
 /**
   * Created by sblackmon on 3/13/16.
@@ -20,30 +25,31 @@ class FlinkTwitterFollowingPipelineIT extends FlatSpec {
 
   private val LOGGER: Logger = LoggerFactory.getLogger(classOf[FlinkTwitterFollowingPipelineIT])
 
+  import FlinkTwitterFollowingPipeline._
+
   @Test
   def flinkTwitterFollowersPipelineFriendsIT = {
 
-    val testConfig : TwitterFollowingPipelineConfiguration =
-      new ComponentConfigurator[TwitterFollowingPipelineConfiguration](classOf[TwitterFollowingPipelineConfiguration]).detectConfiguration(StreamsConfigurator.getConfig)
-    testConfig.getTwitter.setEndpoint("friends")
-    val source : HdfsReaderConfiguration = new HdfsReaderConfiguration().withReaderPath("asf.txt").withScheme(HdfsConfiguration.Scheme.FILE).asInstanceOf[HdfsReaderConfiguration]
-    source.setPath("target/test-classes")
-    testConfig.setSource(source);
-    val destination : HdfsWriterConfiguration = new HdfsWriterConfiguration().withWriterPath("pdb-twitter-collect/FlinkTwitterFollowingPipeline/friends").withScheme(HdfsConfiguration.Scheme.FILE).asInstanceOf[HdfsWriterConfiguration]
-    destination.setPath("target/test-classes")
-    testConfig.setDestination(destination)
-    testConfig.setProviderWaitMs(1000l)
-    testConfig.setTest(true)
+    val reference: Config = ConfigFactory.load()
+    val conf_file: File = new File("target/test-classes/FlinkTwitterFollowingPipelineFriendsIT.conf") // load the config named for this test
+    assert(conf_file.exists())
+    val testResourceConfig: Config = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
+
+    val typesafe: Config = testResourceConfig.withFallback(reference).resolve()
+    val streams: StreamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe)
+    val testConfig = new ComponentConfigurator(classOf[TwitterFollowingPipelineConfiguration]).detectConfiguration(typesafe)
+
+    setup(testConfig)
 
     val job = new FlinkTwitterFollowingPipeline(config = testConfig)
     val jobThread = new Thread(job)
     jobThread.start
     jobThread.join
 
-    eventually (timeout(30 seconds), interval(1 seconds)) {
-      assert(Files.exists(Paths.get("target/test-classes/pdb-twitter-collect/FlinkTwitterFollowingPipeline/friends")))
+    eventually (timeout(60 seconds), interval(1 seconds)) {
+      assert(Files.exists(Paths.get(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath)))
       assert(
-        Source.fromFile("target/test-classes/pdb-twitter-collect/FlinkTwitterFollowingPipeline/friends", "UTF-8").getLines.size
+        Source.fromFile(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath, "UTF-8").getLines.size
           > 90)
     }
 
@@ -52,27 +58,26 @@ class FlinkTwitterFollowingPipelineIT extends FlatSpec {
   @Test
   def flinkTwitterFollowersPipelineFollowersIT = {
 
-    val testConfig : TwitterFollowingPipelineConfiguration =
-      new ComponentConfigurator[TwitterFollowingPipelineConfiguration](classOf[TwitterFollowingPipelineConfiguration]).detectConfiguration(StreamsConfigurator.getConfig)
-    testConfig.getTwitter.setEndpoint("followers")
-    val source : HdfsReaderConfiguration = new HdfsReaderConfiguration().withReaderPath("asf.txt").withScheme(HdfsConfiguration.Scheme.FILE).asInstanceOf[HdfsReaderConfiguration]
-    source.setPath("target/test-classes")
-    testConfig.setSource(source);
-    val destination : HdfsWriterConfiguration = new HdfsWriterConfiguration().withWriterPath("pdb-twitter-collect/FlinkTwitterFollowingPipeline/followers").withScheme(HdfsConfiguration.Scheme.FILE).asInstanceOf[HdfsWriterConfiguration]
-    destination.setPath("target/test-classes")
-    testConfig.setDestination(destination)
-    testConfig.setProviderWaitMs(1000l)
-    testConfig.setTest(true)
+    val reference: Config = ConfigFactory.load()
+    val conf_file: File = new File("target/test-classes/FlinkTwitterFollowingPipelineFollowersIT.conf") // load the config named for this test
+    assert(conf_file.exists())
+    val testResourceConfig: Config = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
+
+    val typesafe: Config = testResourceConfig.withFallback(reference).resolve()
+    val streams: StreamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe)
+    val testConfig = new ComponentConfigurator(classOf[TwitterFollowingPipelineConfiguration]).detectConfiguration(typesafe)
+
+    setup(testConfig)
 
     val job = new FlinkTwitterFollowingPipeline(config = testConfig)
     val jobThread = new Thread(job)
     jobThread.start
     jobThread.join
 
-    eventually (timeout(30 seconds), interval(1 seconds)) {
-      assert(Files.exists(Paths.get("target/test-classes/FlinkTwitterFollowingPipeline/followers")))
+    eventually (timeout(60 seconds), interval(1 seconds)) {
+      assert(Files.exists(Paths.get(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath)))
       assert(
-        Source.fromFile("target/test-classes/FlinkTwitterFollowingPipeline/followers", "UTF-8").getLines.size
+        Source.fromFile(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath, "UTF-8").getLines.size
           > 500)
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterPostsPipelineIT.scala
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterPostsPipelineIT.scala b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterPostsPipelineIT.scala
index 8a942e5..a355696 100644
--- a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterPostsPipelineIT.scala
+++ b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterPostsPipelineIT.scala
@@ -1,12 +1,16 @@
 package com.peoplepattern.streams.twitter.collection
 
+import java.io.File
 import java.nio.file.{Files, Paths}
 import java.util.concurrent.TimeUnit
 
 import com.google.common.util.concurrent.{Monitor, Uninterruptibles}
-import com.peoplepattern.streams.pipelines.pdb.TwitterPostsPipelineConfiguration
-import org.apache.flink.api.scala.ExecutionEnvironment
-import org.apache.streams.config.{ComponentConfigurator, StreamsConfigurator}
+import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions}
+import org.apache.streams.examples.flink.twitter.{TwitterFollowingPipelineConfiguration, TwitterPostsPipelineConfiguration}
+import org.apache.streams.config.{ComponentConfigurator, StreamsConfiguration, StreamsConfigurator}
+import org.apache.streams.examples.flink.twitter.TwitterPostsPipelineConfiguration
+import org.apache.streams.examples.flink.twitter.collection.FlinkTwitterUserInformationPipeline._
+import org.apache.streams.examples.flink.twitter.collection.{FlinkTwitterPostsPipeline, FlinkTwitterUserInformationPipeline}
 import org.apache.streams.hdfs.{HdfsConfiguration, HdfsReaderConfiguration, HdfsWriterConfiguration}
 import org.slf4j.{Logger, LoggerFactory}
 
@@ -20,23 +24,25 @@ import org.testng.annotations.Test
 /**
   * Created by sblackmon on 3/13/16.
   */
-class FlinkTwitterPostsPipelineIT extends FlatSpec {
+class FlinkTwitterPostsPipelineIT extends FlatSpec  {
 
   private val LOGGER: Logger = LoggerFactory.getLogger(classOf[FlinkTwitterPostsPipelineIT])
 
+  import FlinkTwitterPostsPipeline._
+
   @Test
   def flinkTwitterPostsPipelineIT = {
 
-    val testConfig : TwitterPostsPipelineConfiguration =
-      new ComponentConfigurator[TwitterPostsPipelineConfiguration](classOf[TwitterPostsPipelineConfiguration]).detectConfiguration(StreamsConfigurator.getConfig)
-    val source : HdfsReaderConfiguration = new HdfsReaderConfiguration().withReaderPath("asf.txt").withScheme(HdfsConfiguration.Scheme.FILE).asInstanceOf[HdfsReaderConfiguration]
-    source.setPath("target/test-classes")
-    testConfig.setSource(source);
-    val destination : HdfsWriterConfiguration = new HdfsWriterConfiguration().withWriterPath("pdb-twitter-collect/FlinkTwitterPostsPipeline").withScheme(HdfsConfiguration.Scheme.FILE).asInstanceOf[HdfsWriterConfiguration]
-    destination.setPath("target/test-classes")
-    testConfig.setDestination(destination)
-    testConfig.setProviderWaitMs(1000l)
-    testConfig.setTest(true)
+    val reference: Config = ConfigFactory.load()
+    val conf_file: File = new File("target/test-classes/FlinkTwitterPostsPipelineIT.conf")
+    assert(conf_file.exists())
+    val testResourceConfig: Config = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
+
+    val typesafe: Config = testResourceConfig.withFallback(reference).resolve()
+    val streams: StreamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe)
+    val testConfig = new ComponentConfigurator(classOf[TwitterPostsPipelineConfiguration]).detectConfiguration(typesafe)
+
+    setup(testConfig)
 
     val job = new FlinkTwitterPostsPipeline(config = testConfig)
     val jobThread = new Thread(job)
@@ -44,9 +50,9 @@ class FlinkTwitterPostsPipelineIT extends FlatSpec {
     jobThread.join
 
     eventually (timeout(30 seconds), interval(1 seconds)) {
-      assert(Files.exists(Paths.get("target/test-classes/FlinkTwitterPostsPipeline")))
+      assert(Files.exists(Paths.get(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath)))
       assert(
-        Source.fromFile("target/test-classes/FlinkTwitterPostsPipeline", "UTF-8").getLines.size
+        Source.fromFile(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath, "UTF-8").getLines.size
           >= 200)
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterSpritzerPipelineIT.scala
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterSpritzerPipelineIT.scala b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterSpritzerPipelineIT.scala
new file mode 100644
index 0000000..f083f65
--- /dev/null
+++ b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterSpritzerPipelineIT.scala
@@ -0,0 +1,57 @@
+package org.apache.streams.examples.flink.twitter.test
+
+import java.io.File
+import java.nio.file.{Files, Paths}
+
+import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions}
+import org.apache.streams.config.{ComponentConfigurator, StreamsConfiguration, StreamsConfigurator}
+import org.apache.streams.examples.flink.twitter.collection.FlinkTwitterUserInformationPipeline._
+import org.apache.streams.examples.flink.twitter.collection.{FlinkTwitterSpritzerPipeline, FlinkTwitterUserInformationPipeline}
+import org.apache.streams.examples.flink.twitter.{TwitterPostsPipelineConfiguration, TwitterSpritzerPipelineConfiguration}
+import org.slf4j.{Logger, LoggerFactory}
+import org.testng.annotations.Test
+
+import scala.io.Source
+import org.scalatest.FlatSpec
+import org.scalatest.concurrent.Eventually._
+import org.scalatest.time.{Seconds, Span}
+import org.scalatest.time.SpanSugar._
+
+/**
+  * Created by sblackmon on 3/13/16.
+  */
+class FlinkTwitterSpritzerPipelineIT extends FlatSpec {
+
+  private val LOGGER: Logger = LoggerFactory.getLogger(classOf[FlinkTwitterSpritzerPipelineIT])
+
+  import FlinkTwitterSpritzerPipeline._
+
+  @Test(enabled = false)
+  def flinkTwitterSpritzerPipelineIT = {
+
+    val reference: Config = ConfigFactory.load()
+    val conf_file: File = new File("target/test-classes/FlinkTwitterSpritzerPipelineIT.conf")
+    assert(conf_file.exists())
+    val testResourceConfig: Config = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
+
+    val typesafe: Config = testResourceConfig.withFallback(reference).resolve()
+    val streams: StreamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe)
+    val testConfig = new ComponentConfigurator(classOf[TwitterSpritzerPipelineConfiguration]).detectConfiguration(typesafe)
+
+    setup(testConfig)
+
+    val job = new FlinkTwitterSpritzerPipeline(config = testConfig)
+    val jobThread = new Thread(job)
+    jobThread.start
+    jobThread.join
+
+    eventually (timeout(30 seconds), interval(1 seconds)) {
+      assert(Files.exists(Paths.get(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath)))
+      assert(
+        Source.fromFile(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath, "UTF-8").getLines.size
+          >= 200)
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterUserInformationPipelineIT.scala
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterUserInformationPipelineIT.scala b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterUserInformationPipelineIT.scala
index 3d21244..2ca8650 100644
--- a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterUserInformationPipelineIT.scala
+++ b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterUserInformationPipelineIT.scala
@@ -1,10 +1,13 @@
 package com.peoplepattern.streams.twitter.collection
 
+import java.io.File
 import java.nio.file.{Files, Paths}
 
-import com.peoplepattern.streams.pipelines.pdb.{TwitterPostsPipelineConfiguration, TwitterUserInformationPipelineConfiguration}
+import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions}
+import org.apache.streams.examples.flink.twitter.{TwitterSpritzerPipelineConfiguration, TwitterUserInformationPipelineConfiguration}
 import org.apache.flink.api.scala.ExecutionEnvironment
-import org.apache.streams.config.{ComponentConfigurator, StreamsConfigurator}
+import org.apache.streams.config.{ComponentConfigurator, StreamsConfiguration, StreamsConfigurator}
+import org.apache.streams.examples.flink.twitter.collection.FlinkTwitterUserInformationPipeline
 import org.apache.streams.hdfs.{HdfsConfiguration, HdfsReaderConfiguration, HdfsWriterConfiguration}
 import org.scalatest.FlatSpec
 import org.scalatest._
@@ -25,19 +28,21 @@ class FlinkTwitterUserInformationPipelineIT extends FlatSpec {
 
   private val LOGGER: Logger = LoggerFactory.getLogger(classOf[FlinkTwitterUserInformationPipelineIT])
 
+  import FlinkTwitterUserInformationPipeline._
+
   @Test
   def flinkTwitterUserInformationPipelineIT = {
 
-    val testConfig : TwitterUserInformationPipelineConfiguration =
-      new ComponentConfigurator[TwitterUserInformationPipelineConfiguration](classOf[TwitterUserInformationPipelineConfiguration]).detectConfiguration(StreamsConfigurator.getConfig)
-    val source : HdfsReaderConfiguration = new HdfsReaderConfiguration().withReaderPath("1000twitterids.txt").withScheme(HdfsConfiguration.Scheme.FILE).asInstanceOf[HdfsReaderConfiguration]
-    source.setPath("target/test-classes")
-    testConfig.setSource(source);
-    val destination : HdfsWriterConfiguration = new HdfsWriterConfiguration().withWriterPath("pdb-twitter-collect/TwitterUserInformationPipeline").withScheme(HdfsConfiguration.Scheme.FILE).asInstanceOf[HdfsWriterConfiguration]
-    destination.setPath("target/test-classes")
-    testConfig.setDestination(destination)
-    testConfig.setProviderWaitMs(1000l)
-    testConfig.setTest(true)
+    val reference: Config = ConfigFactory.load()
+    val conf_file: File = new File("target/test-classes/FlinkTwitterUserInformationPipelineIT.conf")
+    assert(conf_file.exists())
+    val testResourceConfig: Config = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
+
+    val typesafe: Config = testResourceConfig.withFallback(reference).resolve()
+    val streams: StreamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe)
+    val testConfig = new ComponentConfigurator(classOf[TwitterUserInformationPipelineConfiguration]).detectConfiguration(typesafe)
+
+    setup(testConfig)
 
     val job = new FlinkTwitterUserInformationPipeline(config = testConfig)
     val jobThread = new Thread(job)
@@ -45,9 +50,9 @@ class FlinkTwitterUserInformationPipelineIT extends FlatSpec {
     jobThread.join
 
     eventually (timeout(30 seconds), interval(1 seconds)) {
-      assert(Files.exists(Paths.get("target/test-classes/TwitterUserInformationPipeline")))
+      assert(Files.exists(Paths.get(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath)))
       assert(
-        Source.fromFile("target/test-classes/TwitterUserInformationPipeline", "UTF-8").getLines.size
+        Source.fromFile(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath, "UTF-8").getLines.size
           > 500)
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/flink/pom.xml
----------------------------------------------------------------------
diff --git a/flink/pom.xml b/flink/pom.xml
index 7054e89..6c50ca2 100644
--- a/flink/pom.xml
+++ b/flink/pom.xml
@@ -41,7 +41,4 @@
         <module>flink-twitter-collection</module>
     </modules>
 
-    <build>
-
-    </build>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/local/elasticsearch-hdfs/pom.xml
----------------------------------------------------------------------
diff --git a/local/elasticsearch-hdfs/pom.xml b/local/elasticsearch-hdfs/pom.xml
index 7b653fc..52cd0fc 100644
--- a/local/elasticsearch-hdfs/pom.xml
+++ b/local/elasticsearch-hdfs/pom.xml
@@ -67,7 +67,7 @@
         <dependency>
             <groupId>org.apache.streams</groupId>
             <artifactId>streams-core</artifactId>
-            <version>0.3-incubating</version>
+            <version>0.4-incubating-SNAPSHOT</version>
         </dependency>
         <dependency>
             <groupId>com.typesafe</groupId>
@@ -76,34 +76,34 @@
         <dependency>
             <groupId>org.apache.streams</groupId>
             <artifactId>streams-config</artifactId>
-            <version>0.3-incubating</version>
+            <version>0.4-incubating-SNAPSHOT</version>
         </dependency>
         <dependency>
             <groupId>org.apache.streams</groupId>
             <artifactId>streams-util</artifactId>
-            <version>0.3-incubating</version>
+            <version>0.4-incubating-SNAPSHOT</version>
             <type>test-jar</type>
         </dependency>
         <dependency>
             <groupId>org.apache.streams</groupId>
             <artifactId>streams-pojo</artifactId>
-            <version>0.3-incubating</version>
+            <version>0.4-incubating-SNAPSHOT</version>
             <type>test-jar</type>
         </dependency>
         <dependency>
             <groupId>org.apache.streams</groupId>
             <artifactId>streams-runtime-local</artifactId>
-            <version>0.3-incubating</version>
+            <version>0.4-incubating-SNAPSHOT</version>
         </dependency>
         <dependency>
             <groupId>org.apache.streams</groupId>
             <artifactId>streams-persist-elasticsearch</artifactId>
-            <version>0.3-incubating</version>
+            <version>0.4-incubating-SNAPSHOT</version>
         </dependency>
         <dependency>
             <groupId>org.apache.streams</groupId>
             <artifactId>streams-persist-hdfs</artifactId>
-            <version>0.3-incubating</version>
+            <version>0.4-incubating-SNAPSHOT</version>
             <exclusions>
                 <exclusion>
                     <groupId>org.slf4j</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/local/elasticsearch-reindex/pom.xml
----------------------------------------------------------------------
diff --git a/local/elasticsearch-reindex/pom.xml b/local/elasticsearch-reindex/pom.xml
index e81cbe2..325e564 100644
--- a/local/elasticsearch-reindex/pom.xml
+++ b/local/elasticsearch-reindex/pom.xml
@@ -92,7 +92,7 @@
         <dependency>
             <groupId>org.apache.streams</groupId>
             <artifactId>streams-persist-elasticsearch</artifactId>
-            <version>0.3-incubating</version>
+            <version>0.4-incubating-SNAPSHOT</version>
         </dependency>
         <dependency>
             <groupId>org.slf4j</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/local/mongo-elasticsearch-sync/pom.xml
----------------------------------------------------------------------
diff --git a/local/mongo-elasticsearch-sync/pom.xml b/local/mongo-elasticsearch-sync/pom.xml
index 318c47e..d268ed7 100644
--- a/local/mongo-elasticsearch-sync/pom.xml
+++ b/local/mongo-elasticsearch-sync/pom.xml
@@ -66,7 +66,7 @@
         <dependency>
             <groupId>org.apache.streams</groupId>
             <artifactId>streams-core</artifactId>
-            <version>0.3-incubating</version>
+            <version>0.4-incubating-SNAPSHOT</version>
         </dependency>
         <dependency>
             <groupId>com.typesafe</groupId>
@@ -75,27 +75,27 @@
         <dependency>
             <groupId>org.apache.streams</groupId>
             <artifactId>streams-config</artifactId>
-            <version>0.3-incubating</version>
+            <version>0.4-incubating-SNAPSHOT</version>
         </dependency>
         <dependency>
             <groupId>org.apache.streams</groupId>
             <artifactId>streams-runtime-local</artifactId>
-            <version>0.3-incubating</version>
+            <version>0.4-incubating-SNAPSHOT</version>
         </dependency>
         <dependency>
             <groupId>org.apache.streams</groupId>
             <artifactId>streams-persist-elasticsearch</artifactId>
-            <version>0.3-incubating</version>
+            <version>0.4-incubating-SNAPSHOT</version>
         </dependency>
         <dependency>
             <groupId>org.apache.streams</groupId>
             <artifactId>streams-persist-mongo</artifactId>
-            <version>0.3-incubating</version>
+            <version>0.4-incubating-SNAPSHOT</version>
         </dependency>
         <dependency>
             <groupId>org.apache.streams</groupId>
             <artifactId>streams-pojo</artifactId>
-            <version>0.3-incubating</version>
+            <version>0.4-incubating-SNAPSHOT</version>
             <type>test-jar</type>
         </dependency>
         <dependency>

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/local/twitter-follow-graph/pom.xml
----------------------------------------------------------------------
diff --git a/local/twitter-follow-graph/pom.xml b/local/twitter-follow-graph/pom.xml
index d40adde..9bf980d 100644
--- a/local/twitter-follow-graph/pom.xml
+++ b/local/twitter-follow-graph/pom.xml
@@ -49,17 +49,17 @@
         <dependency>
             <groupId>org.apache.streams</groupId>
             <artifactId>streams-config</artifactId>
-            <version>0.3-incubating</version>
+            <version>0.4-incubating-SNAPSHOT</version>
         </dependency>
         <dependency>
             <groupId>org.apache.streams</groupId>
             <artifactId>streams-runtime-local</artifactId>
-            <version>0.3-incubating</version>
+            <version>0.4-incubating-SNAPSHOT</version>
         </dependency>
         <dependency>
             <groupId>org.apache.streams</groupId>
             <artifactId>streams-provider-twitter</artifactId>
-            <version>0.3-incubating</version>
+            <version>0.4-incubating-SNAPSHOT</version>
             <exclusions>
                 <exclusion>
                     <groupId>commons-logging</groupId>
@@ -70,12 +70,12 @@
         <dependency>
             <groupId>org.apache.streams</groupId>
             <artifactId>streams-persist-graph</artifactId>
-            <version>0.3-incubating</version>
+            <version>0.4-incubating-SNAPSHOT</version>
         </dependency>
         <dependency>
             <groupId>org.apache.streams</groupId>
             <artifactId>streams-pojo</artifactId>
-            <version>0.3-incubating</version>
+            <version>0.4-incubating-SNAPSHOT</version>
             <type>test-jar</type>
         </dependency>
         <dependency>

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/local/twitter-history-elasticsearch/pom.xml
----------------------------------------------------------------------
diff --git a/local/twitter-history-elasticsearch/pom.xml b/local/twitter-history-elasticsearch/pom.xml
index afc8cf0..ba6dbe7 100644
--- a/local/twitter-history-elasticsearch/pom.xml
+++ b/local/twitter-history-elasticsearch/pom.xml
@@ -69,7 +69,7 @@
         <dependency>
             <groupId>org.apache.streams</groupId>
             <artifactId>streams-core</artifactId>
-            <version>0.3-incubating</version>
+            <version>0.4-incubating-SNAPSHOT</version>
         </dependency>
         <dependency>
             <groupId>com.typesafe</groupId>
@@ -78,29 +78,29 @@
         <dependency>
             <groupId>org.apache.streams</groupId>
             <artifactId>streams-config</artifactId>
-            <version>0.3-incubating</version>
+            <version>0.4-incubating-SNAPSHOT</version>
         </dependency>
         <dependency>
             <groupId>org.apache.streams</groupId>
             <artifactId>streams-util</artifactId>
-            <version>0.3-incubating</version>
+            <version>0.4-incubating-SNAPSHOT</version>
             <type>test-jar</type>
         </dependency>
         <dependency>
             <groupId>org.apache.streams</groupId>
             <artifactId>streams-pojo</artifactId>
-            <version>0.3-incubating</version>
+            <version>0.4-incubating-SNAPSHOT</version>
             <type>test-jar</type>
         </dependency>
         <dependency>
             <groupId>org.apache.streams</groupId>
             <artifactId>streams-runtime-local</artifactId>
-            <version>0.3-incubating</version>
+            <version>0.4-incubating-SNAPSHOT</version>
         </dependency>
         <dependency>
             <groupId>org.apache.streams</groupId>
             <artifactId>streams-provider-twitter</artifactId>
-            <version>0.3-incubating</version>
+            <version>0.4-incubating-SNAPSHOT</version>
             <exclusions>
                 <exclusion>
                     <groupId>commons-logging</groupId>
@@ -111,7 +111,7 @@
         <dependency>
             <groupId>org.apache.streams</groupId>
             <artifactId>streams-persist-elasticsearch</artifactId>
-            <version>0.3-incubating</version>
+            <version>0.4-incubating-SNAPSHOT</version>
         </dependency>
         <dependency>
             <groupId>org.slf4j</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/local/twitter-userstream-elasticsearch/pom.xml
----------------------------------------------------------------------
diff --git a/local/twitter-userstream-elasticsearch/pom.xml b/local/twitter-userstream-elasticsearch/pom.xml
index 224bdd4..1b7b64f 100644
--- a/local/twitter-userstream-elasticsearch/pom.xml
+++ b/local/twitter-userstream-elasticsearch/pom.xml
@@ -67,7 +67,7 @@
         <dependency>
             <groupId>org.apache.streams</groupId>
             <artifactId>streams-core</artifactId>
-            <version>0.3-incubating</version>
+            <version>0.4-incubating-SNAPSHOT</version>
         </dependency>
         <dependency>
             <groupId>com.typesafe</groupId>
@@ -76,32 +76,32 @@
         <dependency>
             <groupId>org.apache.streams</groupId>
             <artifactId>streams-config</artifactId>
-            <version>0.3-incubating</version>
+            <version>0.4-incubating-SNAPSHOT</version>
         </dependency>
         <dependency>
             <groupId>org.apache.streams</groupId>
             <artifactId>streams-runtime-local</artifactId>
-            <version>0.3-incubating</version>
+            <version>0.4-incubating-SNAPSHOT</version>
         </dependency>
         <dependency>
             <groupId>org.apache.streams</groupId>
             <artifactId>streams-filters</artifactId>
-            <version>0.3-incubating</version>
+            <version>0.4-incubating-SNAPSHOT</version>
         </dependency>
         <dependency>
             <groupId>org.apache.streams</groupId>
             <artifactId>streams-provider-twitter</artifactId>
-            <version>0.3-incubating</version>
+            <version>0.4-incubating-SNAPSHOT</version>
         </dependency>
         <dependency>
             <groupId>org.apache.streams</groupId>
             <artifactId>streams-persist-elasticsearch</artifactId>
-            <version>0.3-incubating</version>
+            <version>0.4-incubating-SNAPSHOT</version>
         </dependency>
         <dependency>
             <groupId>org.apache.streams</groupId>
             <artifactId>streams-pojo</artifactId>
-            <version>0.3-incubating</version>
+            <version>0.4-incubating-SNAPSHOT</version>
             <type>test-jar</type>
         </dependency>
         <dependency>