You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2020/08/12 16:02:50 UTC

[streams] branch master updated (512835f -> 2e4e52f)

This is an automated email from the ASF dual-hosted git repository.

sblackmon pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/streams.git.


    from 512835f  STREAMS-666: ensure rat check passes to prepare for 0.7.0 release
     new c8f58e2  STREAMS-668 upgrade flink examples to latest flink version
     new 76ed665  name file sink
     new 132504b  use newer flink file sink
     new 2e4e52f  don’t interfere with flink environment-level parallelism

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../twitter/provider/TwitterFollowingProvider.java |   4 +-
 .../twitter/provider/TwitterTimelineProvider.java  |  31 +++--
 .../provider/TwitterUserInformationProvider.java   | 127 +++++++++++++--------
 .../TwitterUserInformationProviderTask.java        |  29 ++++-
 .../flink-twitter-collection/pom.xml               |  26 +++--
 .../apache/streams/examples/flink/FlinkBase.scala  |  23 +++-
 .../collection/FlinkTwitterFollowingPipeline.scala | 104 ++++++++---------
 .../collection/FlinkTwitterPostsPipeline.scala     |  63 +++++++---
 .../collection/FlinkTwitterSpritzerPipeline.scala  |  60 ++++++----
 .../FlinkTwitterUserInformationPipeline.scala      |  63 +++++++---
 .../FollowingCollectorFlatMapFunction.scala        |  52 +++++++++
 .../TimelineCollectorFlatMapFunction.scala         |  52 +++++++++
 .../UserInformationCollectorFlatMapFunction.scala  |  48 ++++++++
 .../FlinkTwitterFollowingPipelineFollowersIT.conf  |   3 +-
 .../FlinkTwitterFollowingPipelineFriendsIT.conf    |   1 +
 .../resources/FlinkTwitterPostsPipelineIT.conf     |   1 +
 .../FlinkTwitterUserInformationPipelineIT.conf     |   9 +-
 .../src/test/resources/testng.xml                  |  24 ++--
 .../FlinkTwitterFollowingPipelineFollowersIT.scala |  16 ++-
 .../FlinkTwitterFollowingPipelineFriendsIT.scala   |  18 +--
 .../twitter/test/FlinkTwitterPostsPipelineIT.scala |  26 +++--
 .../test/FlinkTwitterSpritzerPipelineIT.scala      |  24 ++--
 .../FlinkTwitterUserInformationPipelineIT.scala    |  26 +++--
 23 files changed, 595 insertions(+), 235 deletions(-)
 create mode 100644 streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FollowingCollectorFlatMapFunction.scala
 create mode 100644 streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/TimelineCollectorFlatMapFunction.scala
 create mode 100644 streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/UserInformationCollectorFlatMapFunction.scala
 copy streams-examples/{streams-examples-local/elasticsearch-reindex => streams-examples-flink/flink-twitter-collection}/src/test/resources/testng.xml (54%)


[streams] 02/04: name file sink

Posted by sb...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sblackmon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/streams.git

commit 76ed6652c13cb79fcbd98627154df6b44c708f6f
Author: Steve Blackmon <st...@peoplepatternmacpro.lan>
AuthorDate: Tue Aug 4 19:44:18 2020 -0500

    name file sink
---
 .../flink/twitter/collection/FlinkTwitterFollowingPipeline.scala        | 2 +-
 .../examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala   | 2 +-
 .../flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala         | 2 +-
 .../flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala  | 2 +-
 4 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala
