Posted to commits@streams.apache.org by sb...@apache.org on 2016/12/17 19:59:01 UTC

[8/9] incubator-streams-examples git commit: STREAMS-474: Apply check style requirements to examples

STREAMS-474: Apply check style requirements to examples


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/commit/cb7c0b9d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/tree/cb7c0b9d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/diff/cb7c0b9d

Branch: refs/heads/master
Commit: cb7c0b9de4abcbd804e142a892d55cacb998a8f0
Parents: c4821b6
Author: Steve Blackmon @steveblackmon <sb...@apache.org>
Authored: Thu Dec 15 17:39:14 2016 -0600
Committer: Steve Blackmon @steveblackmon <sb...@apache.org>
Committed: Thu Dec 15 17:39:14 2016 -0600

----------------------------------------------------------------------
 .../streams/examples/flink/FlinkBase.scala      |  23 +--
 .../FlinkTwitterFollowingPipeline.scala         | 174 ++++++++++---------
 .../collection/FlinkTwitterPostsPipeline.scala  |  28 ++-
 .../FlinkTwitterSpritzerPipeline.scala          |  10 +-
 .../FlinkTwitterUserInformationPipeline.scala   |  15 +-
 ...inkTwitterFollowingPipelineFollowersIT.scala |   2 +-
 ...FlinkTwitterFollowingPipelineFriendsIT.scala |  16 +-
 .../test/FlinkTwitterPostsPipelineIT.scala      |  16 +-
 .../test/FlinkTwitterSpritzerPipelineIT.scala   |  14 +-
 .../FlinkTwitterUserInformationPipelineIT.scala |  16 +-
 .../streams/example/ElasticsearchHdfs.java      |  63 ++++---
 .../streams/example/HdfsElasticsearch.java      |  63 ++++---
 .../example/test/ElasticsearchHdfsIT.java       |  69 ++++----
 .../example/test/HdfsElasticsearchIT.java       |  77 ++++----
 .../src/test/resources/ElasticsearchHdfsIT.conf |   1 +
 .../src/test/resources/HdfsElasticsearchIT.conf |   3 +-
 .../streams/example/ElasticsearchReindex.java   |  58 +++----
 .../test/ElasticsearchReindexChildIT.java       |  79 +++++----
 .../example/test/ElasticsearchReindexIT.java    |  76 ++++----
 .../test/ElasticsearchReindexParentIT.java      |  89 +++++-----
 .../resources/ElasticsearchReindexChildIT.conf  |   1 +
 .../test/resources/ElasticsearchReindexIT.conf  |   1 +
 .../resources/ElasticsearchReindexParentIT.conf |   1 +
 .../streams/example/MongoElasticsearchSync.java |  61 +++----
 .../example/test/MongoElasticsearchSyncIT.java  |  71 ++++----
 .../resources/MongoElasticsearchSyncIT.conf     |   1 +
 .../streams/example/TwitterFollowNeo4j.java     |  73 ++++----
 .../example/test/TwitterFollowNeo4jIT.java      |  44 +++--
 .../test/resources/TwitterFollowNeo4jIT.conf    |   3 +-
 .../example/TwitterHistoryElasticsearch.java    |  61 +++----
 .../test/TwitterHistoryElasticsearchIT.java     |  76 ++++----
 .../TwitterHistoryElasticsearchIT.conf          |   2 +-
 .../example/TwitterUserstreamElasticsearch.java | 140 +++++++--------
 .../test/TwitterUserstreamElasticsearchIT.java  |  78 ++++-----
 .../TwitterUserstreamElasticsearchIT.conf       |   3 +-
 pom.xml                                         |  78 +++++++++
 36 files changed, 821 insertions(+), 765 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/cb7c0b9d/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/FlinkBase.scala
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/FlinkBase.scala b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/FlinkBase.scala
index 49ca5b9..b6d806c 100644
--- a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/FlinkBase.scala
+++ b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/FlinkBase.scala
@@ -32,6 +32,9 @@ import org.apache.streams.hdfs.{HdfsConfiguration, HdfsReaderConfiguration, Hdfs
 import org.apache.streams.jackson.StreamsJacksonMapper
 import org.slf4j.LoggerFactory
 
+/**
+  * FlinkBase is a base class with capabilities common to all of the streams flink examples.
+  */
 trait FlinkBase {
 
   private val BASELOGGER = LoggerFactory.getLogger("FlinkBase")
@@ -101,16 +104,16 @@ trait FlinkBase {
     }
   }
 
-//  def setup(typesafe: Config): Boolean =  {
-//
-//    val streamsConfig = StreamsConfigurator.detectConfiguration(typesafe)
-//
-//    this.streamsConfig = streamsConfig
-//
-//    BASELOGGER.info("Streams Config: " + streamsConfig)
-//
-//    setup(streamsConfig)
-//  }
+  //  def setup(typesafe: Config): Boolean =  {
+  //
+  //    val streamsConfig = StreamsConfigurator.detectConfiguration(typesafe)
+  //
+  //    this.streamsConfig = streamsConfig
+  //
+  //    BASELOGGER.info("Streams Config: " + streamsConfig)
+  //
+  //    setup(streamsConfig)
+  //  }
 
   def setupStreaming(streamingConfiguration: FlinkStreamingConfiguration): Boolean = {
 

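FlinkBase supplies the driver plumbing shared by these examples: super.main(args) parses arguments into the inherited 'typesafe' config, and helpers such as buildReaderPath, buildWriterPath, and streamEnvironment handle dfs paths and the flink environment. A minimal sketch of the pattern, where MyPipeline and MyPipelineConfiguration are hypothetical placeholders (the real pipelines below follow this same shape):

import org.apache.streams.config.ComponentConfigurator
import org.apache.streams.examples.flink.FlinkBase

// hypothetical example following the FlinkBase pattern used by the
// pipelines in this commit; MyPipelineConfiguration is a placeholder
object MyPipeline extends FlinkBase {

  val STREAMS_ID: String = "MyPipeline"

  override def main(args: Array[String]) = {
    super.main(args)  // populates the inherited 'typesafe' config
    val jobConfig = new ComponentConfigurator[MyPipelineConfiguration](
      classOf[MyPipelineConfiguration]).detectConfiguration(typesafe)
    if( !setup(jobConfig) ) System.exit(1)
    val thread = new Thread(new MyPipeline(jobConfig))
    thread.start()
    thread.join()
  }

  def setup(jobConfig: MyPipelineConfiguration): Boolean = {
    // validate source, destination, and credentials before launching
    jobConfig != null
  }
}

class MyPipeline(config: MyPipelineConfiguration) extends Runnable with java.io.Serializable {
  override def run(): Unit = {
    // build and execute the flink stream here
  }
}
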
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/cb7c0b9d/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala
index a5a4f72..17246e5 100644
--- a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala
+++ b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.databind.ObjectMapper
 import com.google.common.base.{Preconditions, Strings}
 import com.google.common.util.concurrent.Uninterruptibles
 import org.apache.flink.api.common.functions.RichFlatMapFunction
+import org.apache.flink.api.scala._
 import org.apache.flink.core.fs.FileSystem
 import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment}
@@ -31,26 +32,29 @@ import org.apache.flink.streaming.connectors.fs.RollingSink
 import org.apache.flink.util.Collector
 import org.apache.streams.config.{ComponentConfigurator, StreamsConfigurator}
 import org.apache.streams.core.StreamsDatum
+import org.apache.streams.examples.flink.FlinkBase
+import org.apache.streams.examples.flink.twitter.TwitterFollowingPipelineConfiguration
+import org.apache.streams.flink.{FlinkStreamingConfiguration, StreamsFlinkConfiguration}
 import org.apache.streams.jackson.StreamsJacksonMapper
 import org.apache.streams.twitter.TwitterFollowingConfiguration
 import org.apache.streams.twitter.pojo.Follow
 import org.apache.streams.twitter.provider.TwitterFollowingProvider
 import org.slf4j.{Logger, LoggerFactory}
-import org.apache.streams.examples.flink.FlinkBase
-import org.apache.streams.examples.flink.twitter.TwitterFollowingPipelineConfiguration
-import org.apache.streams.flink.{FlinkStreamingConfiguration, StreamsFlinkConfiguration}
-import org.apache.flink.api.scala._
 
 import scala.collection.JavaConversions._
 
+/**
+  * FlinkTwitterFollowingPipeline collects friends or followers of all profiles from a
+  * set of IDs, writing each connection as a twitter:follow in json format to dfs.
+  */
 object FlinkTwitterFollowingPipeline extends FlinkBase {
 
-    val STREAMS_ID: String = "FlinkTwitterFollowingPipeline"
+  val STREAMS_ID: String = "FlinkTwitterFollowingPipeline"
 
-    private val LOGGER: Logger = LoggerFactory.getLogger(classOf[FlinkTwitterUserInformationPipeline])
-    private val MAPPER: ObjectMapper = StreamsJacksonMapper.getInstance()
+  private val LOGGER: Logger = LoggerFactory.getLogger(classOf[FlinkTwitterFollowingPipeline])
+  private val MAPPER: ObjectMapper = StreamsJacksonMapper.getInstance()
 
-    override def main(args: Array[String]) = {
+  override def main(args: Array[String]) = {
     super.main(args)
     val jobConfig = new ComponentConfigurator[TwitterFollowingPipelineConfiguration](classOf[TwitterFollowingPipelineConfiguration]).detectConfiguration(typesafe)
     if( !setup(jobConfig) ) System.exit(1)
@@ -58,111 +62,111 @@ object FlinkTwitterFollowingPipeline extends FlinkBase {
     val thread = new Thread(pipeline)
     thread.start()
     thread.join()
-    }
+  }
 
-    def setup(jobConfig: TwitterFollowingPipelineConfiguration): Boolean =  {
+  def setup(jobConfig: TwitterFollowingPipelineConfiguration): Boolean =  {
 
-        LOGGER.info("TwitterFollowingPipelineConfiguration: " + jobConfig)
+    LOGGER.info("TwitterFollowingPipelineConfiguration: " + jobConfig)
 
-        if( jobConfig == null ) {
-            LOGGER.error("jobConfig is null!")
-            System.err.println("jobConfig is null!")
-            return false
-        }
+    if( jobConfig == null ) {
+      LOGGER.error("jobConfig is null!")
+      System.err.println("jobConfig is null!")
+      return false
+    }
 
-        if( jobConfig.getSource == null ) {
-            LOGGER.error("jobConfig.getSource is null!")
-            System.err.println("jobConfig.getSource is null!")
-            return false
-        }
+    if( jobConfig.getSource == null ) {
+      LOGGER.error("jobConfig.getSource is null!")
+      System.err.println("jobConfig.getSource is null!")
+      return false
+    }
 
-        if( jobConfig.getDestination == null ) {
-            LOGGER.error("jobConfig.getDestination is null!")
-            System.err.println("jobConfig.getDestination is null!")
-            return false
-        }
+    if( jobConfig.getDestination == null ) {
+      LOGGER.error("jobConfig.getDestination is null!")
+      System.err.println("jobConfig.getDestination is null!")
+      return false
+    }
 
-        if( jobConfig.getTwitter == null ) {
-            LOGGER.error("jobConfig.getTwitter is null!")
-            System.err.println("jobConfig.getTwitter is null!")
-            return false
-        }
+    if( jobConfig.getTwitter == null ) {
+      LOGGER.error("jobConfig.getTwitter is null!")
+      System.err.println("jobConfig.getTwitter is null!")
+      return false
+    }
 
-        Preconditions.checkNotNull(jobConfig.getTwitter.getOauth)
-        Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getAccessToken))
-        Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getAccessTokenSecret))
-        Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getConsumerKey))
-        Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getConsumerSecret))
+    Preconditions.checkNotNull(jobConfig.getTwitter.getOauth)
+    Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getAccessToken))
+    Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getAccessTokenSecret))
+    Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getConsumerKey))
+    Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getConsumerSecret))
 
-        true
+    true
 
-    }
+  }
 
 }
 
 class FlinkTwitterFollowingPipeline(config: TwitterFollowingPipelineConfiguration = new ComponentConfigurator[TwitterFollowingPipelineConfiguration](classOf[TwitterFollowingPipelineConfiguration]).detectConfiguration(StreamsConfigurator.getConfig)) extends Runnable with java.io.Serializable {
 
-    import FlinkTwitterFollowingPipeline._
+  import FlinkTwitterFollowingPipeline._
+
+  override def run(): Unit = {
+
+    val env: StreamExecutionEnvironment = streamEnvironment(MAPPER.convertValue(config, classOf[FlinkStreamingConfiguration]))
 
-    override def run(): Unit = {
+    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+    env.setNumberOfExecutionRetries(0)
 
-        val env: StreamExecutionEnvironment = streamEnvironment(MAPPER.convertValue(config, classOf[FlinkStreamingConfiguration]))
+    val inPath = buildReaderPath(config.getSource)
 
-        env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
-        env.setNumberOfExecutionRetries(0)
+    val outPath = buildWriterPath(config.getDestination)
 
-        val inPath = buildReaderPath(config.getSource)
+    val keyed_ids: KeyedStream[String, Int] = env.readTextFile(inPath).setParallelism(10).keyBy( id => (id.hashCode % 100).abs )
 
-        val outPath = buildWriterPath(config.getDestination)
+    // these datums contain 'Follow' objects
+    val followDatums: DataStream[StreamsDatum] =
+      keyed_ids.flatMap(new FollowingCollectorFlatMapFunction(config.getTwitter)).setParallelism(10)
 
-        val keyed_ids: KeyedStream[String, Int] = env.readTextFile(inPath).setParallelism(10).keyBy( id => (id.hashCode % 100).abs )
+    val follows: DataStream[Follow] = followDatums
+      .map(datum => datum.getDocument.asInstanceOf[Follow])
 
-        // these datums contain 'Follow' objects
-        val followDatums: DataStream[StreamsDatum] =
-            keyed_ids.flatMap(new FollowingCollectorFlatMapFunction(config.getTwitter)).setParallelism(10)
+    val jsons: DataStream[String] = follows
+      .map(follow => {
+        val MAPPER = StreamsJacksonMapper.getInstance
+        MAPPER.writeValueAsString(follow)
+      })
 
-        val follows: DataStream[Follow] = followDatums
-          .map(datum => datum.getDocument.asInstanceOf[Follow])
+    if( config.getTest == false )
+      jsons.addSink(new RollingSink[String](outPath)).setParallelism(3)
+    else
+      jsons.writeAsText(outPath,FileSystem.WriteMode.OVERWRITE)
+        .setParallelism(env.getParallelism)
 
-        val jsons: DataStream[String] = follows
-          .map(follow => {
-              val MAPPER = StreamsJacksonMapper.getInstance
-              MAPPER.writeValueAsString(follow)
-          })
+    // if( test == true ) jsons.print();
 
-        if( config.getTest == false )
-            jsons.addSink(new RollingSink[String](outPath)).setParallelism(3)
-        else
-            jsons.writeAsText(outPath,FileSystem.WriteMode.OVERWRITE)
-              .setParallelism(env.getParallelism)
+    env.execute(STREAMS_ID)
+  }
 
-        // if( test == true ) jsons.print();
+  class FollowingCollectorFlatMapFunction(
+                                           twitterConfiguration : TwitterFollowingConfiguration = new ComponentConfigurator[TwitterFollowingConfiguration](classOf[TwitterFollowingConfiguration]).detectConfiguration(StreamsConfigurator.getConfig.getConfig("twitter")),
+                                           flinkConfiguration : StreamsFlinkConfiguration = new ComponentConfigurator[StreamsFlinkConfiguration](classOf[StreamsFlinkConfiguration]).detectConfiguration(StreamsConfigurator.getConfig)
+                                         ) extends RichFlatMapFunction[String, StreamsDatum] with Serializable {
 
-        env.execute(STREAMS_ID)
+    override def flatMap(input: String, out: Collector[StreamsDatum]): Unit = {
+      collectConnections(input, out)
     }
 
-    class FollowingCollectorFlatMapFunction(
-                                             twitterConfiguration : TwitterFollowingConfiguration = new ComponentConfigurator[TwitterFollowingConfiguration](classOf[TwitterFollowingConfiguration]).detectConfiguration(StreamsConfigurator.getConfig.getConfig("twitter")),
-                                             flinkConfiguration : StreamsFlinkConfiguration = new ComponentConfigurator[StreamsFlinkConfiguration](classOf[StreamsFlinkConfiguration]).detectConfiguration(StreamsConfigurator.getConfig)
-                                           ) extends RichFlatMapFunction[String, StreamsDatum] with Serializable {
-
-        override def flatMap(input: String, out: Collector[StreamsDatum]): Unit = {
-            collectConnections(input, out)
-        }
-
-        def collectConnections(id : String, out : Collector[StreamsDatum]) = {
-            val twitProvider: TwitterFollowingProvider =
-                new TwitterFollowingProvider(
-                    twitterConfiguration.withIdsOnly(true).withInfo(List(toProviderId(id))).withMaxItems(5000l).asInstanceOf[TwitterFollowingConfiguration]
-                )
-            twitProvider.prepare(twitProvider)
-            twitProvider.startStream()
-            var iterator: Iterator[StreamsDatum] = null
-            do {
-                Uninterruptibles.sleepUninterruptibly(flinkConfiguration.getProviderWaitMs, TimeUnit.MILLISECONDS)
-                twitProvider.readCurrent().iterator().toList.map(out.collect(_))
-            } while( twitProvider.isRunning )
-        }
+    def collectConnections(id : String, out : Collector[StreamsDatum]) = {
+      val twitProvider: TwitterFollowingProvider =
+        new TwitterFollowingProvider(
+          twitterConfiguration.withInfo(List(toProviderId(id))).asInstanceOf[TwitterFollowingConfiguration]
+        )
+      twitProvider.prepare(twitProvider)
+      twitProvider.startStream()
+      var iterator: Iterator[StreamsDatum] = null
+      do {
+        Uninterruptibles.sleepUninterruptibly(flinkConfiguration.getProviderWaitMs, TimeUnit.MILLISECONDS)
+        twitProvider.readCurrent().iterator().toList.map(out.collect(_))
+      } while( twitProvider.isRunning )
     }
+  }
 
 }
\ No newline at end of file

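Because the pipeline class is a Runnable, it can also be launched directly from application or test code rather than through main(); a minimal sketch using only classes shown above:

import org.apache.streams.config.{ComponentConfigurator, StreamsConfigurator}
import org.apache.streams.examples.flink.twitter.TwitterFollowingPipelineConfiguration
import org.apache.streams.examples.flink.twitter.collection.FlinkTwitterFollowingPipeline

// detect the job configuration from the ambient typesafe config,
// then run the pipeline on its own thread, as main() does above
val jobConfig = new ComponentConfigurator[TwitterFollowingPipelineConfiguration](
  classOf[TwitterFollowingPipelineConfiguration]).detectConfiguration(StreamsConfigurator.getConfig)
val thread = new Thread(new FlinkTwitterFollowingPipeline(jobConfig))
thread.start()
thread.join()
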
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/cb7c0b9d/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala
index 8bb2997..549e048 100644
--- a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala
+++ b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala
@@ -23,35 +23,29 @@ import java.util.concurrent.TimeUnit
 import com.fasterxml.jackson.databind.ObjectMapper
 import com.google.common.base.{Preconditions, Strings}
 import com.google.common.util.concurrent.Uninterruptibles
-import org.apache.flink.api.common.functions.{FlatMapFunction, RichFlatMapFunction}
-import org.apache.flink.api.common.restartstrategy.RestartStrategies
-import org.apache.flink.api.scala.{ExecutionEnvironment, _}
+import org.apache.flink.api.common.functions.RichFlatMapFunction
+import org.apache.flink.api.scala._
 import org.apache.flink.core.fs.FileSystem
-import org.apache.flink.runtime.state.filesystem.FsStateBackend
 import org.apache.flink.streaming.api.TimeCharacteristic
-import org.apache.flink.streaming.api.scala.function.AllWindowFunction
-import org.apache.flink.streaming.api.scala.{AllWindowedStream, DataStream, KeyedStream, StreamExecutionEnvironment}
-import org.apache.flink.streaming.api.windowing.time.Time
-import org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger
-import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
+import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment}
 import org.apache.flink.streaming.connectors.fs.RollingSink
-import org.apache.flink.streaming.util.serialization.SimpleStringSchema
 import org.apache.flink.util.Collector
 import org.apache.streams.config.{ComponentConfigurator, StreamsConfigurator}
 import org.apache.streams.core.StreamsDatum
 import org.apache.streams.examples.flink.FlinkBase
 import org.apache.streams.examples.flink.twitter.TwitterPostsPipelineConfiguration
 import org.apache.streams.flink.FlinkStreamingConfiguration
-import org.apache.streams.hdfs.HdfsConfiguration
 import org.apache.streams.jackson.StreamsJacksonMapper
-import org.apache.streams.twitter.TwitterUserInformationConfiguration
-import org.apache.streams.twitter.pojo.{Tweet, User}
-import org.apache.streams.twitter.provider.{TwitterTimelineProvider, TwitterUserInformationProvider}
+import org.apache.streams.twitter.pojo.Tweet
+import org.apache.streams.twitter.provider.TwitterTimelineProvider
 import org.slf4j.{Logger, LoggerFactory}
-import org.apache.flink.api.scala._
 
 import scala.collection.JavaConversions._
 
+/**
+  * FlinkTwitterPostsPipeline collects recent posts from all profiles from a
+  * set of IDs, writing each post as a twitter:status in json format to dfs.
+  */
 object FlinkTwitterPostsPipeline extends FlinkBase {
 
   val STREAMS_ID: String = "FlinkTwitterPostsPipeline"
@@ -154,13 +148,13 @@ class FlinkTwitterPostsPipeline(config: TwitterPostsPipelineConfiguration = new
 
   class postCollectorFlatMapFunction extends RichFlatMapFunction[String, StreamsDatum] with Serializable {
     override def flatMap(input: String, out: Collector[StreamsDatum]): Unit = {
-        collectPosts(input, out)
+      collectPosts(input, out)
     }
     def collectPosts(id : String, out : Collector[StreamsDatum]) = {
       val twitterConfiguration = config.getTwitter
       val twitProvider: TwitterTimelineProvider =
         new TwitterTimelineProvider(
-          twitterConfiguration.withInfo(List(toProviderId(id))).withMaxItems(200l)
+          twitterConfiguration.withInfo(List(toProviderId(id)))
         )
       twitProvider.prepare(twitProvider)
       twitProvider.startStream()

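Note the dropped withMaxItems(200l): the per-id item cap now comes from the provider configuration instead of being hard-coded. If a cap is still wanted it can be set on the configuration before building the provider; a sketch, inside collectPosts, assuming the withMaxItems builder seen on the removed line:

// maxItems now flows in from configuration; an explicit cap would look like:
val twitProvider: TwitterTimelineProvider =
  new TwitterTimelineProvider(
    twitterConfiguration.withInfo(List(toProviderId(id))).withMaxItems(200L)
  )
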
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/cb7c0b9d/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala
index 56d892b..dbb8a33 100644
--- a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala
+++ b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala
@@ -25,11 +25,12 @@ import com.fasterxml.jackson.databind.ObjectMapper
 import com.google.common.base.{Preconditions, Strings}
 import com.google.common.util.concurrent.Uninterruptibles
 import org.apache.flink.api.common.functions.StoppableFunction
+import org.apache.flink.api.scala._
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.core.fs.FileSystem
 import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
-import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment}
+import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
 import org.apache.flink.streaming.connectors.fs.RollingSink
 import org.apache.streams.config.{ComponentConfigurator, StreamsConfigurator}
 import org.apache.streams.core.StreamsDatum
@@ -38,13 +39,16 @@ import org.apache.streams.examples.flink.twitter.TwitterSpritzerPipelineConfigur
 import org.apache.streams.flink.FlinkStreamingConfiguration
 import org.apache.streams.jackson.StreamsJacksonMapper
 import org.apache.streams.twitter.TwitterStreamConfiguration
+import org.apache.streams.twitter.converter.TwitterDateTimeFormat
 import org.apache.streams.twitter.provider.TwitterStreamProvider
 import org.slf4j.{Logger, LoggerFactory}
-import org.apache.flink.api.scala._
-import org.apache.streams.twitter.converter.TwitterDateTimeFormat
 
 import scala.collection.JavaConversions._
 
+/**
+  * FlinkTwitterSpritzerPipeline opens a spritzer stream and writes
+  * each post received as a twitter:status in json format to dfs.
+  */
 object FlinkTwitterSpritzerPipeline extends FlinkBase {
 
   val STREAMS_ID: String = "FlinkTwitterSpritzerPipeline"

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/cb7c0b9d/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala
index 01425f6..c180089 100644
--- a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala
+++ b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala
@@ -22,15 +22,14 @@ import java.util.concurrent.TimeUnit
 
 import com.fasterxml.jackson.databind.ObjectMapper
 import com.google.common.base.{Preconditions, Strings}
-import org.apache.flink.core.fs.FileSystem
-import org.apache.flink.streaming.api.TimeCharacteristic
-import org.apache.flink.streaming.api.scala.function.{AllWindowFunction, WindowFunction}
-import org.apache.flink.streaming.api.windowing.assigners.{GlobalWindows, TumblingEventTimeWindows}
 import com.google.common.util.concurrent.Uninterruptibles
 import org.apache.flink.api.common.functions.RichFlatMapFunction
-import org.apache.flink.streaming.api.scala.{AllWindowedStream, DataStream, KeyedStream, StreamExecutionEnvironment, WindowedStream}
-import org.apache.flink.streaming.api.windowing.windows.{GlobalWindow, TimeWindow, Window}
 import org.apache.flink.api.scala._
+import org.apache.flink.core.fs.FileSystem
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.scala.function.WindowFunction
+import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment, WindowedStream}
+import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
 import org.apache.flink.streaming.connectors.fs.RollingSink
 import org.apache.flink.util.Collector
 import org.apache.streams.config.{ComponentConfigurator, StreamsConfigurator}
@@ -45,6 +44,10 @@ import org.slf4j.{Logger, LoggerFactory}
 
 import scala.collection.JavaConversions._
 
+/**
+  * FlinkTwitterUserInformationPipeline collects the current user profile of a
+  * set of IDs, writing each as a twitter:user in json format to dfs.
+  */
 object FlinkTwitterUserInformationPipeline extends FlinkBase {
 
   val STREAMS_ID: String = "FlinkTwitterUserInformationPipeline"

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/cb7c0b9d/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFollowersIT.scala
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFollowersIT.scala b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFollowersIT.scala
index 6cf8d9d..be22b82 100644
--- a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFollowersIT.scala
+++ b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFollowersIT.scala
@@ -34,7 +34,7 @@ import org.testng.annotations.Test
 import scala.io.Source
 
 /**
-  * Created by sblackmon on 3/13/16.
+  * FlinkTwitterFollowingPipelineFollowersIT is an integration test for FlinkTwitterFollowingPipeline.
   */
 class FlinkTwitterFollowingPipelineFollowersIT extends FlatSpec {
 

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/cb7c0b9d/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFriendsIT.scala
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFriendsIT.scala b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFriendsIT.scala
index 8ea7f9c..9829ebc 100644
--- a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFriendsIT.scala
+++ b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFriendsIT.scala
@@ -21,24 +21,20 @@ package com.peoplepattern.streams.twitter.collection
 import java.io.File
 import java.nio.file.{Files, Paths}
 
-import org.apache.streams.examples.flink.twitter.TwitterFollowingPipelineConfiguration
 import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions}
 import org.apache.streams.config.{ComponentConfigurator, StreamsConfiguration, StreamsConfigurator}
-import org.apache.streams.examples.flink.twitter.collection.FlinkTwitterUserInformationPipeline._
-import org.apache.streams.examples.flink.twitter.collection.{FlinkTwitterFollowingPipeline, FlinkTwitterSpritzerPipeline}
-import org.apache.streams.hdfs.{HdfsConfiguration, HdfsReaderConfiguration, HdfsWriterConfiguration}
-import org.junit.Ignore
+import org.apache.streams.examples.flink.twitter.TwitterFollowingPipelineConfiguration
+import org.apache.streams.examples.flink.twitter.collection.FlinkTwitterFollowingPipeline
+import org.scalatest.FlatSpec
+import org.scalatest.concurrent.Eventually._
+import org.scalatest.time.SpanSugar._
 import org.slf4j.{Logger, LoggerFactory}
 import org.testng.annotations.Test
 
 import scala.io.Source
-import org.scalatest.FlatSpec
-import org.scalatest.concurrent.Eventually._
-import org.scalatest.time.{Seconds, Span}
-import org.scalatest.time.SpanSugar._
 
 /**
-  * Created by sblackmon on 3/13/16.
+  * FlinkTwitterFollowingPipelineFriendsIT is an integration test for FlinkTwitterFollowingPipeline.
   */
 class FlinkTwitterFollowingPipelineFriendsIT extends FlatSpec {
 

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/cb7c0b9d/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterPostsPipelineIT.scala
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterPostsPipelineIT.scala b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterPostsPipelineIT.scala
index 7113c4c..987e82d 100644
--- a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterPostsPipelineIT.scala
+++ b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterPostsPipelineIT.scala
@@ -20,27 +20,21 @@ package com.peoplepattern.streams.twitter.collection
 
 import java.io.File
 import java.nio.file.{Files, Paths}
-import java.util.concurrent.TimeUnit
 
-import com.google.common.util.concurrent.{Monitor, Uninterruptibles}
 import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions}
-import org.apache.streams.examples.flink.twitter.{TwitterFollowingPipelineConfiguration, TwitterPostsPipelineConfiguration}
 import org.apache.streams.config.{ComponentConfigurator, StreamsConfiguration, StreamsConfigurator}
 import org.apache.streams.examples.flink.twitter.TwitterPostsPipelineConfiguration
-import org.apache.streams.examples.flink.twitter.collection.FlinkTwitterUserInformationPipeline._
-import org.apache.streams.examples.flink.twitter.collection.{FlinkTwitterPostsPipeline, FlinkTwitterUserInformationPipeline}
-import org.apache.streams.hdfs.{HdfsConfiguration, HdfsReaderConfiguration, HdfsWriterConfiguration}
-import org.slf4j.{Logger, LoggerFactory}
-
-import scala.io.Source
+import org.apache.streams.examples.flink.twitter.collection.FlinkTwitterPostsPipeline
 import org.scalatest.FlatSpec
 import org.scalatest.concurrent.Eventually._
-import org.scalatest.time.{Seconds, Span}
 import org.scalatest.time.SpanSugar._
+import org.slf4j.{Logger, LoggerFactory}
 import org.testng.annotations.Test
 
+import scala.io.Source
+
 /**
-  * Created by sblackmon on 3/13/16.
+  * FlinkTwitterPostsPipelineIT is an integration test for FlinkTwitterPostsPipeline.
   */
 class FlinkTwitterPostsPipelineIT extends FlatSpec  {
 

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/cb7c0b9d/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterSpritzerPipelineIT.scala
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterSpritzerPipelineIT.scala b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterSpritzerPipelineIT.scala
index 29625ca..7570bac 100644
--- a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterSpritzerPipelineIT.scala
+++ b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterSpritzerPipelineIT.scala
@@ -23,20 +23,18 @@ import java.nio.file.{Files, Paths}
 
 import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions}
 import org.apache.streams.config.{ComponentConfigurator, StreamsConfiguration, StreamsConfigurator}
-import org.apache.streams.examples.flink.twitter.collection.FlinkTwitterUserInformationPipeline._
-import org.apache.streams.examples.flink.twitter.collection.{FlinkTwitterSpritzerPipeline, FlinkTwitterUserInformationPipeline}
-import org.apache.streams.examples.flink.twitter.{TwitterPostsPipelineConfiguration, TwitterSpritzerPipelineConfiguration}
+import org.apache.streams.examples.flink.twitter.TwitterSpritzerPipelineConfiguration
+import org.apache.streams.examples.flink.twitter.collection.FlinkTwitterSpritzerPipeline
+import org.scalatest.FlatSpec
+import org.scalatest.concurrent.Eventually._
+import org.scalatest.time.SpanSugar._
 import org.slf4j.{Logger, LoggerFactory}
 import org.testng.annotations.Test
 
 import scala.io.Source
-import org.scalatest.FlatSpec
-import org.scalatest.concurrent.Eventually._
-import org.scalatest.time.{Seconds, Span}
-import org.scalatest.time.SpanSugar._
 
 /**
-  * Created by sblackmon on 3/13/16.
+  * FlinkTwitterSpritzerPipelineIT is an integration test for FlinkTwitterSpritzerPipeline.
   */
 class FlinkTwitterSpritzerPipelineIT extends FlatSpec {
 

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/cb7c0b9d/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterUserInformationPipelineIT.scala
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterUserInformationPipelineIT.scala b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterUserInformationPipelineIT.scala
index b4387f9..ab88d48 100644
--- a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterUserInformationPipelineIT.scala
+++ b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterUserInformationPipelineIT.scala
@@ -22,25 +22,19 @@ import java.io.File
 import java.nio.file.{Files, Paths}
 
 import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions}
-import org.apache.streams.examples.flink.twitter.{TwitterSpritzerPipelineConfiguration, TwitterUserInformationPipelineConfiguration}
-import org.apache.flink.api.scala.ExecutionEnvironment
 import org.apache.streams.config.{ComponentConfigurator, StreamsConfiguration, StreamsConfigurator}
+import org.apache.streams.examples.flink.twitter.TwitterUserInformationPipelineConfiguration
 import org.apache.streams.examples.flink.twitter.collection.FlinkTwitterUserInformationPipeline
-import org.apache.streams.hdfs.{HdfsConfiguration, HdfsReaderConfiguration, HdfsWriterConfiguration}
 import org.scalatest.FlatSpec
-import org.scalatest._
-import org.scalatest.junit.JUnitRunner
-import org.slf4j.{Logger, LoggerFactory}
-
-import scala.io.Source
-import org.scalatest.Ignore
 import org.scalatest.concurrent.Eventually._
-import org.scalatest.time.{Seconds, Span}
 import org.scalatest.time.SpanSugar._
+import org.slf4j.{Logger, LoggerFactory}
 import org.testng.annotations.Test
 
+import scala.io.Source
+
 /**
-  * Created by sblackmon on 3/13/16.
+  * FlinkTwitterUserInformationPipelineIT is an integration test for FlinkTwitterUserInformationPipeline.
   */
 class FlinkTwitterUserInformationPipelineIT extends FlatSpec {
 

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/cb7c0b9d/local/elasticsearch-hdfs/src/main/java/org/apache/streams/example/ElasticsearchHdfs.java
----------------------------------------------------------------------
diff --git a/local/elasticsearch-hdfs/src/main/java/org/apache/streams/example/ElasticsearchHdfs.java b/local/elasticsearch-hdfs/src/main/java/org/apache/streams/example/ElasticsearchHdfs.java
index be79f4a..b859d60 100644
--- a/local/elasticsearch-hdfs/src/main/java/org/apache/streams/example/ElasticsearchHdfs.java
+++ b/local/elasticsearch-hdfs/src/main/java/org/apache/streams/example/ElasticsearchHdfs.java
@@ -23,59 +23,54 @@ import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.core.StreamBuilder;
 import org.apache.streams.elasticsearch.ElasticsearchPersistReader;
 import org.apache.streams.hdfs.WebHdfsPersistWriter;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.local.LocalRuntimeConfiguration;
 import org.apache.streams.local.builders.LocalStreamBuilder;
 
 import com.google.common.collect.Maps;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Map;
 
 /**
- * Copies documents into a new index
+ * Copies documents from an elasticsearch index to new-line delimited json on dfs.
  */
 public class ElasticsearchHdfs implements Runnable {
 
-    public final static String STREAMS_ID = "ElasticsearchHdfs";
-
-    private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchHdfs.class);
-
-    ElasticsearchHdfsConfiguration config;
-
-    public ElasticsearchHdfs() {
-       this(new ComponentConfigurator<>(ElasticsearchHdfsConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig()));
-
-    }
-
-    public ElasticsearchHdfs(ElasticsearchHdfsConfiguration reindex) {
-        this.config = reindex;
-    }
+  public final static String STREAMS_ID = "ElasticsearchHdfs";
 
-    public static void main(String[] args)
-    {
-        LOGGER.info(StreamsConfigurator.config.toString());
+  private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchHdfs.class);
 
-        ElasticsearchHdfs backup = new ElasticsearchHdfs();
+  ElasticsearchHdfsConfiguration config;
 
-        new Thread(backup).start();
+  public ElasticsearchHdfs() {
+    this(new ComponentConfigurator<>(ElasticsearchHdfsConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig()));
+  }
 
-    }
+  public ElasticsearchHdfs(ElasticsearchHdfsConfiguration reindex) {
+    this.config = reindex;
+  }
 
-    @Override
-    public void run() {
+  public static void main(String[] args)
+  {
+    LOGGER.info(StreamsConfigurator.config.toString());
+    ElasticsearchHdfs backup = new ElasticsearchHdfs();
+    new Thread(backup).start();
+  }
 
-        ElasticsearchPersistReader elasticsearchPersistReader = new ElasticsearchPersistReader(config.getSource());
+  @Override
+  public void run() {
 
-        WebHdfsPersistWriter hdfsPersistWriter = new WebHdfsPersistWriter(config.getDestination());
+    ElasticsearchPersistReader elasticsearchPersistReader = new ElasticsearchPersistReader(config.getSource());
+    WebHdfsPersistWriter hdfsPersistWriter = new WebHdfsPersistWriter(config.getDestination());
 
-        Map<String, Object> streamConfig = Maps.newHashMap();
-        streamConfig.put(LocalStreamBuilder.STREAM_IDENTIFIER_KEY, STREAMS_ID);
-        streamConfig.put(LocalStreamBuilder.TIMEOUT_KEY, 7 * 24 * 60 * 1000);
-        StreamBuilder builder = new LocalStreamBuilder(1000, streamConfig);
+    LocalRuntimeConfiguration localRuntimeConfiguration =
+        StreamsJacksonMapper.getInstance().convertValue(StreamsConfigurator.detectConfiguration(), LocalRuntimeConfiguration.class);
+    StreamBuilder builder = new LocalStreamBuilder(localRuntimeConfiguration);
 
-        builder.newPerpetualStream(ElasticsearchPersistReader.class.getCanonicalName(), elasticsearchPersistReader);
-        builder.addStreamsPersistWriter(WebHdfsPersistWriter.class.getCanonicalName(), hdfsPersistWriter, 1, ElasticsearchPersistReader.class.getCanonicalName());
-        builder.start();
-    }
+    builder.newPerpetualStream(ElasticsearchPersistReader.class.getCanonicalName(), elasticsearchPersistReader);
+    builder.addStreamsPersistWriter(WebHdfsPersistWriter.class.getCanonicalName(), hdfsPersistWriter, 1, ElasticsearchPersistReader.class.getCanonicalName());
+    builder.start();
+  }
 }

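Beyond re-indentation, the substantive change in this example is the runtime setup: the hand-built streamConfig map (stream identifier, timeout) is replaced by a LocalRuntimeConfiguration detected from the typesafe config, so runtime knobs now live in configuration files. The same wiring sketched in Scala, for consistency with the examples above:

import org.apache.streams.config.StreamsConfigurator
import org.apache.streams.core.StreamBuilder
import org.apache.streams.jackson.StreamsJacksonMapper
import org.apache.streams.local.LocalRuntimeConfiguration
import org.apache.streams.local.builders.LocalStreamBuilder

// convert the detected configuration into the local runtime's pojo,
// then hand it to the stream builder in place of the old Map
val localRuntimeConfiguration: LocalRuntimeConfiguration = StreamsJacksonMapper.getInstance()
  .convertValue(StreamsConfigurator.detectConfiguration(), classOf[LocalRuntimeConfiguration])
val builder: StreamBuilder = new LocalStreamBuilder(localRuntimeConfiguration)
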
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/cb7c0b9d/local/elasticsearch-hdfs/src/main/java/org/apache/streams/example/HdfsElasticsearch.java
----------------------------------------------------------------------
diff --git a/local/elasticsearch-hdfs/src/main/java/org/apache/streams/example/HdfsElasticsearch.java b/local/elasticsearch-hdfs/src/main/java/org/apache/streams/example/HdfsElasticsearch.java
index 375665c..caf9cbc 100644
--- a/local/elasticsearch-hdfs/src/main/java/org/apache/streams/example/HdfsElasticsearch.java
+++ b/local/elasticsearch-hdfs/src/main/java/org/apache/streams/example/HdfsElasticsearch.java
@@ -23,59 +23,54 @@ import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.core.StreamBuilder;
 import org.apache.streams.elasticsearch.ElasticsearchPersistWriter;
 import org.apache.streams.hdfs.WebHdfsPersistReader;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.local.LocalRuntimeConfiguration;
 import org.apache.streams.local.builders.LocalStreamBuilder;
 
 import com.google.common.collect.Maps;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Map;
 
 /**
- * Copies documents into a new index
+ * Copies documents from new-line delimited json on dfs to an elasticsearch index.
  */
 public class HdfsElasticsearch implements Runnable {
 
-    public final static String STREAMS_ID = "HdfsElasticsearch";
-
-    private final static Logger LOGGER = LoggerFactory.getLogger(HdfsElasticsearch.class);
-
-    HdfsElasticsearchConfiguration config;
-
-    public HdfsElasticsearch() {
-       this(new ComponentConfigurator<>(HdfsElasticsearchConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig()));
-
-    }
-
-    public HdfsElasticsearch(HdfsElasticsearchConfiguration reindex) {
-        this.config = reindex;
-    }
+  public final static String STREAMS_ID = "HdfsElasticsearch";
 
-    public static void main(String[] args)
-    {
-        LOGGER.info(StreamsConfigurator.config.toString());
+  private final static Logger LOGGER = LoggerFactory.getLogger(HdfsElasticsearch.class);
 
-        HdfsElasticsearch restore = new HdfsElasticsearch();
+  HdfsElasticsearchConfiguration config;
 
-        new Thread(restore).start();
+  public HdfsElasticsearch() {
+    this(new ComponentConfigurator<>(HdfsElasticsearchConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig()));
+  }
 
-    }
+  public HdfsElasticsearch(HdfsElasticsearchConfiguration reindex) {
+    this.config = reindex;
+  }
 
-    @Override
-    public void run() {
+  public static void main(String[] args)
+  {
+    LOGGER.info(StreamsConfigurator.config.toString());
+    HdfsElasticsearch restore = new HdfsElasticsearch();
+    new Thread(restore).start();
+  }
 
-        WebHdfsPersistReader webHdfsPersistReader = new WebHdfsPersistReader(config.getSource());
+  @Override
+  public void run() {
 
-        ElasticsearchPersistWriter elasticsearchPersistWriter = new ElasticsearchPersistWriter(config.getDestination());
+    WebHdfsPersistReader webHdfsPersistReader = new WebHdfsPersistReader(config.getSource());
+    ElasticsearchPersistWriter elasticsearchPersistWriter = new ElasticsearchPersistWriter(config.getDestination());
 
-        Map<String, Object> streamConfig = Maps.newHashMap();
-        streamConfig.put(LocalStreamBuilder.STREAM_IDENTIFIER_KEY, STREAMS_ID);
-        streamConfig.put(LocalStreamBuilder.TIMEOUT_KEY, 1000 * 1000);
-        StreamBuilder builder = new LocalStreamBuilder(1000, streamConfig);
+    LocalRuntimeConfiguration localRuntimeConfiguration =
+        StreamsJacksonMapper.getInstance().convertValue(StreamsConfigurator.detectConfiguration(), LocalRuntimeConfiguration.class);
+    StreamBuilder builder = new LocalStreamBuilder(localRuntimeConfiguration);
 
-        builder.newPerpetualStream(WebHdfsPersistReader.class.getCanonicalName(), webHdfsPersistReader);
-        builder.addStreamsPersistWriter(ElasticsearchPersistWriter.class.getCanonicalName(), elasticsearchPersistWriter, 1, WebHdfsPersistReader.class.getCanonicalName());
-        builder.start();
-    }
+    builder.newPerpetualStream(WebHdfsPersistReader.class.getCanonicalName(), webHdfsPersistReader);
+    builder.addStreamsPersistWriter(ElasticsearchPersistWriter.class.getCanonicalName(), elasticsearchPersistWriter, 1, WebHdfsPersistReader.class.getCanonicalName());
+    builder.start();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/cb7c0b9d/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/test/ElasticsearchHdfsIT.java
----------------------------------------------------------------------
diff --git a/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/test/ElasticsearchHdfsIT.java b/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/test/ElasticsearchHdfsIT.java
index 437ebf6..9b70440 100644
--- a/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/test/ElasticsearchHdfsIT.java
+++ b/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/test/ElasticsearchHdfsIT.java
@@ -28,7 +28,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigParseOptions;
-
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
 import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
@@ -50,56 +49,54 @@ import static org.hamcrest.core.Is.is;
 import static org.testng.Assert.assertNotEquals;
 
 /**
- * Test copying documents between hdfs and elasticsearch
+ * ElasticsearchHdfsIT is an integration test for ElasticsearchHdfs.
  */
 public class ElasticsearchHdfsIT {
 
-    private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchHdfsIT.class);
-
-    ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
+  private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchHdfsIT.class);
 
-    protected ElasticsearchHdfsConfiguration testConfiguration;
-    protected Client testClient;
+  ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
 
-    private int count = 0;
+  protected ElasticsearchHdfsConfiguration testConfiguration;
+  protected Client testClient;
 
-    @BeforeClass
-    public void prepareTest() throws Exception {
+  private int count = 0;
 
-        Config reference  = ConfigFactory.load();
-        File conf_file = new File("target/test-classes/ElasticsearchHdfsIT.conf");
-        assert(conf_file.exists());
-        Config testResourceConfig  = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
-        Config typesafe  = testResourceConfig.withFallback(reference).resolve();
-        testConfiguration = new ComponentConfigurator<>(ElasticsearchHdfsConfiguration.class).detectConfiguration(typesafe);
-        testClient = ElasticsearchClientManager.getInstance(testConfiguration.getSource()).client();
+  @BeforeClass
+  public void prepareTest() throws Exception {
 
-        ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest();
-        ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet();
-        assertNotEquals(clusterHealthResponse.getStatus(), ClusterHealthStatus.RED);
+    Config reference  = ConfigFactory.load();
+    File conf_file = new File("target/test-classes/ElasticsearchHdfsIT.conf");
+    assert(conf_file.exists());
+    Config testResourceConfig  = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
+    Config typesafe  = testResourceConfig.withFallback(reference).resolve();
+    testConfiguration = new ComponentConfigurator<>(ElasticsearchHdfsConfiguration.class).detectConfiguration(typesafe);
+    testClient = ElasticsearchClientManager.getInstance(testConfiguration.getSource()).client();
 
-        IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getSource().getIndexes().get(0));
-        IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet();
-        assertThat(indicesExistsResponse.isExists(), is(true));
+    ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest();
+    ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet();
+    assertNotEquals(clusterHealthResponse.getStatus(), ClusterHealthStatus.RED);
 
-        SearchRequestBuilder countRequest = testClient
-                .prepareSearch(testConfiguration.getSource().getIndexes().get(0))
-                .setTypes(testConfiguration.getSource().getTypes().get(0));
-        SearchResponse countResponse = countRequest.execute().actionGet();
+    IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getSource().getIndexes().get(0));
+    IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet();
+    assertThat(indicesExistsResponse.isExists(), is(true));
 
-        count = (int)countResponse.getHits().getTotalHits();
+    SearchRequestBuilder countRequest = testClient
+        .prepareSearch(testConfiguration.getSource().getIndexes().get(0))
+        .setTypes(testConfiguration.getSource().getTypes().get(0));
+    SearchResponse countResponse = countRequest.execute().actionGet();
 
-        assertNotEquals(count, 0);
-    }
+    count = (int)countResponse.getHits().getTotalHits();
 
-    @Test
-    public void ElasticsearchHdfsIT() throws Exception {
+    assertNotEquals(count, 0);
+  }
 
-        ElasticsearchHdfs backup = new ElasticsearchHdfs(testConfiguration);
+  @Test
+  public void ElasticsearchHdfsIT() throws Exception {
 
-        backup.run();
+    ElasticsearchHdfs backup = new ElasticsearchHdfs(testConfiguration);
+    backup.run();
 
-        // assert lines in file
-    }
+  }
 
 }

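prepareTest above also shows the configuration layering these ITs rely on: a test-specific conf file parsed strictly, layered over the packaged reference config. The same layering expressed in Scala:

import java.io.File

import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions}

// layer the IT-specific conf over the classpath reference config;
// setAllowMissing(false) fails fast if the file is absent
val reference: Config = ConfigFactory.load()
val confFile: File = new File("target/test-classes/ElasticsearchHdfsIT.conf")
val testResourceConfig: Config = ConfigFactory.parseFileAnySyntax(
  confFile, ConfigParseOptions.defaults().setAllowMissing(false))
val typesafe: Config = testResourceConfig.withFallback(reference).resolve()
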
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/cb7c0b9d/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/test/HdfsElasticsearchIT.java
----------------------------------------------------------------------
diff --git a/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/test/HdfsElasticsearchIT.java b/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/test/HdfsElasticsearchIT.java
index a629025..d5f6a29 100644
--- a/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/test/HdfsElasticsearchIT.java
+++ b/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/test/HdfsElasticsearchIT.java
@@ -28,7 +28,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigParseOptions;
-
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
 import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
@@ -52,59 +51,59 @@ import static org.testng.Assert.assertNotEquals;
 import static org.testng.AssertJUnit.assertTrue;
 
 /**
- * Test copying documents between hdfs and elasticsearch
+ * HdfsElasticsearchIT is an integration test for HdfsElasticsearch.
  */
 public class HdfsElasticsearchIT {
 
-    private final static Logger LOGGER = LoggerFactory.getLogger(HdfsElasticsearchIT.class);
+  private final static Logger LOGGER = LoggerFactory.getLogger(HdfsElasticsearchIT.class);
 
-    ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
+  ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
 
-    protected HdfsElasticsearchConfiguration testConfiguration;
-    protected Client testClient;
+  protected HdfsElasticsearchConfiguration testConfiguration;
+  protected Client testClient;
 
-    @BeforeClass
-    public void prepareTest() throws Exception {
+  @BeforeClass
+  public void prepareTest() throws Exception {
 
-        Config reference  = ConfigFactory.load();
-        File conf_file = new File("target/test-classes/HdfsElasticsearchIT.conf");
-        assert(conf_file.exists());
-        Config testResourceConfig  = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
-        Config typesafe  = testResourceConfig.withFallback(reference).resolve();
-        testConfiguration = new ComponentConfigurator<>(HdfsElasticsearchConfiguration.class).detectConfiguration(typesafe);
-        testClient = ElasticsearchClientManager.getInstance(testConfiguration.getDestination()).client();
+    Config reference  = ConfigFactory.load();
+    File conf_file = new File("target/test-classes/HdfsElasticsearchIT.conf");
+    assert(conf_file.exists());
+    Config testResourceConfig  = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
+    Config typesafe  = testResourceConfig.withFallback(reference).resolve();
+    testConfiguration = new ComponentConfigurator<>(HdfsElasticsearchConfiguration.class).detectConfiguration(typesafe);
+    testClient = ElasticsearchClientManager.getInstance(testConfiguration.getDestination()).client();
 
-        ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest();
-        ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet();
-        assertNotEquals(clusterHealthResponse.getStatus(), ClusterHealthStatus.RED);
+    ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest();
+    ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet();
+    assertNotEquals(clusterHealthResponse.getStatus(), ClusterHealthStatus.RED);
 
-        IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getDestination().getIndex());
-        IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet();
-        if(indicesExistsResponse.isExists()) {
-            DeleteIndexRequest deleteIndexRequest = Requests.deleteIndexRequest(testConfiguration.getDestination().getIndex());
-            DeleteIndexResponse deleteIndexResponse = testClient.admin().indices().delete(deleteIndexRequest).actionGet();
-            assertTrue(deleteIndexResponse.isAcknowledged());
-        };
-    }
+    IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getDestination().getIndex());
+    IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet();
+    if(indicesExistsResponse.isExists()) {
+      DeleteIndexRequest deleteIndexRequest = Requests.deleteIndexRequest(testConfiguration.getDestination().getIndex());
+      DeleteIndexResponse deleteIndexResponse = testClient.admin().indices().delete(deleteIndexRequest).actionGet();
+      assertTrue(deleteIndexResponse.isAcknowledged());
+    }
+  }
 
-    @Test
-    public void ElasticsearchHdfsIT() throws Exception {
+  @Test
+  public void HdfsElasticsearchIT() throws Exception {
 
-        HdfsElasticsearch restore = new HdfsElasticsearch(testConfiguration);
+    HdfsElasticsearch restore = new HdfsElasticsearch(testConfiguration);
 
-        restore.run();
+    restore.run();
 
-        IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getDestination().getIndex());
-        IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet();
-        assertTrue(indicesExistsResponse.isExists());
+    IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getDestination().getIndex());
+    IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet();
+    assertTrue(indicesExistsResponse.isExists());
 
-        SearchRequestBuilder countRequest = testClient
-                .prepareSearch(testConfiguration.getDestination().getIndex())
-                .setTypes(testConfiguration.getDestination().getType());
-        SearchResponse countResponse = countRequest.execute().actionGet();
+    SearchRequestBuilder countRequest = testClient
+        .prepareSearch(testConfiguration.getDestination().getIndex())
+        .setTypes(testConfiguration.getDestination().getType());
+    SearchResponse countResponse = countRequest.execute().actionGet();
 
-        assertEquals(countResponse.getHits().getTotalHits(), 89);
+    assertEquals(countResponse.getHits().getTotalHits(), 89);
 
-    }
+  }
 
 }
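
The configuration bootstrap in prepareTest() is the same idiom every IT in this commit uses: load the classpath reference config, parse the test's own .conf strictly, layer the test config over the reference, resolve substitutions, and bind the result onto a configuration POJO. Isolated as a sketch (ExampleIT.conf and ExampleConfiguration are placeholders, not names from this commit):

    import org.apache.streams.config.ComponentConfigurator;

    import com.typesafe.config.Config;
    import com.typesafe.config.ConfigFactory;
    import com.typesafe.config.ConfigParseOptions;

    import java.io.File;

    public class ConfigBootstrapSketch {
      public static void main(String[] args) {
        // classpath defaults: reference.conf plus any application.conf
        Config reference = ConfigFactory.load();
        File confFile = new File("target/test-classes/ExampleIT.conf");
        // setAllowMissing(false) makes a missing resource a parse error instead of an empty config
        Config testResourceConfig = ConfigFactory.parseFileAnySyntax(
            confFile, ConfigParseOptions.defaults().setAllowMissing(false));
        // test values win over reference defaults; resolve() expands ${...} substitutions
        Config typesafe = testResourceConfig.withFallback(reference).resolve();
        // bind the resolved tree onto the (placeholder) configuration POJO
        ExampleConfiguration config =
            new ComponentConfigurator<>(ExampleConfiguration.class).detectConfiguration(typesafe);
      }
    }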

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/cb7c0b9d/local/elasticsearch-hdfs/src/test/resources/ElasticsearchHdfsIT.conf
----------------------------------------------------------------------
diff --git a/local/elasticsearch-hdfs/src/test/resources/ElasticsearchHdfsIT.conf b/local/elasticsearch-hdfs/src/test/resources/ElasticsearchHdfsIT.conf
index 80ef53a..42cb01c 100644
--- a/local/elasticsearch-hdfs/src/test/resources/ElasticsearchHdfsIT.conf
+++ b/local/elasticsearch-hdfs/src/test/resources/ElasticsearchHdfsIT.conf
@@ -27,3 +27,4 @@ destination {
   path = "target/test-classes"
   writerPath = "elasticsearch_hdfs_it"
 }
+taskTimeoutMs = 60000
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/cb7c0b9d/local/elasticsearch-hdfs/src/test/resources/HdfsElasticsearchIT.conf
----------------------------------------------------------------------
diff --git a/local/elasticsearch-hdfs/src/test/resources/HdfsElasticsearchIT.conf b/local/elasticsearch-hdfs/src/test/resources/HdfsElasticsearchIT.conf
index 072a024..30be89b 100644
--- a/local/elasticsearch-hdfs/src/test/resources/HdfsElasticsearchIT.conf
+++ b/local/elasticsearch-hdfs/src/test/resources/HdfsElasticsearchIT.conf
@@ -28,4 +28,5 @@ destination {
   type = "activity"
   refresh = true
   forceUseConfig = true
-}
\ No newline at end of file
+}
+taskTimeoutMs = 60000
\ No newline at end of file
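
Both elasticsearch-hdfs IT configs above gain the same top-level key. Abridged, the resolved HdfsElasticsearchIT.conf now ends:

    destination {
      type = "activity"
      refresh = true
      forceUseConfig = true
    }
    # per the Ms suffix, a one-minute bound on stream tasks in the local runtime
    taskTimeoutMs = 60000

The key sits at the top level of the stream config, which matters given the ElasticsearchReindex change below: the local runtime now reads its knobs from the resolved configuration rather than from constants at the call site.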

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/cb7c0b9d/local/elasticsearch-reindex/src/main/java/org/apache/streams/example/ElasticsearchReindex.java
----------------------------------------------------------------------
diff --git a/local/elasticsearch-reindex/src/main/java/org/apache/streams/example/ElasticsearchReindex.java b/local/elasticsearch-reindex/src/main/java/org/apache/streams/example/ElasticsearchReindex.java
index 676a272..476e369 100644
--- a/local/elasticsearch-reindex/src/main/java/org/apache/streams/example/ElasticsearchReindex.java
+++ b/local/elasticsearch-reindex/src/main/java/org/apache/streams/example/ElasticsearchReindex.java
@@ -23,59 +23,59 @@ import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.core.StreamBuilder;
 import org.apache.streams.elasticsearch.ElasticsearchPersistReader;
 import org.apache.streams.elasticsearch.ElasticsearchPersistWriter;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.local.LocalRuntimeConfiguration;
 import org.apache.streams.local.builders.LocalStreamBuilder;
 
 import com.google.common.collect.Maps;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Map;
 
 /**
- * Copies documents into a new index
+ * Copies documents from the source index to the destination index.
  */
 public class ElasticsearchReindex implements Runnable {
 
-    public final static String STREAMS_ID = "ElasticsearchReindex";
+  public final static String STREAMS_ID = "ElasticsearchReindex";
 
-    private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchReindex.class);
+  private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchReindex.class);
 
-    ElasticsearchReindexConfiguration config;
+  ElasticsearchReindexConfiguration config;
 
-    public ElasticsearchReindex() {
-       this(new ComponentConfigurator<>(ElasticsearchReindexConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig()));
+  public ElasticsearchReindex() {
+    this(new ComponentConfigurator<>(ElasticsearchReindexConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig()));
 
-    }
+  }
 
-    public ElasticsearchReindex(ElasticsearchReindexConfiguration reindex) {
-        this.config = reindex;
-    }
+  public ElasticsearchReindex(ElasticsearchReindexConfiguration reindex) {
+    this.config = reindex;
+  }
 
-    public static void main(String[] args)
-    {
-        LOGGER.info(StreamsConfigurator.config.toString());
+  public static void main(String[] args) {
+    LOGGER.info(StreamsConfigurator.config.toString());
 
-        ElasticsearchReindex reindex = new ElasticsearchReindex();
+    ElasticsearchReindex reindex = new ElasticsearchReindex();
 
-        new Thread(reindex).start();
+    new Thread(reindex).start();
 
-    }
+  }
 
-    @Override
-    public void run() {
+  @Override
+  public void run() {
 
-        ElasticsearchPersistReader elasticsearchPersistReader = new ElasticsearchPersistReader(config.getSource());
+    ElasticsearchPersistReader elasticsearchPersistReader = new ElasticsearchPersistReader(config.getSource());
 
-        ElasticsearchPersistWriter elasticsearchPersistWriter = new ElasticsearchPersistWriter(config.getDestination());
+    ElasticsearchPersistWriter elasticsearchPersistWriter = new ElasticsearchPersistWriter(config.getDestination());
 
-        Map<String, Object> streamConfig = Maps.newHashMap();
-        streamConfig.put(LocalStreamBuilder.STREAM_IDENTIFIER_KEY, STREAMS_ID);
-        streamConfig.put(LocalStreamBuilder.TIMEOUT_KEY, 7 * 24 * 60 * 1000);
-        StreamBuilder builder = new LocalStreamBuilder(1000, streamConfig);
+    LocalRuntimeConfiguration localRuntimeConfiguration =
+        StreamsJacksonMapper.getInstance().convertValue(StreamsConfigurator.detectConfiguration(), LocalRuntimeConfiguration.class);
+    StreamBuilder builder = new LocalStreamBuilder(localRuntimeConfiguration);
 
-        builder.newPerpetualStream(ElasticsearchPersistReader.class.getCanonicalName(), elasticsearchPersistReader);
-        builder.addStreamsPersistWriter(ElasticsearchPersistWriter.class.getCanonicalName(), elasticsearchPersistWriter, 1, ElasticsearchPersistReader.class.getCanonicalName());
-        builder.start();
-    }
+    builder.newPerpetualStream(ElasticsearchPersistReader.class.getCanonicalName(), elasticsearchPersistReader);
+    builder.addStreamsPersistWriter(ElasticsearchPersistWriter.class.getCanonicalName(), elasticsearchPersistWriter, 1, ElasticsearchPersistReader.class.getCanonicalName());
+    builder.start();
+  }
 }
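
Beyond the reindentation, the substantive change in this file is how the LocalStreamBuilder is constructed. Incidentally, the removed literal 7 * 24 * 60 * 1000 evaluates to 10,080,000 ms, about 2.8 hours, not the week its factors suggest (that would need a further * 60); moving timeouts into .conf files, as the taskTimeoutMs additions in this commit do, avoids exactly that kind of silent drift. The two constructions side by side (a sketch; both halves are lifted from this diff):

    // before: runtime knobs assembled by hand at the call site
    Map<String, Object> streamConfig = Maps.newHashMap();
    streamConfig.put(LocalStreamBuilder.STREAM_IDENTIFIER_KEY, STREAMS_ID);
    streamConfig.put(LocalStreamBuilder.TIMEOUT_KEY, 7 * 24 * 60 * 1000); // == 10,080,000 ms
    StreamBuilder before = new LocalStreamBuilder(1000, streamConfig);

    // after: the resolved application config is bound onto LocalRuntimeConfiguration,
    // so the same knobs arrive via .conf files instead of compiled-in constants
    LocalRuntimeConfiguration localRuntimeConfiguration = StreamsJacksonMapper.getInstance()
        .convertValue(StreamsConfigurator.detectConfiguration(), LocalRuntimeConfiguration.class);
    StreamBuilder after = new LocalStreamBuilder(localRuntimeConfiguration);

Note that the com.google.common.collect.Maps and java.util.Map imports survive as context lines although nothing in the new body uses them.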

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/cb7c0b9d/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/test/ElasticsearchReindexChildIT.java
----------------------------------------------------------------------
diff --git a/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/test/ElasticsearchReindexChildIT.java b/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/test/ElasticsearchReindexChildIT.java
index 3260949..04d7cd6 100644
--- a/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/test/ElasticsearchReindexChildIT.java
+++ b/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/test/ElasticsearchReindexChildIT.java
@@ -28,7 +28,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigParseOptions;
-
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
 import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
@@ -50,64 +49,64 @@ import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertNotEquals;
 
 /**
- * Test copying parent/child associated documents between two indexes on same cluster
+ * Test copying parent/child associated documents between two indexes on same cluster.
  */
 public class ElasticsearchReindexChildIT {
 
-    private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchReindexIT.class);
+  private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchReindexChildIT.class);
 
-    ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
+  ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
 
-    protected ElasticsearchReindexConfiguration testConfiguration;
-    protected Client testClient;
+  protected ElasticsearchReindexConfiguration testConfiguration;
+  protected Client testClient;
 
-    private int count = 0;
+  private int count = 0;
 
-    @BeforeClass
-    public void prepareTest() throws Exception {
+  @BeforeClass
+  public void prepareTest() throws Exception {
 
-        Config reference  = ConfigFactory.load();
-        File conf_file = new File("target/test-classes/ElasticsearchReindexChildIT.conf");
-        assert(conf_file.exists());
-        Config testResourceConfig  = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
-        Config typesafe = testResourceConfig.withFallback(reference).resolve();
-        testConfiguration = new ComponentConfigurator<>(ElasticsearchReindexConfiguration.class).detectConfiguration(typesafe);
-        testClient = ElasticsearchClientManager.getInstance(testConfiguration.getSource()).client();
+    Config reference  = ConfigFactory.load();
+    File conf_file = new File("target/test-classes/ElasticsearchReindexChildIT.conf");
+    assert(conf_file.exists());
+    Config testResourceConfig  = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
+    Config typesafe = testResourceConfig.withFallback(reference).resolve();
+    testConfiguration = new ComponentConfigurator<>(ElasticsearchReindexConfiguration.class).detectConfiguration(typesafe);
+    testClient = ElasticsearchClientManager.getInstance(testConfiguration.getSource()).client();
 
-        ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest();
-        ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet();
-        assertNotEquals(clusterHealthResponse.getStatus(), ClusterHealthStatus.RED);
+    ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest();
+    ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet();
+    assertNotEquals(clusterHealthResponse.getStatus(), ClusterHealthStatus.RED);
 
-        IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getSource().getIndexes().get(0));
-        IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet();
-        assertThat(indicesExistsResponse.isExists(), is(true));
+    IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getSource().getIndexes().get(0));
+    IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet();
+    assertThat(indicesExistsResponse.isExists(), is(true));
 
-        SearchRequestBuilder countRequest = testClient
-                .prepareSearch(testConfiguration.getSource().getIndexes().get(0))
-                .setTypes(testConfiguration.getSource().getTypes().get(0));
-        SearchResponse countResponse = countRequest.execute().actionGet();
+    SearchRequestBuilder countRequest = testClient
+        .prepareSearch(testConfiguration.getSource().getIndexes().get(0))
+        .setTypes(testConfiguration.getSource().getTypes().get(0));
+    SearchResponse countResponse = countRequest.execute().actionGet();
 
-        count = (int)countResponse.getHits().getTotalHits();
+    count = (int)countResponse.getHits().getTotalHits();
 
-        assertNotEquals(count, 0);
+    assertNotEquals(count, 0);
 
-    }
+  }
 
-    @Test
-    public void testReindex() throws Exception {
+  @Test
+  public void testReindex() throws Exception {
 
-        ElasticsearchReindex reindex = new ElasticsearchReindex(testConfiguration);
+    ElasticsearchReindex reindex = new ElasticsearchReindex(testConfiguration);
 
-        reindex.run();
+    reindex.run();
 
-        // assert lines in file
-        SearchRequestBuilder countRequest = testClient
-                .prepareSearch(testConfiguration.getDestination().getIndex())
-                .setTypes(testConfiguration.getDestination().getType());
-        SearchResponse countResponse = countRequest.execute().actionGet();
+    // assert lines in file
+    SearchRequestBuilder countRequest = testClient
+        .prepareSearch(testConfiguration.getDestination().getIndex())
+        .setTypes(testConfiguration.getDestination().getType());
+    SearchResponse countResponse = countRequest.execute().actionGet();
 
-        assertThat((int)countResponse.getHits().getTotalHits(), is(count));
+    assertThat((int)countResponse.getHits().getTotalHits(), is(count));
 
-    }
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/cb7c0b9d/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/test/ElasticsearchReindexIT.java
----------------------------------------------------------------------
diff --git a/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/test/ElasticsearchReindexIT.java b/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/test/ElasticsearchReindexIT.java
index a324e24..6c69388 100644
--- a/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/test/ElasticsearchReindexIT.java
+++ b/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/test/ElasticsearchReindexIT.java
@@ -53,59 +53,59 @@ import static org.hamcrest.core.IsNot.not;
  */
 public class ElasticsearchReindexIT {
 
-    private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchReindexIT.class);
+  private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchReindexIT.class);
 
-    ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
+  ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
 
-    protected ElasticsearchReindexConfiguration testConfiguration;
-    protected Client testClient;
+  protected ElasticsearchReindexConfiguration testConfiguration;
+  protected Client testClient;
 
-    private int count = 0;
+  private int count = 0;
 
-    @BeforeClass
-    public void prepareTest() throws Exception {
+  @BeforeClass
+  public void prepareTest() throws Exception {
 
-        Config reference  = ConfigFactory.load();
-        File conf_file = new File("target/test-classes/ElasticsearchReindexIT.conf");
-        assert(conf_file.exists());
-        Config testResourceConfig  = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
-        Config typesafe  = testResourceConfig.withFallback(reference).resolve();
-        testConfiguration = new ComponentConfigurator<>(ElasticsearchReindexConfiguration.class).detectConfiguration(typesafe);
-        testClient = ElasticsearchClientManager.getInstance(testConfiguration.getSource()).client();
+    Config reference  = ConfigFactory.load();
+    File conf_file = new File("target/test-classes/ElasticsearchReindexIT.conf");
+    assert(conf_file.exists());
+    Config testResourceConfig  = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
+    Config typesafe  = testResourceConfig.withFallback(reference).resolve();
+    testConfiguration = new ComponentConfigurator<>(ElasticsearchReindexConfiguration.class).detectConfiguration(typesafe);
+    testClient = ElasticsearchClientManager.getInstance(testConfiguration.getSource()).client();
 
-        ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest();
-        ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet();
-        assertThat(clusterHealthResponse.getStatus(), not(ClusterHealthStatus.RED));
+    ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest();
+    ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet();
+    assertThat(clusterHealthResponse.getStatus(), not(ClusterHealthStatus.RED));
 
-        IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getSource().getIndexes().get(0));
-        IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet();
-        assertThat(indicesExistsResponse.isExists(), is(true));
+    IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getSource().getIndexes().get(0));
+    IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet();
+    assertThat(indicesExistsResponse.isExists(), is(true));
 
-        SearchRequestBuilder countRequest = testClient
-            .prepareSearch(testConfiguration.getSource().getIndexes().get(0))
-            .setTypes(testConfiguration.getSource().getTypes().get(0));
-        SearchResponse countResponse = countRequest.execute().actionGet();
+    SearchRequestBuilder countRequest = testClient
+        .prepareSearch(testConfiguration.getSource().getIndexes().get(0))
+        .setTypes(testConfiguration.getSource().getTypes().get(0));
+    SearchResponse countResponse = countRequest.execute().actionGet();
 
-        count = (int)countResponse.getHits().getTotalHits();
+    count = (int)countResponse.getHits().getTotalHits();
 
-        assertThat(count, not(0));
+    assertThat(count, not(0));
 
-    }
+  }
 
-    @Test
-    public void testReindex() throws Exception {
+  @Test
+  public void testReindex() throws Exception {
 
-        ElasticsearchReindex reindex = new ElasticsearchReindex(testConfiguration);
+    ElasticsearchReindex reindex = new ElasticsearchReindex(testConfiguration);
 
-        reindex.run();
+    reindex.run();
 
-        // assert lines in file
-        SearchRequestBuilder countRequest = testClient
-            .prepareSearch(testConfiguration.getDestination().getIndex())
-            .setTypes(testConfiguration.getDestination().getType());
-        SearchResponse countResponse = countRequest.execute().actionGet();
+    // assert lines in file
+    SearchRequestBuilder countRequest = testClient
+        .prepareSearch(testConfiguration.getDestination().getIndex())
+        .setTypes(testConfiguration.getDestination().getType());
+    SearchResponse countResponse = countRequest.execute().actionGet();
 
-        assertThat((int)countResponse.getHits().getTotalHits(), is(count));
+    assertThat((int)countResponse.getHits().getTotalHits(), is(count));
 
-    }
+  }
 }
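
ElasticsearchReindexIT and ElasticsearchReindexChildIT above assert the same invariant: the destination index ends up with exactly the document count the source held before the run. The duplicated count blocks could collapse into a helper along these lines (hypothetical; not part of this commit):

    // count the documents in one index/type pair via a match-all search
    private long countDocs(Client client, String index, String type) {
      SearchResponse response = client.prepareSearch(index)
          .setTypes(type)
          .execute().actionGet();
      return response.getHits().getTotalHits();
    }

prepareTest() would then capture countDocs(testClient, ...) once for the source index and type, and testReindex() would compare it against the destination count after reindex.run() returns.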

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/cb7c0b9d/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/test/ElasticsearchReindexParentIT.java
----------------------------------------------------------------------
diff --git a/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/test/ElasticsearchReindexParentIT.java b/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/test/ElasticsearchReindexParentIT.java
index 988852a..e53c057 100644
--- a/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/test/ElasticsearchReindexParentIT.java
+++ b/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/test/ElasticsearchReindexParentIT.java
@@ -30,7 +30,6 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigParseOptions;
-
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
 import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
@@ -59,68 +58,68 @@ import static org.testng.Assert.assertTrue;
  */
 public class ElasticsearchReindexParentIT {
 
-    private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchReindexIT.class);
+  private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchReindexParentIT.class);
 
-    ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
+  ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
 
-    protected ElasticsearchReindexConfiguration testConfiguration;
-    protected Client testClient;
+  protected ElasticsearchReindexConfiguration testConfiguration;
+  protected Client testClient;
 
-    private int count = 0;
+  private int count = 0;
 
-    @BeforeClass
-    public void prepareTest() throws Exception {
+  @BeforeClass
+  public void prepareTest() throws Exception {
 
-        Config reference  = ConfigFactory.load();
-        File conf_file = new File("target/test-classes/ElasticsearchReindexParentIT.conf");
-        assert(conf_file.exists());
-        Config testResourceConfig  = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
-        Config typesafe  = testResourceConfig.withFallback(reference).resolve();
-        testConfiguration = new ComponentConfigurator<>(ElasticsearchReindexConfiguration.class).detectConfiguration(typesafe);
-        testClient = ElasticsearchClientManager.getInstance(testConfiguration.getSource()).client();
+    Config reference  = ConfigFactory.load();
+    File conf_file = new File("target/test-classes/ElasticsearchReindexParentIT.conf");
+    assert(conf_file.exists());
+    Config testResourceConfig  = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
+    Config typesafe  = testResourceConfig.withFallback(reference).resolve();
+    testConfiguration = new ComponentConfigurator<>(ElasticsearchReindexConfiguration.class).detectConfiguration(typesafe);
+    testClient = ElasticsearchClientManager.getInstance(testConfiguration.getSource()).client();
 
-        ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest();
-        ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet();
-        assertThat(clusterHealthResponse.getStatus(), not(ClusterHealthStatus.RED));
+    ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest();
+    ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet();
+    assertThat(clusterHealthResponse.getStatus(), not(ClusterHealthStatus.RED));
 
-        IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getSource().getIndexes().get(0));
-        IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet();
-        assertTrue(indicesExistsResponse.isExists());
+    IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getSource().getIndexes().get(0));
+    IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet();
+    assertTrue(indicesExistsResponse.isExists());
 
-        SearchRequestBuilder countRequest = testClient
-                .prepareSearch(testConfiguration.getSource().getIndexes().get(0))
-                .setTypes(testConfiguration.getSource().getTypes().get(0));
-        SearchResponse countResponse = countRequest.execute().actionGet();
+    SearchRequestBuilder countRequest = testClient
+        .prepareSearch(testConfiguration.getSource().getIndexes().get(0))
+        .setTypes(testConfiguration.getSource().getTypes().get(0));
+    SearchResponse countResponse = countRequest.execute().actionGet();
 
-        count = (int)countResponse.getHits().getTotalHits();
+    count = (int)countResponse.getHits().getTotalHits();
 
-        PutIndexTemplateRequestBuilder putTemplateRequestBuilder = testClient.admin().indices().preparePutTemplate("mappings");
-        URL templateURL = ElasticsearchParentChildWriterIT.class.getResource("/ActivityChildObjectParent.json");
-        ObjectNode template = MAPPER.readValue(templateURL, ObjectNode.class);
-        String templateSource = MAPPER.writeValueAsString(template);
-        putTemplateRequestBuilder.setSource(templateSource);
+    PutIndexTemplateRequestBuilder putTemplateRequestBuilder = testClient.admin().indices().preparePutTemplate("mappings");
+    URL templateURL = ElasticsearchParentChildWriterIT.class.getResource("/ActivityChildObjectParent.json");
+    ObjectNode template = MAPPER.readValue(templateURL, ObjectNode.class);
+    String templateSource = MAPPER.writeValueAsString(template);
+    putTemplateRequestBuilder.setSource(templateSource);
 
-        testClient.admin().indices().putTemplate(putTemplateRequestBuilder.request()).actionGet();
+    testClient.admin().indices().putTemplate(putTemplateRequestBuilder.request()).actionGet();
 
-        assertThat(count, not(0));
+    assertThat(count, not(0));
 
-    }
+  }
 
-    @Test
-    public void testReindex() throws Exception {
+  @Test
+  public void testReindex() throws Exception {
 
-        ElasticsearchReindex reindex = new ElasticsearchReindex(testConfiguration);
+    ElasticsearchReindex reindex = new ElasticsearchReindex(testConfiguration);
 
-        reindex.run();
+    reindex.run();
 
-        // assert lines in file
-        SearchRequestBuilder countRequest = testClient
-                .prepareSearch(testConfiguration.getDestination().getIndex())
-                .setTypes(testConfiguration.getDestination().getType());
-        SearchResponse countResponse = countRequest.execute().actionGet();
+    // assert lines in file
+    SearchRequestBuilder countRequest = testClient
+        .prepareSearch(testConfiguration.getDestination().getIndex())
+        .setTypes(testConfiguration.getDestination().getType());
+    SearchResponse countResponse = countRequest.execute().actionGet();
 
-        assertThat((int)countResponse.getHits().getTotalHits(), is(count));
+    assertThat((int)countResponse.getHits().getTotalHits(), is(count));
 
-    }
+  }
 
 }
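
The parent/child variant needs one setup step the flat reindex does not: before the stream runs, it registers an index template from ActivityChildObjectParent.json, presumably so that the destination index, created on first write, already carries the parent/child mapping; child documents arriving without it could not be routed to their parents. The registration in isolation (names as in the diff above):

    // load the template JSON from the test classpath and register it cluster-wide,
    // so any index created during the reindex inherits the parent/child mapping
    PutIndexTemplateRequestBuilder putTemplateRequestBuilder =
        testClient.admin().indices().preparePutTemplate("mappings");
    ObjectNode template = MAPPER.readValue(
        ElasticsearchParentChildWriterIT.class.getResource("/ActivityChildObjectParent.json"),
        ObjectNode.class);
    putTemplateRequestBuilder.setSource(MAPPER.writeValueAsString(template));
    testClient.admin().indices().putTemplate(putTemplateRequestBuilder.request()).actionGet();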

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/cb7c0b9d/local/elasticsearch-reindex/src/test/resources/ElasticsearchReindexChildIT.conf
----------------------------------------------------------------------
diff --git a/local/elasticsearch-reindex/src/test/resources/ElasticsearchReindexChildIT.conf b/local/elasticsearch-reindex/src/test/resources/ElasticsearchReindexChildIT.conf
index 424d725..159b7d5 100644
--- a/local/elasticsearch-reindex/src/test/resources/ElasticsearchReindexChildIT.conf
+++ b/local/elasticsearch-reindex/src/test/resources/ElasticsearchReindexChildIT.conf
@@ -33,4 +33,5 @@
     "type": "activity",
     "forceUseConfig": true
   }
+  taskTimeoutMs = 60000
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/cb7c0b9d/local/elasticsearch-reindex/src/test/resources/ElasticsearchReindexIT.conf
----------------------------------------------------------------------
diff --git a/local/elasticsearch-reindex/src/test/resources/ElasticsearchReindexIT.conf b/local/elasticsearch-reindex/src/test/resources/ElasticsearchReindexIT.conf
index 0062f0f..faac23e 100644
--- a/local/elasticsearch-reindex/src/test/resources/ElasticsearchReindexIT.conf
+++ b/local/elasticsearch-reindex/src/test/resources/ElasticsearchReindexIT.conf
@@ -33,4 +33,5 @@
     "type": "activity",
     "forceUseConfig": true
   }
+  taskTimeoutMs = 60000
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/cb7c0b9d/local/elasticsearch-reindex/src/test/resources/ElasticsearchReindexParentIT.conf
----------------------------------------------------------------------
diff --git a/local/elasticsearch-reindex/src/test/resources/ElasticsearchReindexParentIT.conf b/local/elasticsearch-reindex/src/test/resources/ElasticsearchReindexParentIT.conf
index 424d725..159b7d5 100644
--- a/local/elasticsearch-reindex/src/test/resources/ElasticsearchReindexParentIT.conf
+++ b/local/elasticsearch-reindex/src/test/resources/ElasticsearchReindexParentIT.conf
@@ -33,4 +33,5 @@
     "type": "activity",
     "forceUseConfig": true
   }
+  taskTimeoutMs = 60000
 }
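
All three reindex IT configs gain the same taskTimeoutMs = 60000 line, and with the LocalRuntimeConfiguration change in ElasticsearchReindex.java the one-minute bound reaches the LocalStreamBuilder through configuration alone. A sketch of the hand-off, assuming the generated LocalRuntimeConfiguration POJO exposes the key through a bean accessor (getTaskTimeoutMs() is an assumption, not confirmed by this diff):

    // bind the resolved stream config, then read back the timeout added above
    LocalRuntimeConfiguration runtime = StreamsJacksonMapper.getInstance()
        .convertValue(StreamsConfigurator.detectConfiguration(), LocalRuntimeConfiguration.class);
    LOGGER.info("task timeout: {} ms", runtime.getTaskTimeoutMs());  // hypothetical accessor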