index bae988c..6f20216 100644
--- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala
+++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala
@@ -164,7 +164,7 @@ class FlinkTwitterFollowingPipeline(config: TwitterFollowingPipelineConfiguratio
     if( config.getTest == true ) {
       keyed_jsons.writeAsText(outPath,FileSystem.WriteMode.OVERWRITE)
     } else {
-      keyed_jsons.name("fileSink").addSink(fileSink)
+      keyed_jsons.addSink(fileSink).name("fileSink")
     }
 
     val result: JobExecutionResult = env.execute("FlinkTwitterFollowingPipeline")
diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala
index 14eebc5..ed45a9f 100644
--- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala
+++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala
@@ -167,7 +167,7 @@ class FlinkTwitterPostsPipeline(config: TwitterPostsPipelineConfiguration = new
     if( config.getTest == true ) {
       keyed_jsons.writeAsText(outPath,FileSystem.WriteMode.OVERWRITE)
     } else {
-      keyed_jsons.addSink(fileSink)
+      keyed_jsons.addSink(fileSink).name("fileSink")
     }
 
     val result: JobExecutionResult = env.execute("FlinkTwitterPostsPipeline")
diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala
index 6e38591..6235672 100644
--- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala
+++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala
@@ -141,7 +141,7 @@ class FlinkTwitterSpritzerPipeline(config: TwitterSpritzerPipelineConfiguration
     if( config.getTest == true ) {
       keyed_jsons.writeAsText(outPath,FileSystem.WriteMode.OVERWRITE)
     } else {
-      keyed_jsons.addSink(fileSink)
+      keyed_jsons.addSink(fileSink).name("fileSink")
     }
 
     val result: JobExecutionResult = env.execute("FlinkTwitterPostsPipeline")
diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala
index 32cf232..21bc911 100644
--- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala
+++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala
@@ -170,7 +170,7 @@ class FlinkTwitterUserInformationPipeline(config: TwitterUserInformationPipeline
     if( config.getTest == true ) {
       keyed_jsons.writeAsText(outPath,FileSystem.WriteMode.OVERWRITE)
     } else {
-      keyed_jsons.addSink(fileSink)
+      keyed_jsons.addSink(fileSink).name("fileSink")
     }
 
     val result: JobExecutionResult = env.execute("FlinkTwitterUserInformationPipeline")


[streams] 04/04: don’t interfere with flink environment-level parallelism

Posted by sb...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sblackmon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/streams.git

commit 2e4e52f5b80372f564150eb9955231c5d388f1f8
Author: sblackmon <sb...@apache.org>
AuthorDate: Sun Aug 9 11:02:51 2020 -0500

    don’t interfere with flink environment-level parallelism
---
 .../flink/twitter/collection/FlinkTwitterFollowingPipeline.scala         | 1 -
 .../examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala    | 1 -
 .../flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala   | 1 -
 3 files changed, 3 deletions(-)

diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala
index 96a7739..fce134d 100644
--- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala
+++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala
@@ -120,7 +120,6 @@ class FlinkTwitterFollowingPipeline(config: TwitterFollowingPipelineConfiguratio
 
     val env: StreamExecutionEnvironment = streamEnvironment(config)
 
-    env.setParallelism(streamsConfig.getParallelism().toInt)
     env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
     env.setNumberOfExecutionRetries(0)
 
diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala
index aad7f32..94540dd 100644
--- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala
+++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala
@@ -129,7 +129,6 @@ class FlinkTwitterPostsPipeline(config: TwitterPostsPipelineConfiguration = new
 
     val env: StreamExecutionEnvironment = streamEnvironment(config)
 
-    env.setParallelism(streamsConfig.getParallelism().toInt)
     env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
     env.setNumberOfExecutionRetries(0)
 
diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala
index 080ce57..eb15556 100644
--- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala
+++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala
@@ -133,7 +133,6 @@ class FlinkTwitterUserInformationPipeline(config: TwitterUserInformationPipeline
 
     val env: StreamExecutionEnvironment = streamEnvironment(config)
 
-    env.setParallelism(streamsConfig.getParallelism().toInt)
     env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
     env.setNumberOfExecutionRetries(0)
 


[streams] 01/04: STREAMS-668 upgrade flink examples to latest flink version

Posted by sb...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sblackmon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/streams.git

commit c8f58e2ad23c33092337357854443f1bfeef2606
Author: Steve Blackmon <st...@peoplepatternmacpro.lan>
AuthorDate: Tue Aug 4 14:25:50 2020 -0500

    STREAMS-668 upgrade flink examples to latest flink version
    
    STREAMS-668
    upgrade flink examples to latest flink version 1.11.1
    also improved threading throughout streams-provider-twitter
    added accumulators to twitter flink pipelines
---
 .../twitter/provider/TwitterFollowingProvider.java |   4 +-
 .../twitter/provider/TwitterTimelineProvider.java  |  31 +++--
 .../provider/TwitterUserInformationProvider.java   | 127 +++++++++++++--------
 .../TwitterUserInformationProviderTask.java        |  29 ++++-
 .../flink-twitter-collection/pom.xml               |  26 +++--
 .../apache/streams/examples/flink/FlinkBase.scala  |  13 ++-
 .../collection/FlinkTwitterFollowingPipeline.scala | 103 ++++++++---------
 .../collection/FlinkTwitterPostsPipeline.scala     |  63 +++++++---
 .../collection/FlinkTwitterSpritzerPipeline.scala  |  58 ++++++----
 .../FlinkTwitterUserInformationPipeline.scala      |  62 +++++++---
 .../FollowingCollectorFlatMapFunction.scala        |  52 +++++++++
 .../TimelineCollectorFlatMapFunction.scala         |  52 +++++++++
 .../UserInformationCollectorFlatMapFunction.scala  |  48 ++++++++
 .../FlinkTwitterFollowingPipelineFollowersIT.conf  |   3 +-
 .../FlinkTwitterFollowingPipelineFriendsIT.conf    |   1 +
 .../resources/FlinkTwitterPostsPipelineIT.conf     |   1 +
 .../FlinkTwitterUserInformationPipelineIT.conf     |   9 +-
 .../src/test/resources/testng.xml                  |  54 +++++++++
 .../FlinkTwitterFollowingPipelineFollowersIT.scala |   2 +-
 .../FlinkTwitterFollowingPipelineFriendsIT.scala   |   4 +-
 .../twitter/test/FlinkTwitterPostsPipelineIT.scala |   4 +-
 .../FlinkTwitterUserInformationPipelineIT.scala    |   4 +-
 22 files changed, 562 insertions(+), 188 deletions(-)

diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
index 4603803..eb472ea 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
@@ -332,6 +332,8 @@ public class TwitterFollowingProvider implements Callable<Iterator<Follow>>, Str
   }
 
   public boolean isRunning() {
+    LOGGER.debug("providerQueue.isEmpty: {}", providerQueue.isEmpty());
+    LOGGER.debug("providerQueue.size: {}", providerQueue.size());
     LOGGER.debug("executor.isShutdown: {}", executor.isShutdown());
     LOGGER.debug("executor.isTerminated: {}", executor.isTerminated());
     LOGGER.debug("tasks.size(): {}", tasks.size());
@@ -346,7 +348,7 @@ public class TwitterFollowingProvider implements Callable<Iterator<Follow>>, Str
       allTasksComplete = false;
     }
     LOGGER.debug("allTasksComplete: {}", allTasksComplete);
-    boolean finished = allTasksComplete && tasks.size() > 0 && tasks.size() == futures.size() && executor.isShutdown() && executor.isTerminated();
+    boolean finished = tasks.size() > 0 && tasks.size() == futures.size() && executor.isShutdown() && executor.isTerminated() && allTasksComplete && providerQueue.size() == 0;
     LOGGER.debug("finished: {}", finished);
     if ( finished ) {
       running.set(false);
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
index eee81c4..126683c 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
@@ -206,6 +206,7 @@ public class TwitterTimelineProvider implements Callable<Iterator<Tweet>>, Strea
     }
 
     Objects.requireNonNull(providerQueue);
+    Objects.requireNonNull(config);
     Objects.requireNonNull(config.getOauth().getConsumerKey());
     Objects.requireNonNull(config.getOauth().getConsumerSecret());
     Objects.requireNonNull(config.getOauth().getAccessToken());
@@ -244,6 +245,8 @@ public class TwitterTimelineProvider implements Callable<Iterator<Tweet>>, Strea
         )
     );
 
+    Objects.requireNonNull(executor);
+
     submitTimelineThreads(ids, names);
 
     LOGGER.info("tasks: {}", tasks.size());
@@ -323,6 +326,8 @@ public class TwitterTimelineProvider implements Callable<Iterator<Tweet>>, Strea
       running.set(false);
 
       LOGGER.info("Exiting");
+    } else {
+      LOGGER.info("Not Finished Yet...");
     }
 
     return result;
@@ -350,25 +355,37 @@ public class TwitterTimelineProvider implements Callable<Iterator<Tweet>>, Strea
   }
 
   @Override
-  public void cleanUp() {
-    ExecutorUtils.shutdownAndAwaitTermination(executor);
-  }
-
-  @Override
   public boolean isRunning() {
     LOGGER.debug("providerQueue.isEmpty: {}", providerQueue.isEmpty());
     LOGGER.debug("providerQueue.size: {}", providerQueue.size());
+    LOGGER.debug("executor.isShutdown: {}", executor.isShutdown());
     LOGGER.debug("executor.isTerminated: {}", executor.isTerminated());
     LOGGER.debug("tasks.size(): {}", tasks.size());
     LOGGER.debug("futures.size(): {}", futures.size());
-    if ( tasks.size() > 0 && tasks.size() == futures.size() && executor.isShutdown() && executor.isTerminated() ) {
+    boolean allTasksComplete;
+    if( futures.size() > 0) {
+      allTasksComplete = true;
+      for(Future<?> future : futures){
+        allTasksComplete |= !future.isDone(); // check if future is done
+      }
+    } else {
+      allTasksComplete = false;
+    }
+    LOGGER.debug("allTasksComplete: {}", allTasksComplete);
+    boolean finished = tasks.size() > 0 && tasks.size() == futures.size() && executor.isShutdown() && executor.isTerminated() && allTasksComplete && providerQueue.size() == 0;
+    LOGGER.debug("finished: {}", finished);
+    if ( finished ) {
       running.set(false);
     }
-    LOGGER.debug("isRunning: ", running.get());
     return running.get();
   }
 
   @Override
+  public void cleanUp() {
+    ExecutorUtils.shutdownAndAwaitTermination(executor);
+  }
+
+  @Override
   public Iterator<Tweet> call() {
     prepare(config);
     startStream();
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
index 6ceeb88..dc0d050 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
@@ -40,6 +40,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
+
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.Uninterruptibles;
 import com.typesafe.config.Config;
@@ -56,6 +57,7 @@ import java.io.File;
 import java.io.FileOutputStream;
 import java.io.PrintStream;
 import java.io.Serializable;
+import java.lang.Runnable;
 import java.math.BigInteger;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -64,6 +66,8 @@ import java.util.List;
 import java.util.Objects;
 import java.util.Queue;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReadWriteLock;
@@ -103,14 +107,15 @@ public class TwitterUserInformationProvider implements Callable<Iterator<User>>,
 
   protected Twitter client;
 
-  protected ListeningExecutorService executor;
+  protected ExecutorService executor;
 
   protected DateTime start;
   protected DateTime end;
 
   protected final AtomicBoolean running = new AtomicBoolean();
 
-  private List<ListenableFuture<Object>> futures = new ArrayList<>();
+  private List<Runnable> tasks = new ArrayList<>();
+  private List<Future<Object>> futures = new ArrayList<>();
 
   /**
    * To use from command line:
@@ -165,10 +170,11 @@ public class TwitterUserInformationProvider implements Callable<Iterator<User>>,
           System.err.println(ex.getMessage());
         }
       }
+      outStream.flush();
     }
     while ( provider.isRunning());
     provider.cleanUp();
-    outStream.flush();
+    outStream.close();
   }
 
   // TODO: this should be abstracted out
@@ -199,9 +205,19 @@ public class TwitterUserInformationProvider implements Callable<Iterator<User>>,
   public void prepare(Object configurationObject) {
 
     if ( configurationObject instanceof TwitterFollowingConfiguration ) {
-      config = (TwitterUserInformationConfiguration) configurationObject;
+      this.config = (TwitterUserInformationConfiguration) configurationObject;
     }
 
+    streamsConfiguration = StreamsConfigurator.detectConfiguration(StreamsConfigurator.getConfig());
+
+    try {
+      lock.writeLock().lock();
+      providerQueue = QueueUtils.constructQueue();
+    } finally {
+      lock.writeLock().unlock();
+    }
+
+    Objects.requireNonNull(providerQueue);
     Objects.requireNonNull(config);
     Objects.requireNonNull(config.getOauth());
     Objects.requireNonNull(config.getOauth().getConsumerKey());
@@ -211,10 +227,6 @@ public class TwitterUserInformationProvider implements Callable<Iterator<User>>,
     Objects.requireNonNull(config.getInfo());
     Objects.requireNonNull(config.getThreadsPerProvider());
 
-    streamsConfiguration = StreamsConfigurator.detectConfiguration();
-
-    Objects.requireNonNull(streamsConfiguration.getQueueSize());
-
     try {
       client = getTwitterClient();
     } catch (InstantiationException e) {
@@ -223,15 +235,6 @@ public class TwitterUserInformationProvider implements Callable<Iterator<User>>,
 
     Objects.requireNonNull(client);
 
-    try {
-      lock.writeLock().lock();
-      providerQueue = QueueUtils.constructQueue();
-    } finally {
-      lock.writeLock().unlock();
-    }
-
-    Objects.requireNonNull(providerQueue);
-
     for (String s : config.getInfo()) {
       if (s != null) {
         String potentialScreenName = s.replaceAll("@", "").trim().toLowerCase();
@@ -247,7 +250,7 @@ public class TwitterUserInformationProvider implements Callable<Iterator<User>>,
     }
 
     executor = MoreExecutors.listeningDecorator(
-        ExecutorUtils.newFixedThreadPoolWithQueueSize(
+      ExecutorUtils.newFixedThreadPoolWithQueueSize(
             config.getThreadsPerProvider().intValue(),
             streamsConfiguration.getQueueSize().intValue()
         )
@@ -255,12 +258,38 @@ public class TwitterUserInformationProvider implements Callable<Iterator<User>>,
 
     Objects.requireNonNull(executor);
 
-    // Twitter allows for batches up to 100 per request, but you cannot mix types
     submitUserInformationThreads(ids, names);
+
+    LOGGER.info("tasks: {}", tasks.size());
+    LOGGER.info("futures: {}", futures.size());
+
+  }
+
+  @Override
+  public void startStream() {
+
+    Objects.requireNonNull(executor);
+
+    LOGGER.info("startStream");
+
+    running.set(true);
+
+    LOGGER.info("running: {}", running.get());
+
+    ExecutorUtils.shutdownAndAwaitTermination(executor);
+
+    LOGGER.info("running: {}", running.get());
+
   }
 
   protected void submitUserInformationThreads(List<Long> ids, List<String> names) {
 
+    /*
+    while( idsIndex < ids.size() ) {
+      from = idsIndex
+      to = Math.min( idsIndex + 100, ids.size() - idsIndex
+    }
+     */
     int idsIndex = 0;
     while( idsIndex + 100 < ids.size() ) {
       List<Long> batchIds = ids.subList(idsIndex, idsIndex + 100);
@@ -268,7 +297,8 @@ public class TwitterUserInformationProvider implements Callable<Iterator<User>>,
           this,
           client,
           new UsersLookupRequest().withUserId(batchIds));
-      ListenableFuture future = executor.submit(providerTask);
+      tasks.add(providerTask);
+      Future future = executor.submit((Callable)providerTask);
       futures.add(future);
       LOGGER.info("Thread Submitted: {}", providerTask.request);
       idsIndex += 100;
@@ -279,7 +309,8 @@ public class TwitterUserInformationProvider implements Callable<Iterator<User>>,
           this,
           client,
           new UsersLookupRequest().withUserId(batchIds));
-      ListenableFuture future = executor.submit(providerTask);
+      tasks.add(providerTask);
+      Future future = executor.submit((Callable)providerTask);
       futures.add(future);
       LOGGER.info("Thread Submitted: {}", providerTask.request);
     }
@@ -291,7 +322,8 @@ public class TwitterUserInformationProvider implements Callable<Iterator<User>>,
           this,
           client,
           new UsersLookupRequest().withScreenName(batchNames));
-      ListenableFuture future = executor.submit(providerTask);
+      tasks.add(providerTask);
+      Future future = executor.submit((Callable)providerTask);
       futures.add(future);
       LOGGER.info("Thread Submitted: {}", providerTask.request);
       namesIndex += 100;
@@ -302,7 +334,8 @@ public class TwitterUserInformationProvider implements Callable<Iterator<User>>,
           this,
           client,
           new UsersLookupRequest().withScreenName(batchNames));
-      ListenableFuture future = executor.submit(providerTask);
+      tasks.add(providerTask);
+      Future future = executor.submit((Callable)providerTask);
       futures.add(future);
       LOGGER.info("Thread Submitted: {}", providerTask.request);
     }
@@ -310,27 +343,16 @@ public class TwitterUserInformationProvider implements Callable<Iterator<User>>,
   }
 
   @Override
-  public void startStream() {
-
-    Objects.requireNonNull(executor);
-
-    LOGGER.info("startStream: {} Threads", futures.size());
-
-    running.set(true);
-
-    executor.shutdown();
-  }
-
-  @Override
   public StreamsResultSet readCurrent() {
 
     StreamsResultSet result;
 
+    LOGGER.debug("Providing {} docs", providerQueue.size());
+
     try {
       lock.writeLock().lock();
       result = new StreamsResultSet(providerQueue);
       providerQueue = QueueUtils.constructQueue();
-      LOGGER.debug("readCurrent: {} Documents", result.size());
     } finally {
       lock.writeLock().unlock();
     }
@@ -347,27 +369,38 @@ public class TwitterUserInformationProvider implements Callable<Iterator<User>>,
   @Override
   public StreamsResultSet readRange(DateTime start, DateTime end) {
     LOGGER.debug("{} readRange", STREAMS_ID);
-    this.start = start;
-    this.end = end;
-    readCurrent();
-    return (StreamsResultSet)providerQueue.iterator();
+    throw new NotImplementedException();
   }
 
+  protected Twitter getTwitterClient() throws InstantiationException {
+    return Twitter.getInstance(config);
+  }
 
   @Override
   public boolean isRunning() {
-    if ( providerQueue.isEmpty() && executor.isTerminated() && Futures.allAsList(futures).isDone() ) {
-      LOGGER.info("All Threads Completed");
+    LOGGER.debug("providerQueue.isEmpty: {}", providerQueue.isEmpty());
+    LOGGER.debug("providerQueue.size: {}", providerQueue.size());
+    LOGGER.debug("executor.isTerminated: {}", executor.isTerminated());
+    LOGGER.debug("tasks.size(): {}", tasks.size());
+    LOGGER.debug("futures.size(): {}", futures.size());
+    boolean allTasksComplete;
+    if( futures.size() > 0) {
+      allTasksComplete = true;
+      for(Future<?> future : futures){
+        allTasksComplete |= !future.isDone(); // check if future is done
+      }
+    } else {
+      allTasksComplete = false;
+    }
+    LOGGER.debug("allTasksComplete: {}", allTasksComplete);
+    boolean finished = allTasksComplete && tasks.size() > 0 && tasks.size() == futures.size() && executor.isShutdown() && executor.isTerminated();
+    LOGGER.debug("finished: {}", finished);
+    if ( finished ) {
       running.set(false);
-      LOGGER.info("Exiting");
     }
     return running.get();
   }
 
-  protected Twitter getTwitterClient() throws InstantiationException {
-    return Twitter.getInstance(config);
-  }
-
   @Override
   public void cleanUp() {
     ExecutorUtils.shutdownAndAwaitTermination(executor);
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProviderTask.java
index 418ba83..40ebfcf 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProviderTask.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProviderTask.java
@@ -23,6 +23,7 @@ import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.twitter.api.Twitter;
 import org.apache.streams.twitter.api.UsersLookupRequest;
 import org.apache.streams.twitter.converter.TwitterDateTimeFormat;
+import org.apache.streams.twitter.pojo.Tweet;
 import org.apache.streams.twitter.pojo.User;
 import org.apache.streams.util.ComponentUtils;
 
@@ -31,14 +32,17 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.Callable;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 /**
  *  Retrieve recent posts for a single user id.
  */
-public class TwitterUserInformationProviderTask implements Runnable {
+public class TwitterUserInformationProviderTask implements Callable<Iterator<User>>, Runnable {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(TwitterUserInformationProviderTask.class);
 
@@ -47,6 +51,7 @@ public class TwitterUserInformationProviderTask implements Runnable {
   protected TwitterUserInformationProvider provider;
   protected Twitter client;
   protected UsersLookupRequest request;
+  protected List<User> responseList;
 
   /**
    * TwitterTimelineProviderTask constructor.
@@ -60,19 +65,37 @@ public class TwitterUserInformationProviderTask implements Runnable {
     this.request = request;
   }
 
+  int item_count = 0;
+
   @Override
   public void run() {
 
     LOGGER.info("Thread Starting: {}", request.toString());
 
+    responseList = new ArrayList<>();
+
     List<User> users = client.lookup(request);
 
-    for (User user : users) {
-      ComponentUtils.offerUntilSuccess(new StreamsDatum(user), provider.providerQueue);
+    responseList.addAll(users);
+
+    item_count = users.size();
+
+    if( users.size() > 0 ) {
+      for (User user : users) {
+        ComponentUtils.offerUntilSuccess(new StreamsDatum(user), provider.providerQueue);
+        LOGGER.debug("User: {}", user.getIdStr());
+      }
     }
 
     LOGGER.info("Thread Finished: {}", request.toString());
 
+    LOGGER.info("item_count: {}", item_count);
+
   }
 
+  @Override
+  public Iterator<User> call() throws Exception {
+    run();
+    return responseList.iterator();
+  }
 }
diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/pom.xml b/streams-examples/streams-examples-flink/flink-twitter-collection/pom.xml
index f39ae0a..1e2de10 100644
--- a/streams-examples/streams-examples-flink/flink-twitter-collection/pom.xml
+++ b/streams-examples/streams-examples-flink/flink-twitter-collection/pom.xml
@@ -34,7 +34,7 @@
     <properties>
         <testng.version>6.9.10</testng.version>
         <hdfs.version>2.7.0</hdfs.version>
-        <flink.version>1.4.2</flink.version>
+        <flink.version>1.11.1</flink.version>
         <scala.version>2.11.12</scala.version>
         <scala.binary.version>2.11</scala.binary.version>
         <scalapc.version>1.1.0</scalapc.version>
@@ -127,6 +127,18 @@
             </exclusions>
         </dependency>
         <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-hdfs-httpfs</artifactId>
+            <type>war</type>
+            <version>${hdfs.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>commons-logging</groupId>
+                    <artifactId>commons-logging</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-java</artifactId>
             <version>${flink.version}</version>
@@ -433,15 +445,9 @@
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-failsafe-plugin</artifactId>
                 <configuration>
-                    <!-- Run integration test suite rather than individual tests. -->
-                    <excludes>
-                        <exclude>**/*Test.java</exclude>
-                        <exclude>**/*Tests.java</exclude>
-                    </excludes>
-                    <includes>
-                        <include>**/*IT.java</include>
-                        <include>**/*ITs.java</include>
-                    </includes>
+                    <suiteXmlFiles>
+                        <suiteXmlFile>target/test-classes/testng.xml</suiteXmlFile>
+                    </suiteXmlFiles>
                 </configuration>
                 <dependencies>
                     <dependency>
diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/FlinkBase.scala b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/FlinkBase.scala
index 1709d4c..3acca05 100644
--- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/FlinkBase.scala
+++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/FlinkBase.scala
@@ -66,7 +66,7 @@ trait FlinkBase {
     if(StringUtils.isNotEmpty(configUrl)) {
       BASELOGGER.info("StreamsConfigurator.resolveConfig(configUrl): {}", StreamsConfigurator.resolveConfig(configUrl))
       try {
-        typesafe = StreamsConfigurator.resolveConfig(configUrl).withFallback(StreamsConfigurator.getConfig).resolve()
+        typesafe = StreamsConfigurator.resolveConfig(configUrl)
       } catch {
         case mue: MalformedURLException => {
           BASELOGGER.error("Invalid Configuration URL: ", mue)
@@ -77,6 +77,7 @@ trait FlinkBase {
           return false
         }
       }
+      StreamsConfigurator.addConfig(typesafe)
     }
     else {
       typesafe = StreamsConfigurator.getConfig
@@ -177,9 +178,10 @@ trait FlinkBase {
     }
     else if (configObject.getScheme.toString.equals("s3")) {
       inPathBuilder = configObject.getScheme + "://" + configObject.getPath + "/" + configObject.getReaderPath
-    } else {
-      throw new Exception("scheme not recognized: " + configObject.getScheme)
     }
+//    else {
+//      throw new Exception("scheme not recognized: " + configObject.getScheme)
+//    }
     inPathBuilder
   }
 
@@ -193,9 +195,10 @@ trait FlinkBase {
     }
     else if( configObject.getScheme.toString.equals("s3")) {
       outPathBuilder = configObject.getScheme + "://" + configObject.getPath + "/" + configObject.getWriterPath
-    } else {
-      throw new Exception("output scheme not recognized: " + configObject.getScheme)
     }
+//    else {
+//      throw new Exception("output scheme not recognized: " + configObject.getScheme)
+//    }
     outPathBuilder
   }
 
diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala
index bbf83f8..bae988c 100644
--- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala
+++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala
@@ -19,32 +19,28 @@
 package org.apache.streams.examples.flink.twitter.collection
 
 import java.util.Objects
-import java.util.concurrent.TimeUnit
 
 import com.fasterxml.jackson.databind.ObjectMapper
-import com.google.common.util.concurrent.Uninterruptibles
 import org.apache.commons.lang3.StringUtils
-import org.apache.flink.api.common.functions.RichFlatMapFunction
+import org.apache.flink.api.common.JobExecutionResult
+import org.apache.flink.api.common.serialization.SimpleStringEncoder
 import org.apache.flink.api.scala._
 import org.apache.flink.core.fs.FileSystem
+import org.apache.flink.core.fs.Path
 import org.apache.flink.streaming.api.TimeCharacteristic
-import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment}
-import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink
-import org.apache.flink.util.Collector
-import org.apache.streams.config.{ComponentConfigurator, StreamsConfigurator}
+import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
+import org.apache.flink.streaming.api.scala.DataStream
+import org.apache.flink.streaming.api.scala.KeyedStream
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.streams.config.StreamsConfigurator
 import org.apache.streams.core.StreamsDatum
 import org.apache.streams.examples.flink.FlinkBase
 import org.apache.streams.examples.flink.twitter.TwitterFollowingPipelineConfiguration
-import org.apache.streams.flink.{FlinkStreamingConfiguration, StreamsFlinkConfiguration}
-import org.apache.streams.hdfs.{HdfsReaderConfiguration, HdfsWriterConfiguration}
 import org.apache.streams.jackson.StreamsJacksonMapper
-import org.apache.streams.twitter.config.TwitterFollowingConfiguration
 import org.apache.streams.twitter.pojo.Follow
-import org.apache.streams.twitter.provider.TwitterFollowingProvider
 import org.hamcrest.MatcherAssert
-import org.slf4j.{Logger, LoggerFactory}
-
-import scala.collection.JavaConversions._
+import org.slf4j.Logger
+import org.slf4j.LoggerFactory
 
 /**
   * FlinkTwitterFollowingPipeline collects friends or followers of all profiles from a
@@ -58,8 +54,12 @@ object FlinkTwitterFollowingPipeline extends FlinkBase {
   private val MAPPER: ObjectMapper = StreamsJacksonMapper.getInstance()
 
   override def main(args: Array[String]) = {
-    super.main(args)
-    val jobConfig = new ComponentConfigurator[TwitterFollowingPipelineConfiguration](classOf[TwitterFollowingPipelineConfiguration]).detectConfiguration(typesafe)
+    if( args.length > 0 ) {
+      LOGGER.info("Args: {}", args)
+      configUrl = args(0)
+    }
+    if( !setup(configUrl) ) System.exit(1)
+    val jobConfig = new StreamsConfigurator(classOf[TwitterFollowingPipelineConfiguration]).detectCustomConfiguration()
     if( !setup(jobConfig) ) System.exit(1)
     val pipeline: FlinkTwitterFollowingPipeline = new FlinkTwitterFollowingPipeline(jobConfig)
     val thread = new Thread(pipeline)
@@ -119,6 +119,7 @@ class FlinkTwitterFollowingPipeline(config: TwitterFollowingPipelineConfiguratio
 
     val env: StreamExecutionEnvironment = streamEnvironment(config)
 
+    env.setParallelism(streamsConfig.getParallelism().toInt)
     env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
     env.setNumberOfExecutionRetries(0)
 
@@ -126,54 +127,54 @@ class FlinkTwitterFollowingPipeline(config: TwitterFollowingPipelineConfiguratio
 
     val outPath = buildWriterPath(config.getDestination)
 
-    val keyed_ids: KeyedStream[String, Int] = env.readTextFile(inPath).setParallelism(10).keyBy( id => (id.hashCode % 100).abs )
+    val ids: DataStream[String] = env.readTextFile(inPath)
+
+    val keyed_ids: KeyedStream[String, Int] = ids.
+      name("keyed_ids").
+      setParallelism(streamsConfig.getParallelism().toInt).
+      keyBy( id => (id.hashCode % streamsConfig.getParallelism().toInt).abs )
 
     // these datums contain 'Follow' objects
-    val followDatums: DataStream[StreamsDatum] =
-      keyed_ids.flatMap(new FollowingCollectorFlatMapFunction(config.getTwitter)).setParallelism(10)
+    val followDatums: DataStream[StreamsDatum] = keyed_ids.
+      flatMap(new FollowingCollectorFlatMapFunction(streamsConfig, config.getTwitter, streamsFlinkConfiguration)).
+      name("followDatums").
+      setParallelism(streamsConfig.getParallelism().toInt)
 
-    val follows: DataStream[Follow] = followDatums
+    val follows: DataStream[Follow] = followDatums.
+      name("follows")
       .map(datum => datum.getDocument.asInstanceOf[Follow])
 
-    val jsons: DataStream[String] = follows
+    val jsons: DataStream[String] = follows.
+      name("jsons")
       .map(follow => {
         val MAPPER = StreamsJacksonMapper.getInstance
         MAPPER.writeValueAsString(follow)
-      })
-
-    if( config.getTest == false )
-      jsons.addSink(new BucketingSink[String](outPath)).setParallelism(3)
-    else
-      jsons.writeAsText(outPath,FileSystem.WriteMode.OVERWRITE)
-        .setParallelism(env.getParallelism)
+      }).
+      setParallelism(streamsConfig.getParallelism().toInt)
+
+    val keyed_jsons: KeyedStream[String, Int] = jsons.
+      name("keyed_jsons").
+      setParallelism(streamsConfig.getParallelism().toInt).
+      keyBy( id => (id.hashCode % streamsConfig.getParallelism().toInt).abs )
+
+    val fileSink : StreamingFileSink[String] = StreamingFileSink.
+      forRowFormat(new Path(outPath), new SimpleStringEncoder[String]("UTF-8")).
+      build()
+
+    if( config.getTest == true ) {
+      keyed_jsons.writeAsText(outPath,FileSystem.WriteMode.OVERWRITE)
+    } else {
+      keyed_jsons.addSink(fileSink).name("fileSink")
+    }
 
-    // if( test == true ) jsons.print();
+    val result: JobExecutionResult = env.execute("FlinkTwitterFollowingPipeline")
 
-    env.execute(STREAMS_ID)
-  }
+    LOGGER.info("JobExecutionResult: {}", result.getJobExecutionResult)
 
-  class FollowingCollectorFlatMapFunction(
-                                           twitterConfiguration : TwitterFollowingConfiguration = new ComponentConfigurator(classOf[TwitterFollowingConfiguration]).detectConfiguration(),
-                                           flinkConfiguration : StreamsFlinkConfiguration = new ComponentConfigurator(classOf[StreamsFlinkConfiguration]).detectConfiguration()
-                                         ) extends RichFlatMapFunction[String, StreamsDatum] with Serializable {
+    LOGGER.info("JobExecutionResult.getNetRuntime: {}", result.getNetRuntime())
 
-    override def flatMap(input: String, out: Collector[StreamsDatum]): Unit = {
-      collectConnections(input, out)
-    }
+    LOGGER.info("JobExecutionResult.getAllAccumulatorResults: {}", MAPPER.writeValueAsString(result.getAllAccumulatorResults()))
 
-    def collectConnections(id : String, out : Collector[StreamsDatum]) = {
-      val twitProvider: TwitterFollowingProvider =
-        new TwitterFollowingProvider(
-          twitterConfiguration.withInfo(List(toProviderId(id))).asInstanceOf[TwitterFollowingConfiguration]
-        )
-      twitProvider.prepare(twitProvider)
-      twitProvider.startStream()
-      var iterator: Iterator[StreamsDatum] = null
-      do {
-        Uninterruptibles.sleepUninterruptibly(flinkConfiguration.getProviderWaitMs, TimeUnit.MILLISECONDS)
-        twitProvider.readCurrent().iterator().toList.map(out.collect(_))
-      } while( twitProvider.isRunning )
-    }
   }
 
 }
\ No newline at end of file
diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala
index 28d2576..14eebc5 100644
--- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala
+++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala
@@ -24,24 +24,31 @@ import java.util.concurrent.TimeUnit
 import com.fasterxml.jackson.databind.ObjectMapper
 import com.google.common.util.concurrent.Uninterruptibles
 import org.apache.commons.lang3.StringUtils
+import org.apache.flink.api.common.JobExecutionResult
 import org.apache.flink.api.common.functions.RichFlatMapFunction
+import org.apache.flink.api.common.serialization.SimpleStringEncoder
 import org.apache.flink.api.scala._
 import org.apache.flink.core.fs.FileSystem
+import org.apache.flink.core.fs.Path
 import org.apache.flink.streaming.api.TimeCharacteristic
-import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment}
-import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink
+import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
+import org.apache.flink.streaming.api.scala.DataStream
+import org.apache.flink.streaming.api.scala.KeyedStream
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.flink.util.Collector
-import org.apache.streams.config.{ComponentConfigurator, StreamsConfigurator}
+import org.apache.streams.config.ComponentConfigurator
+import org.apache.streams.config.StreamsConfigurator
 import org.apache.streams.core.StreamsDatum
 import org.apache.streams.examples.flink.FlinkBase
 import org.apache.streams.examples.flink.twitter.TwitterPostsPipelineConfiguration
-import org.apache.streams.flink.FlinkStreamingConfiguration
-import org.apache.streams.hdfs.{HdfsReaderConfiguration, HdfsWriterConfiguration}
+import org.apache.streams.hdfs.HdfsReaderConfiguration
+import org.apache.streams.hdfs.HdfsWriterConfiguration
 import org.apache.streams.jackson.StreamsJacksonMapper
 import org.apache.streams.twitter.pojo.Tweet
 import org.apache.streams.twitter.provider.TwitterTimelineProvider
 import org.hamcrest.MatcherAssert
-import org.slf4j.{Logger, LoggerFactory}
+import org.slf4j.Logger
+import org.slf4j.LoggerFactory
 
 import scala.collection.JavaConversions._
 
@@ -57,8 +64,12 @@ object FlinkTwitterPostsPipeline extends FlinkBase {
   private val MAPPER: ObjectMapper = StreamsJacksonMapper.getInstance()
 
   override def main(args: Array[String]) = {
-    super.main(args)
-    val jobConfig = new ComponentConfigurator[TwitterPostsPipelineConfiguration](classOf[TwitterPostsPipelineConfiguration]).detectConfiguration(typesafe)
+    if( args.length > 0 ) {
+      LOGGER.info("Args: {}", args)
+      configUrl = args(0)
+    }
+    if( !setup(configUrl) ) System.exit(1)
+    val jobConfig = new StreamsConfigurator(classOf[TwitterPostsPipelineConfiguration]).detectCustomConfiguration()
     if( !setup(jobConfig) ) System.exit(1)
     val pipeline: FlinkTwitterPostsPipeline = new FlinkTwitterPostsPipeline(jobConfig)
     val thread = new Thread(pipeline)
@@ -118,6 +129,7 @@ class FlinkTwitterPostsPipeline(config: TwitterPostsPipelineConfiguration = new
 
     val env: StreamExecutionEnvironment = streamEnvironment(config)
 
+    env.setParallelism(streamsConfig.getParallelism().toInt)
     env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
     env.setNumberOfExecutionRetries(0)
 
@@ -125,13 +137,15 @@ class FlinkTwitterPostsPipeline(config: TwitterPostsPipelineConfiguration = new
 
     val outPath = buildWriterPath(new ComponentConfigurator(classOf[HdfsWriterConfiguration]).detectConfiguration())
 
-    val ids: DataStream[String] = env.readTextFile(inPath).setParallelism(10).name("ids")
+    val ids: DataStream[String] = env.readTextFile(inPath).name("ids")
 
-    val keyed_ids: KeyedStream[String, Int] = env.readTextFile(inPath).setParallelism(10).name("keyed_ids").keyBy( id => (id.hashCode % 100).abs )
+    val keyed_ids: KeyedStream[String, Int] = ids.name("keyed_ids").
+      setParallelism(streamsConfig.getParallelism().toInt).
+      keyBy( id => (id.hashCode % streamsConfig.getParallelism().toInt).abs )
 
     // these datums contain 'Tweet' objects
     val tweetDatums: DataStream[StreamsDatum] =
-      keyed_ids.flatMap(new postCollectorFlatMapFunction).setParallelism(10).name("tweetDatums")
+      keyed_ids.flatMap(new postCollectorFlatMapFunction).setParallelism(env.getParallelism).name("tweetDatums")
 
     val tweets: DataStream[Tweet] = tweetDatums
       .map(datum => datum.getDocument.asInstanceOf[Tweet]).name("tweets")
@@ -142,15 +156,28 @@ class FlinkTwitterPostsPipeline(config: TwitterPostsPipelineConfiguration = new
         MAPPER.writeValueAsString(tweet)
       }).name("json")
 
-    if( config.getTest == false )
-      jsons.addSink(new BucketingSink[String](outPath)).setParallelism(3).name("hdfs")
-    else
-      jsons.writeAsText(outPath,FileSystem.WriteMode.OVERWRITE)
-        .setParallelism(env.getParallelism)
+    val keyed_jsons: KeyedStream[String, Int] = jsons.
+      setParallelism(streamsConfig.getParallelism().toInt).
+      keyBy( id => (id.hashCode % streamsConfig.getParallelism().toInt).abs )
+
+    val fileSink : StreamingFileSink[String] = StreamingFileSink.
+      forRowFormat(new Path(outPath), new SimpleStringEncoder[String]("UTF-8")).
+      build()
+
+    if( config.getTest == true ) {
+      keyed_jsons.writeAsText(outPath,FileSystem.WriteMode.OVERWRITE)
+    } else {
+      keyed_jsons.addSink(fileSink)
+    }
+
+    val result: JobExecutionResult = env.execute("FlinkTwitterPostsPipeline")
+
+    LOGGER.info("JobExecutionResult: {}", result.getJobExecutionResult)
+
+    LOGGER.info("JobExecutionResult.getNetRuntime: {}", result.getNetRuntime())
 
-    // if( test == true ) jsons.print();
+    LOGGER.info("JobExecutionResult.getAllAccumulatorResults: {}", MAPPER.writeValueAsString(result.getAllAccumulatorResults()))
 
-    env.execute(STREAMS_ID)
   }
 
   class postCollectorFlatMapFunction extends RichFlatMapFunction[String, StreamsDatum] with Serializable {
diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala
index 79c95d6..6e38591 100644
--- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala
+++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala
@@ -25,26 +25,32 @@ import java.util.concurrent.TimeUnit
 import com.fasterxml.jackson.databind.ObjectMapper
 import com.google.common.util.concurrent.Uninterruptibles
 import org.apache.commons.lang3.StringUtils
-import org.apache.flink.api.common.functions.StoppableFunction
+import org.apache.flink.api.common.JobExecutionResult
+import org.apache.flink.api.common.serialization.SimpleStringEncoder
 import org.apache.flink.api.scala._
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.core.fs.FileSystem
+import org.apache.flink.core.fs.Path
 import org.apache.flink.streaming.api.TimeCharacteristic
-import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
-import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
-import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink
-import org.apache.streams.config.{ComponentConfigurator, StreamsConfigurator}
+import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+import org.apache.flink.streaming.api.scala.KeyedStream
+import org.apache.flink.streaming.api.scala.DataStream
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.streams.config.ComponentConfigurator
+import org.apache.streams.config.StreamsConfigurator
 import org.apache.streams.core.StreamsDatum
 import org.apache.streams.examples.flink.FlinkBase
 import org.apache.streams.examples.flink.twitter.TwitterSpritzerPipelineConfiguration
-import org.apache.streams.flink.FlinkStreamingConfiguration
 import org.apache.streams.hdfs.HdfsWriterConfiguration
 import org.apache.streams.jackson.StreamsJacksonMapper
 import org.apache.streams.twitter.config.TwitterStreamConfiguration
 import org.apache.streams.twitter.converter.TwitterDateTimeFormat
 import org.apache.streams.twitter.provider.TwitterStreamProvider
 import org.hamcrest.MatcherAssert
-import org.slf4j.{Logger, LoggerFactory}
+import org.slf4j.Logger
+import org.slf4j.LoggerFactory
 
 import scala.collection.JavaConversions._
 
@@ -122,25 +128,37 @@ class FlinkTwitterSpritzerPipeline(config: TwitterSpritzerPipelineConfiguration
 
     val outPath = buildWriterPath(new ComponentConfigurator(classOf[HdfsWriterConfiguration]).detectConfiguration())
 
-    val streamSource : DataStream[String] = env.addSource(spritzerSource)
+    val jsons : DataStream[String] = env.addSource(spritzerSource)
 
-    if( config.getTest == false )
-      streamSource.addSink(new BucketingSink[String](outPath)).setParallelism(3).name("hdfs")
-    else
-      streamSource.writeAsText(outPath,FileSystem.WriteMode.OVERWRITE)
-        .setParallelism(env.getParallelism)
+    val keyed_jsons: KeyedStream[String, Int] = jsons.
+      setParallelism(streamsConfig.getParallelism().toInt).
+      keyBy( id => (id.hashCode % streamsConfig.getParallelism().toInt).abs )
 
-    // if( test == true ) jsons.print();
+    val fileSink : StreamingFileSink[String] = StreamingFileSink.
+      forRowFormat(new Path(outPath), new SimpleStringEncoder[String]("UTF-8")).
+      build()
 
-    env.execute(STREAMS_ID)
+    if( config.getTest == true ) {
+      keyed_jsons.writeAsText(outPath,FileSystem.WriteMode.OVERWRITE)
+    } else {
+      keyed_jsons.addSink(fileSink)
+    }
+
+    val result: JobExecutionResult = env.execute("FlinkTwitterSpritzerPipeline")
+
+    LOGGER.info("JobExecutionResult: {}", result.getJobExecutionResult)
+
+    LOGGER.info("JobExecutionResult.getNetRuntime: {}", result.getNetRuntime())
+
+    LOGGER.info("JobExecutionResult.getAllAccumulatorResults: {}", MAPPER.writeValueAsString(result.getAllAccumulatorResults()))
 
   }
 
   def stop(): Unit = {
-    spritzerSource.stop()
+    spritzerSource.cancel()
   }
 
-  class SpritzerSource(sourceConfig: TwitterStreamConfiguration) extends RichSourceFunction[String] with Serializable with StoppableFunction {
+  class SpritzerSource(sourceConfig: TwitterStreamConfiguration) extends RichSourceFunction[String] with Serializable /*with StoppableFunction*/ {
 
     var mapper: ObjectMapper = _
 
@@ -167,9 +185,9 @@ class FlinkTwitterSpritzerPipeline(config: TwitterSpritzerPipelineConfiguration
       close()
     }
 
-    override def stop(): Unit = {
-      close()
-    }
+//    override def stop(): Unit = {
+//      close()
+//    }
   }
 
 
diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala
index 977fdf5..32cf232 100644
--- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala
+++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala
@@ -24,26 +24,34 @@ import java.util.concurrent.TimeUnit
 import com.fasterxml.jackson.databind.ObjectMapper
 import com.google.common.util.concurrent.Uninterruptibles
 import org.apache.commons.lang3.StringUtils
+import org.apache.flink.api.common.JobExecutionResult
 import org.apache.flink.api.common.functions.RichFlatMapFunction
+import org.apache.flink.api.common.serialization.SimpleStringEncoder
 import org.apache.flink.api.scala._
 import org.apache.flink.core.fs.FileSystem
+import org.apache.flink.core.fs.Path
 import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
+import org.apache.flink.streaming.api.scala.DataStream
+import org.apache.flink.streaming.api.scala.KeyedStream
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.scala.WindowedStream
 import org.apache.flink.streaming.api.scala.function.WindowFunction
-import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment, WindowedStream}
 import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
-import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink
 import org.apache.flink.util.Collector
-import org.apache.streams.config.{ComponentConfigurator, StreamsConfigurator}
+import org.apache.streams.config.ComponentConfigurator
+import org.apache.streams.config.StreamsConfigurator
 import org.apache.streams.core.StreamsDatum
 import org.apache.streams.examples.flink.FlinkBase
 import org.apache.streams.examples.flink.twitter.TwitterUserInformationPipelineConfiguration
-import org.apache.streams.flink.FlinkStreamingConfiguration
-import org.apache.streams.hdfs.{HdfsReaderConfiguration, HdfsWriterConfiguration}
+import org.apache.streams.hdfs.HdfsReaderConfiguration
+import org.apache.streams.hdfs.HdfsWriterConfiguration
 import org.apache.streams.jackson.StreamsJacksonMapper
 import org.apache.streams.twitter.pojo.User
 import org.apache.streams.twitter.provider.TwitterUserInformationProvider
 import org.hamcrest.MatcherAssert
-import org.slf4j.{Logger, LoggerFactory}
+import org.slf4j.Logger
+import org.slf4j.LoggerFactory
 
 import scala.collection.JavaConversions._
 
@@ -59,8 +67,12 @@ object FlinkTwitterUserInformationPipeline extends FlinkBase {
   private val MAPPER: ObjectMapper = StreamsJacksonMapper.getInstance()
 
   override def main(args: Array[String]) = {
-    super.main(args)
-    val jobConfig = new ComponentConfigurator[TwitterUserInformationPipelineConfiguration](classOf[TwitterUserInformationPipelineConfiguration]).detectConfiguration(typesafe)
+    if( args.length > 0 ) {
+      LOGGER.info("Args: {}", args)
+      configUrl = args(0)
+    }
+    if( !setup(configUrl) ) System.exit(1)
+    val jobConfig = new StreamsConfigurator(classOf[TwitterUserInformationPipelineConfiguration]).detectCustomConfiguration()
     if( !setup(jobConfig) ) System.exit(1)
     val pipeline: FlinkTwitterUserInformationPipeline = new FlinkTwitterUserInformationPipeline(jobConfig)
     val thread = new Thread(pipeline)
@@ -115,11 +127,13 @@ object FlinkTwitterUserInformationPipeline extends FlinkBase {
 class FlinkTwitterUserInformationPipeline(config: TwitterUserInformationPipelineConfiguration = new StreamsConfigurator[TwitterUserInformationPipelineConfiguration](classOf[TwitterUserInformationPipelineConfiguration]).detectCustomConfiguration()) extends Runnable with java.io.Serializable {
 
   import FlinkTwitterUserInformationPipeline._
+  //import FlinkBase.streamsConfig
 
   override def run(): Unit = {
 
     val env: StreamExecutionEnvironment = streamEnvironment(config)
 
+    env.setParallelism(streamsConfig.getParallelism().toInt)
     env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
     env.setNumberOfExecutionRetries(0)
 
@@ -127,7 +141,7 @@ class FlinkTwitterUserInformationPipeline(config: TwitterUserInformationPipeline
 
     val outPath = buildWriterPath(new ComponentConfigurator(classOf[HdfsWriterConfiguration]).detectConfiguration())
 
-    val ids: DataStream[String] = env.readTextFile(inPath).setParallelism(10).name("ids")
+    val ids: DataStream[String] = env.readTextFile(inPath).setParallelism(env.getParallelism).name("ids")
 
     val keyed_ids: KeyedStream[String, Int] = ids.name("keyed_ids").keyBy( id => (id.hashCode % 100).abs )
 
@@ -135,7 +149,7 @@ class FlinkTwitterUserInformationPipeline(config: TwitterUserInformationPipeline
 
     val idLists: DataStream[List[String]] = idWindows.apply[List[String]] (new idListWindowFunction()).name("idLists")
 
-    val userDatums: DataStream[StreamsDatum] = idLists.flatMap(new profileCollectorFlatMapFunction).setParallelism(10).name("userDatums")
+    val userDatums: DataStream[StreamsDatum] = idLists.flatMap(new profileCollectorFlatMapFunction).setParallelism(env.getParallelism).name("userDatums")
 
     val user: DataStream[User] = userDatums.map(datum => datum.getDocument.asInstanceOf[User]).name("users")
 
@@ -145,15 +159,29 @@ class FlinkTwitterUserInformationPipeline(config: TwitterUserInformationPipeline
         MAPPER.writeValueAsString(user)
       }).name("jsons")
 
-    if( config.getTest == false )
-      jsons.addSink(new BucketingSink[String](outPath)).setParallelism(3).name("hdfs")
-    else
-      jsons.writeAsText(outPath,FileSystem.WriteMode.OVERWRITE)
-        .setParallelism(env.getParallelism)
+    val keyed_jsons: KeyedStream[String, Int] = jsons.
+      setParallelism(streamsConfig.getParallelism().toInt).
+      keyBy( id => (id.hashCode % streamsConfig.getParallelism().toInt).abs )
+
+    val fileSink : StreamingFileSink[String] = StreamingFileSink.
+      forRowFormat(new Path(outPath), new SimpleStringEncoder[String]("UTF-8")).
+      build()
+
+    if( config.getTest == true ) {
+      keyed_jsons.writeAsText(outPath,FileSystem.WriteMode.OVERWRITE)
+    } else {
+      keyed_jsons.addSink(fileSink)
+    }
+
+    val result: JobExecutionResult = env.execute("FlinkTwitterUserInformationPipeline")
+
+    LOGGER.info("JobExecutionResult: {}", result.getJobExecutionResult)
+
+    LOGGER.info("JobExecutionResult.getNetRuntime: {}", result.getNetRuntime())
+
+    LOGGER.info("JobExecutionResult.getAllAccumulatorResults: {}", MAPPER.writeValueAsString(result.getAllAccumulatorResults()))
 
-    LOGGER.info("StreamExecutionEnvironment: {}", env.toString )
 
-    env.execute(STREAMS_ID)
   }
 
   class idListWindowFunction extends WindowFunction[String, List[String], Int, GlobalWindow] {
diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FollowingCollectorFlatMapFunction.scala b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FollowingCollectorFlatMapFunction.scala
new file mode 100644
index 0000000..1223047
--- /dev/null
+++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FollowingCollectorFlatMapFunction.scala
@@ -0,0 +1,52 @@
+package org.apache.streams.examples.flink.twitter.collection
+
+import java.util.concurrent.TimeUnit
+
+import com.google.common.util.concurrent.Uninterruptibles
+import org.apache.flink.api.common.accumulators.IntCounter
+import org.apache.flink.api.common.functions.RichFlatMapFunction
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.util.Collector
+import org.apache.streams.config.ComponentConfigurator
+import org.apache.streams.config.StreamsConfiguration
+import org.apache.streams.core.StreamsDatum
+import org.apache.streams.examples.flink.twitter.collection.FlinkTwitterFollowingPipeline.toProviderId
+import org.apache.streams.flink.StreamsFlinkConfiguration
+import org.apache.streams.twitter.config.TwitterFollowingConfiguration
+import org.apache.streams.twitter.provider.TwitterFollowingProvider
+
+import scala.collection.JavaConversions._
+
+class FollowingCollectorFlatMapFunction(
+                                         streamsConfiguration : StreamsConfiguration,
+                                         twitterConfiguration : TwitterFollowingConfiguration = new ComponentConfigurator(classOf[TwitterFollowingConfiguration]).detectConfiguration(),
+                                         flinkConfiguration : StreamsFlinkConfiguration = new ComponentConfigurator(classOf[StreamsFlinkConfiguration]).detectConfiguration()
+                                       ) extends RichFlatMapFunction[String, StreamsDatum] with Serializable {
+
+  var size : IntCounter = new IntCounter()
+  var counter : IntCounter = new IntCounter()
+
+  override def open(parameters: Configuration): Unit = {
+    getRuntimeContext().addAccumulator("FlinkTwitterFollowingPipeline.size", this.size)
+    getRuntimeContext().addAccumulator("FlinkTwitterFollowingPipeline.counter", this.counter)
+  }
+
+  override def flatMap(input: String, out: Collector[StreamsDatum]): Unit = {
+    size.add(input.size)
+    collectConnections(input, out)
+  }
+
+  def collectConnections(id : String, out : Collector[StreamsDatum]) = {
+    val conf = twitterConfiguration.withInfo(List(toProviderId(id))).asInstanceOf[TwitterFollowingConfiguration]
+    val twitProvider: TwitterFollowingProvider = new TwitterFollowingProvider(conf)
+    twitProvider.prepare(twitProvider)
+    twitProvider.startStream()
+    do {
+      Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getProviderWaitMs, TimeUnit.MILLISECONDS)
+      val current = twitProvider.readCurrent().iterator().toList
+      counter.add(current.size)
+      current.map(out.collect(_))
+    } while( twitProvider.isRunning )
+  }
+
+}
diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/TimelineCollectorFlatMapFunction.scala b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/TimelineCollectorFlatMapFunction.scala
new file mode 100644
index 0000000..8f21145
--- /dev/null
+++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/TimelineCollectorFlatMapFunction.scala
@@ -0,0 +1,52 @@
+package org.apache.streams.examples.flink.twitter.collection
+
+import java.util.concurrent.TimeUnit
+
+import com.google.common.util.concurrent.Uninterruptibles
+import org.apache.flink.api.common.accumulators.IntCounter
+import org.apache.flink.api.common.functions.RichFlatMapFunction
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.util.Collector
+import org.apache.streams.config.StreamsConfiguration
+import org.apache.streams.core.StreamsDatum
+import org.apache.streams.flink.StreamsFlinkConfiguration
+import org.apache.streams.twitter.config.TwitterTimelineProviderConfiguration
+import org.apache.streams.twitter.provider.TwitterTimelineProvider
+
+import scala.collection.JavaConversions._
+
+/**
+  * Created by sblackmon on 6/2/16.
+  */
+class TimelineCollectorFlatMapFunction(
+                                        streamsConfiguration : StreamsConfiguration,
+                                        twitterConfiguration : TwitterTimelineProviderConfiguration,
+                                        streamsFlinkConfiguration : StreamsFlinkConfiguration
+                                      ) extends RichFlatMapFunction[List[String], StreamsDatum] with Serializable {
+  var size : IntCounter = new IntCounter()
+  var counter : IntCounter = new IntCounter()
+  override def open(parameters: Configuration): Unit = {
+    getRuntimeContext().addAccumulator("TimelineCollectorFlatMapFunction.size", this.size)
+    getRuntimeContext().addAccumulator("TimelineCollectorFlatMapFunction.counter", this.counter)
+  }
+  override def flatMap(input: List[String], out: Collector[StreamsDatum]): Unit = {
+    size.add(input.size)
+    collectPosts(input, out)
+  }
+  def collectPosts(ids : List[String], out : Collector[StreamsDatum]) = {
+    try {
+      val conf = twitterConfiguration.withInfo(ids).asInstanceOf[TwitterTimelineProviderConfiguration]
+      val twitProvider: TwitterTimelineProvider = new TwitterTimelineProvider(conf)
+      twitProvider.prepare(conf)
+      twitProvider.startStream()
+      do {
+        Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getProviderWaitMs, TimeUnit.MILLISECONDS)
+        val current = twitProvider.readCurrent().iterator().toList
+        counter.add(current.size)
+        current.map(out.collect(_))
+      } while( twitProvider.isRunning )
+    } finally {
+
+    }
+  }
+}
diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/UserInformationCollectorFlatMapFunction.scala b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/UserInformationCollectorFlatMapFunction.scala
new file mode 100644
index 0000000..46e0d4a
--- /dev/null
+++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/UserInformationCollectorFlatMapFunction.scala
@@ -0,0 +1,48 @@
+package org.apache.streams.examples.flink.twitter.collection
+
+import java.util.concurrent.TimeUnit
+
+import com.google.common.util.concurrent.Uninterruptibles
+import org.apache.flink.api.common.accumulators.IntCounter
+import org.apache.flink.api.common.functions.RichFlatMapFunction
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.util.Collector
+import org.apache.streams.config.StreamsConfiguration
+import org.apache.streams.core.StreamsDatum
+import org.apache.streams.flink.StreamsFlinkConfiguration
+import org.apache.streams.twitter.config.TwitterUserInformationConfiguration
+import org.apache.streams.twitter.provider.TwitterUserInformationProvider
+
+import scala.collection.JavaConversions._
+
+/**
+  * Created by sblackmon on 6/2/16.
+  */
+class UserInformationCollectorFlatMapFunction(
+                                               streamsConfiguration : StreamsConfiguration,
+                                               twitterConfiguration : TwitterUserInformationConfiguration,
+                                               streamsFlinkConfiguration : StreamsFlinkConfiguration
+                                             ) extends RichFlatMapFunction[List[String], StreamsDatum] with Serializable {
+  var size : IntCounter = new IntCounter()
+  var counter : IntCounter = new IntCounter()
+  override def open(parameters: Configuration): Unit = {
+    getRuntimeContext().addAccumulator("UserInformationCollectorFlatMapFunction.size", this.size)
+    getRuntimeContext().addAccumulator("UserInformationCollectorFlatMapFunction.counter", this.counter)
+  }
+  override def flatMap(input: List[String], out: Collector[StreamsDatum]): Unit = {
+    size.add(input.size)
+    collectProfiles(input, out)
+  }
+  def collectProfiles(ids : List[String], out : Collector[StreamsDatum]) = {
+    val conf = twitterConfiguration.withInfo(ids)
+    val twitProvider: TwitterUserInformationProvider = new TwitterUserInformationProvider(conf)
+    twitProvider.prepare(conf)
+    twitProvider.startStream()
+    do {
+      Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getProviderWaitMs, TimeUnit.MILLISECONDS)
+      val current = twitProvider.readCurrent().iterator().toList
+      counter.add(current.size)
+      current.map(out.collect(_))
+    } while( twitProvider.isRunning )
+  }
+}
diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFollowersIT.conf b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFollowersIT.conf
index 67f7c9c..ec4dc4e 100644
--- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFollowersIT.conf
+++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFollowersIT.conf
@@ -32,9 +32,10 @@ org.apache.streams.twitter.config.TwitterFollowingConfiguration {
   max_items = 5000
 }
 org.apache.streams.config.StreamsConfiguration {
+  parallelism = 1
   providerWaitMs = 1000
 }
 org.apache.streams.flink.StreamsFlinkConfiguration {
   local = true
   test = true
-}
\ No newline at end of file
+}
diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFriendsIT.conf b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFriendsIT.conf
index 724cd43..fd4b1db 100644
--- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFriendsIT.conf
+++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFriendsIT.conf
@@ -31,6 +31,7 @@ org.apache.streams.twitter.config.TwitterFollowingConfiguration {
   ids_only = true
 }
 org.apache.streams.config.StreamsConfiguration {
+  parallelism = 1
   providerWaitMs = 1000
 }
 org.apache.streams.flink.StreamsFlinkConfiguration {
diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/resources/FlinkTwitterPostsPipelineIT.conf b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/resources/FlinkTwitterPostsPipelineIT.conf
index 626d8f6..8f5aadf 100644
--- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/resources/FlinkTwitterPostsPipelineIT.conf
+++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/resources/FlinkTwitterPostsPipelineIT.conf
@@ -27,6 +27,7 @@ org.apache.streams.hdfs.HdfsWriterConfiguration {
   writerPath = "FlinkTwitterPostsPipelineIT"
 }
 org.apache.streams.config.StreamsConfiguration {
+  parallelism = 1
   providerWaitMs = 1000
 }
 org.apache.streams.flink.StreamsFlinkConfiguration {
diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/resources/FlinkTwitterUserInformationPipelineIT.conf b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/resources/FlinkTwitterUserInformationPipelineIT.conf
index ea8125a..cbdb650 100644
--- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/resources/FlinkTwitterUserInformationPipelineIT.conf
+++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/resources/FlinkTwitterUserInformationPipelineIT.conf
@@ -28,8 +28,15 @@ org.apache.streams.hdfs.HdfsWriterConfiguration {
 }
 org.apache.streams.config.StreamsConfiguration {
   providerWaitMs = 1000
+  queueSize = 10000
+  batchSize = 1000
+  identifier = "FlinkTwitterUserInformationPipelineIT"
+  parallelism = 1
+  providerTimeoutMs = 60000
+  shutdownCheckDelay = 30000
+  shutdownCheckInterval = 30000
 }
 org.apache.streams.flink.StreamsFlinkConfiguration {
   local = true
   test = true
-}
\ No newline at end of file
+}
diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/resources/testng.xml b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/resources/testng.xml
new file mode 100644
index 0000000..bdb250d
--- /dev/null
+++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/resources/testng.xml
@@ -0,0 +1,54 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE suite SYSTEM "http://testng.org/testng-1.0.dtd" >
+
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+
+<suite name="ExampleFlinkITs" preserve-order="true">
+
+    <test name="FlinkTwitterUserInformationPipelineIT">
+        <classes>
+            <class name="org.apache.streams.examples.flink.twitter.test.FlinkTwitterUserInformationPipelineIT" />
+        </classes>
+    </test>
+
+    <test name="FlinkTwitterPostsPipelineIT">
+        <classes>
+            <class name="org.apache.streams.examples.flink.twitter.test.FlinkTwitterPostsPipelineIT" />
+        </classes>
+    </test>
+
+    <test name="FlinkTwitterFollowingPipelineFriendsIT">
+        <classes>
+            <class name="org.apache.streams.examples.flink.twitter.test.FlinkTwitterFollowingPipelineFriendsIT" />
+        </classes>
+    </test>
+
+    <test name="FlinkTwitterFollowingPipelineFollowersIT">
+        <classes>
+            <class name="org.apache.streams.examples.flink.twitter.test.FlinkTwitterFollowingPipelineFollowersIT" />
+        </classes>
+    </test>
+
+    <test name="FlinkTwitterSpritzerPipelineIT">
+        <classes>
+            <class name="org.apache.streams.examples.flink.twitter.test.FlinkTwitterSpritzerPipelineIT" />
+        </classes>
+    </test>
+
+</suite>  
\ No newline at end of file
diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFollowersIT.scala b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFollowersIT.scala
index 617815e..bfc6940 100644
--- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFollowersIT.scala
+++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFollowersIT.scala
@@ -36,7 +36,7 @@ import scala.io.Source
 /**
   * FlinkTwitterFollowingPipelineFollowersIT is an integration test for FlinkTwitterFollowingPipeline.
   */
-class FlinkTwitterFollowingPipelineFollowersIT extends FlatSpec {
+class FlinkTwitterFollowingPipelineFollowersIT {
 
   private val LOGGER: Logger = LoggerFactory.getLogger(classOf[FlinkTwitterFollowingPipelineFollowersIT])
 
diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFriendsIT.scala b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFriendsIT.scala
index 5ebea94..6d52d44 100644
--- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFriendsIT.scala
+++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFriendsIT.scala
@@ -16,7 +16,7 @@
  * under the License.
  */
 
-package com.peoplepattern.streams.twitter.collection
+package org.apache.streams.examples.flink.twitter.test
 
 import java.io.File
 import java.nio.file.{Files, Paths}
@@ -36,7 +36,7 @@ import scala.io.Source
 /**
   * FlinkTwitterFollowingPipelineFriendsIT is an integration test for FlinkTwitterFollowingPipeline.
   */
-class FlinkTwitterFollowingPipelineFriendsIT extends FlatSpec {
+class FlinkTwitterFollowingPipelineFriendsIT {
 
   private val LOGGER: Logger = LoggerFactory.getLogger(classOf[FlinkTwitterFollowingPipelineFriendsIT])
 
diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterPostsPipelineIT.scala b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterPostsPipelineIT.scala
index ed3edf3..a43c758 100644
--- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterPostsPipelineIT.scala
+++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterPostsPipelineIT.scala
@@ -16,7 +16,7 @@
  * under the License.
  */
 
-package com.peoplepattern.streams.twitter.collection
+package org.apache.streams.examples.flink.twitter.test
 
 import java.io.File
 import java.nio.file.{Files, Paths}
@@ -36,7 +36,7 @@ import scala.io.Source
 /**
   * FlinkTwitterPostsPipelineIT is an integration test for FlinkTwitterPostsPipeline.
   */
-class FlinkTwitterPostsPipelineIT extends FlatSpec  {
+class FlinkTwitterPostsPipelineIT {
 
   private val LOGGER: Logger = LoggerFactory.getLogger(classOf[FlinkTwitterPostsPipelineIT])
 
diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterUserInformationPipelineIT.scala b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterUserInformationPipelineIT.scala
index 0652bc1..48b876a 100644
--- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterUserInformationPipelineIT.scala
+++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterUserInformationPipelineIT.scala
@@ -16,7 +16,7 @@
  * under the License.
  */
 
-package com.peoplepattern.streams.twitter.collection
+package org.apache.streams.examples.flink.twitter.test
 
 import java.io.File
 import java.nio.file.{Files, Paths}
@@ -36,7 +36,7 @@ import scala.io.Source
 /**
   * FlinkTwitterUserInformationPipelineIT is an integration test for FlinkTwitterUserInformationPipeline.
   */
-class FlinkTwitterUserInformationPipelineIT extends FlatSpec {
+class FlinkTwitterUserInformationPipelineIT {
 
   private val LOGGER: Logger = LoggerFactory.getLogger(classOf[FlinkTwitterUserInformationPipelineIT])
 


[streams] 03/04: use newer flink file sink

Posted by sb...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sblackmon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/streams.git

commit 132504b7f22ced3312c1cb7fc050a678a1693597
Author: sblackmon <sb...@apache.org>
AuthorDate: Sun Aug 9 06:57:02 2020 -0500

    use newer flink file sink
---
 .../apache/streams/examples/flink/FlinkBase.scala  | 10 +++++++++
 .../collection/FlinkTwitterFollowingPipeline.scala |  4 +++-
 .../collection/FlinkTwitterPostsPipeline.scala     |  3 ++-
 .../collection/FlinkTwitterSpritzerPipeline.scala  |  6 ++++--
 .../FlinkTwitterUserInformationPipeline.scala      |  4 +++-
 .../src/test/resources/testng.xml                  |  2 +-
 .../FlinkTwitterFollowingPipelineFollowersIT.scala | 14 ++++++++-----
 .../FlinkTwitterFollowingPipelineFriendsIT.scala   | 14 ++++++++-----
 .../twitter/test/FlinkTwitterPostsPipelineIT.scala | 22 ++++++++++++--------
 .../test/FlinkTwitterSpritzerPipelineIT.scala      | 24 ++++++++++++++--------
 .../FlinkTwitterUserInformationPipelineIT.scala    | 22 ++++++++++++--------
 11 files changed, 84 insertions(+), 41 deletions(-)

diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/FlinkBase.scala b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/FlinkBase.scala
index 3acca05..86e99ab 100644
--- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/FlinkBase.scala
+++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/FlinkBase.scala
@@ -19,12 +19,15 @@
 package org.apache.streams.examples.flink
 
 import java.net.MalformedURLException
+import java.util.concurrent.TimeUnit
 
 import com.typesafe.config.Config
 import org.apache.commons.lang3.StringUtils
 import org.apache.flink.api.common.restartstrategy.RestartStrategies
 import org.apache.flink.api.scala.ExecutionEnvironment
 import org.apache.flink.streaming.api.CheckpointingMode
+import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.BasePathBucketAssigner
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.streams.config.{ComponentConfigurator, StreamsConfigurator}
 import org.apache.streams.flink.{FlinkBatchConfiguration, FlinkStreamingConfiguration, StreamsFlinkConfiguration}
@@ -48,6 +51,13 @@ trait FlinkBase {
   var executionEnvironment: ExecutionEnvironment = _
   var streamExecutionEnvironment: StreamExecutionEnvironment = _
 
+  final val basePathBucketAssigner : BasePathBucketAssigner[String] = new BasePathBucketAssigner()
+  final val rollingPolicy: DefaultRollingPolicy[String, String] = DefaultRollingPolicy.builder()
+    .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
+    .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
+    .withMaxPartSize(1024 * 1024 * 1024)
+    .build()
+
   /*
    Basic stuff for every flink job
    */
diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala
index 6f20216..96a7739 100644
--- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala
+++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala
@@ -114,6 +114,7 @@ object FlinkTwitterFollowingPipeline extends FlinkBase {
 class FlinkTwitterFollowingPipeline(config: TwitterFollowingPipelineConfiguration = new StreamsConfigurator[TwitterFollowingPipelineConfiguration](classOf[TwitterFollowingPipelineConfiguration]).detectCustomConfiguration()) extends Runnable with java.io.Serializable {
 
   import FlinkTwitterFollowingPipeline._
+  import FlinkTwitterFollowingPipeline.rollingPolicy
 
   override def run(): Unit = {
 
@@ -159,7 +160,8 @@ class FlinkTwitterFollowingPipeline(config: TwitterFollowingPipelineConfiguratio
 
     val fileSink : StreamingFileSink[String] = StreamingFileSink.
       forRowFormat(new Path(outPath), new SimpleStringEncoder[String]("UTF-8")).
-      build()
+      withRollingPolicy(rollingPolicy).
+      withBucketAssigner(basePathBucketAssigner).build();
 
     if( config.getTest == true ) {
       keyed_jsons.writeAsText(outPath,FileSystem.WriteMode.OVERWRITE)
diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala
index ed45a9f..aad7f32 100644
--- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala
+++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala
@@ -162,7 +162,8 @@ class FlinkTwitterPostsPipeline(config: TwitterPostsPipelineConfiguration = new
 
     val fileSink : StreamingFileSink[String] = StreamingFileSink.
       forRowFormat(new Path(outPath), new SimpleStringEncoder[String]("UTF-8")).
-      build()
+      withRollingPolicy(rollingPolicy).
+      withBucketAssigner(basePathBucketAssigner).build();
 
     if( config.getTest == true ) {
       keyed_jsons.writeAsText(outPath,FileSystem.WriteMode.OVERWRITE)
diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala
index 6235672..0a3de4f 100644
--- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala
+++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala
@@ -35,8 +35,8 @@ import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction
 import org.apache.flink.streaming.api.functions.source.SourceFunction
-import org.apache.flink.streaming.api.scala.KeyedStream
 import org.apache.flink.streaming.api.scala.DataStream
+import org.apache.flink.streaming.api.scala.KeyedStream
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.streams.config.ComponentConfigurator
 import org.apache.streams.config.StreamsConfigurator
@@ -136,7 +136,9 @@ class FlinkTwitterSpritzerPipeline(config: TwitterSpritzerPipelineConfiguration
 
     val fileSink : StreamingFileSink[String] = StreamingFileSink.
       forRowFormat(new Path(outPath), new SimpleStringEncoder[String]("UTF-8")).
-      build()
+      withRollingPolicy(rollingPolicy).
+      withBucketAssigner(basePathBucketAssigner).build();
+
 
     if( config.getTest == true ) {
       keyed_jsons.writeAsText(outPath,FileSystem.WriteMode.OVERWRITE)
diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala
index 21bc911..080ce57 100644
--- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala
+++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala
@@ -165,7 +165,9 @@ class FlinkTwitterUserInformationPipeline(config: TwitterUserInformationPipeline
 
     val fileSink : StreamingFileSink[String] = StreamingFileSink.
       forRowFormat(new Path(outPath), new SimpleStringEncoder[String]("UTF-8")).
-      build()
+      withRollingPolicy(rollingPolicy).
+      withBucketAssigner(basePathBucketAssigner).build();
+
 
     if( config.getTest == true ) {
       keyed_jsons.writeAsText(outPath,FileSystem.WriteMode.OVERWRITE)
diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/resources/testng.xml b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/resources/testng.xml
index bdb250d..6c11e30 100644
--- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/resources/testng.xml
+++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/resources/testng.xml
@@ -19,7 +19,7 @@
   ~ under the License.
   -->
 
-<suite name="ExampleFlinkITs" preserve-order="true">
+<suite name="ExampleFlinkITs" allow-return-values="true" preserve-order="true">
 
     <test name="FlinkTwitterUserInformationPipelineIT">
         <classes>
diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFollowersIT.scala b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFollowersIT.scala
index bfc6940..22ab957 100644
--- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFollowersIT.scala
+++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFollowersIT.scala
@@ -19,16 +19,20 @@
 package org.apache.streams.examples.flink.twitter.test
 
 import java.io.File
-import java.nio.file.{Files, Paths}
+import java.nio.file.Files
+import java.nio.file.Paths
 
-import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions}
-import org.apache.streams.config.{ComponentConfigurator, StreamsConfiguration, StreamsConfigurator}
+import com.typesafe.config.Config
+import com.typesafe.config.ConfigFactory
+import com.typesafe.config.ConfigParseOptions
+import org.apache.streams.config.StreamsConfigurator
 import org.apache.streams.examples.flink.twitter.TwitterFollowingPipelineConfiguration
 import org.apache.streams.examples.flink.twitter.collection.FlinkTwitterFollowingPipeline
-import org.scalatest.FlatSpec
+import org.scalatest.Assertions._
 import org.scalatest.concurrent.Eventually._
 import org.scalatest.time.SpanSugar._
-import org.slf4j.{Logger, LoggerFactory}
+import org.slf4j.Logger
+import org.slf4j.LoggerFactory
 import org.testng.annotations.Test
 
 import scala.io.Source
diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFriendsIT.scala b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFriendsIT.scala
index 6d52d44..e71dbaf 100644
--- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFriendsIT.scala
+++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFriendsIT.scala
@@ -19,16 +19,20 @@
 package org.apache.streams.examples.flink.twitter.test
 
 import java.io.File
-import java.nio.file.{Files, Paths}
+import java.nio.file.Files
+import java.nio.file.Paths
 
-import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions}
-import org.apache.streams.config.{ComponentConfigurator, StreamsConfiguration, StreamsConfigurator}
+import com.typesafe.config.Config
+import com.typesafe.config.ConfigFactory
+import com.typesafe.config.ConfigParseOptions
+import org.apache.streams.config.StreamsConfigurator
 import org.apache.streams.examples.flink.twitter.TwitterFollowingPipelineConfiguration
 import org.apache.streams.examples.flink.twitter.collection.FlinkTwitterFollowingPipeline
-import org.scalatest.FlatSpec
 import org.scalatest.concurrent.Eventually._
 import org.scalatest.time.SpanSugar._
-import org.slf4j.{Logger, LoggerFactory}
+import org.scalatest.Assertions._
+import org.slf4j.Logger
+import org.slf4j.LoggerFactory
 import org.testng.annotations.Test
 
 import scala.io.Source
diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterPostsPipelineIT.scala b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterPostsPipelineIT.scala
index a43c758..c061d43 100644
--- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterPostsPipelineIT.scala
+++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterPostsPipelineIT.scala
@@ -19,16 +19,20 @@
 package org.apache.streams.examples.flink.twitter.test
 
 import java.io.File
-import java.nio.file.{Files, Paths}
+import java.nio.file.Files
+import java.nio.file.Paths
 
-import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions}
-import org.apache.streams.config.{ComponentConfigurator, StreamsConfiguration, StreamsConfigurator}
+import com.typesafe.config.Config
+import com.typesafe.config.ConfigFactory
+import com.typesafe.config.ConfigParseOptions
+import org.apache.streams.config.StreamsConfigurator
 import org.apache.streams.examples.flink.twitter.TwitterPostsPipelineConfiguration
 import org.apache.streams.examples.flink.twitter.collection.FlinkTwitterPostsPipeline
-import org.scalatest.FlatSpec
+import org.scalatest.Assertions._
 import org.scalatest.concurrent.Eventually._
 import org.scalatest.time.SpanSugar._
-import org.slf4j.{Logger, LoggerFactory}
+import org.slf4j.Logger
+import org.slf4j.LoggerFactory
 import org.testng.annotations.Test
 
 import scala.io.Source
@@ -63,9 +67,11 @@ class FlinkTwitterPostsPipelineIT {
 
     eventually (timeout(30 seconds), interval(1 seconds)) {
       assert(Files.exists(Paths.get(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath)))
-      assert(
-        Source.fromFile(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath, "UTF-8").getLines.size
-          >= 200)
+      val lines = Source.fromFile(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath, "UTF-8").getLines.toList
+      assert(lines.size > 200)
+      lines foreach {
+        line => assert( line.contains("created_at") )
+      }
     }
 
   }
diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterSpritzerPipelineIT.scala b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterSpritzerPipelineIT.scala
index 8d45a66..00e32e6 100644
--- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterSpritzerPipelineIT.scala
+++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterSpritzerPipelineIT.scala
@@ -19,16 +19,20 @@
 package org.apache.streams.examples.flink.twitter.test
 
 import java.io.File
-import java.nio.file.{Files, Paths}
+import java.nio.file.Files
+import java.nio.file.Paths
 
-import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions}
-import org.apache.streams.config.{ComponentConfigurator, StreamsConfiguration, StreamsConfigurator}
+import com.typesafe.config.Config
+import com.typesafe.config.ConfigFactory
+import com.typesafe.config.ConfigParseOptions
+import org.apache.streams.config.StreamsConfigurator
 import org.apache.streams.examples.flink.twitter.TwitterSpritzerPipelineConfiguration
 import org.apache.streams.examples.flink.twitter.collection.FlinkTwitterSpritzerPipeline
-import org.scalatest.FlatSpec
+import org.scalatest.Assertions._
 import org.scalatest.concurrent.Eventually._
 import org.scalatest.time.SpanSugar._
-import org.slf4j.{Logger, LoggerFactory}
+import org.slf4j.Logger
+import org.slf4j.LoggerFactory
 import org.testng.annotations.Test
 
 import scala.io.Source
@@ -36,7 +40,7 @@ import scala.io.Source
 /**
   * FlinkTwitterSpritzerPipelineIT is an integration test for FlinkTwitterSpritzerPipeline.
   */
-class FlinkTwitterSpritzerPipelineIT extends FlatSpec {
+class FlinkTwitterSpritzerPipelineIT {
 
   private val LOGGER: Logger = LoggerFactory.getLogger(classOf[FlinkTwitterSpritzerPipelineIT])
 
@@ -64,9 +68,11 @@ class FlinkTwitterSpritzerPipelineIT extends FlatSpec {
 
     eventually (timeout(60 seconds), interval(1 seconds)) {
       assert(Files.exists(Paths.get(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath)))
-      assert(
-        Source.fromFile(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath, "UTF-8").getLines.size
-          >= 10)
+      val lines = Source.fromFile(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath, "UTF-8").getLines.toList
+      assert(lines.size > 10)
+      lines foreach {
+        line => assert( line.contains("created_at") )
+      }
     }
 
   }
diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterUserInformationPipelineIT.scala b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterUserInformationPipelineIT.scala
index 48b876a..fc582e1 100644
--- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterUserInformationPipelineIT.scala
+++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterUserInformationPipelineIT.scala
@@ -19,16 +19,20 @@
 package org.apache.streams.examples.flink.twitter.test
 
 import java.io.File
-import java.nio.file.{Files, Paths}
+import java.nio.file.Files
+import java.nio.file.Paths
 
-import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions}
-import org.apache.streams.config.{ComponentConfigurator, StreamsConfiguration, StreamsConfigurator}
+import com.typesafe.config.Config
+import com.typesafe.config.ConfigFactory
+import com.typesafe.config.ConfigParseOptions
+import org.apache.streams.config.StreamsConfigurator
 import org.apache.streams.examples.flink.twitter.TwitterUserInformationPipelineConfiguration
 import org.apache.streams.examples.flink.twitter.collection.FlinkTwitterUserInformationPipeline
-import org.scalatest.FlatSpec
+import org.scalatest.Assertions._
 import org.scalatest.concurrent.Eventually._
 import org.scalatest.time.SpanSugar._
-import org.slf4j.{Logger, LoggerFactory}
+import org.slf4j.Logger
+import org.slf4j.LoggerFactory
 import org.testng.annotations.Test
 
 import scala.io.Source
@@ -63,9 +67,11 @@ class FlinkTwitterUserInformationPipelineIT {
 
     eventually (timeout(30 seconds), interval(1 seconds)) {
       assert(Files.exists(Paths.get(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath)))
-      assert(
-        Source.fromFile(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath, "UTF-8").getLines.size
-          > 500)
+      val lines = Source.fromFile(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath, "UTF-8").getLines.toList
+      assert(lines.size > 500)
+      lines foreach {
+        line => assert( line.contains("created_at") )
+      }
     }
 
   }