You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2020/08/12 16:02:51 UTC

[streams] 01/04: STREAMS-668 upgrade flink examples to latest flink version

This is an automated email from the ASF dual-hosted git repository.

sblackmon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/streams.git

commit c8f58e2ad23c33092337357854443f1bfeef2606
Author: Steve Blackmon <st...@peoplepatternmacpro.lan>
AuthorDate: Tue Aug 4 14:25:50 2020 -0500

    STREAMS-668 upgrade flink examples to latest flink version
    
    STREAMS-668
    upgrade flink examples to latest flink version 1.11.1
    also improved threading throughout streams-provider-twitter
    added accumulators to twitter flink pipelines
---
 .../twitter/provider/TwitterFollowingProvider.java |   4 +-
 .../twitter/provider/TwitterTimelineProvider.java  |  31 +++--
 .../provider/TwitterUserInformationProvider.java   | 127 +++++++++++++--------
 .../TwitterUserInformationProviderTask.java        |  29 ++++-
 .../flink-twitter-collection/pom.xml               |  26 +++--
 .../apache/streams/examples/flink/FlinkBase.scala  |  13 ++-
 .../collection/FlinkTwitterFollowingPipeline.scala | 103 ++++++++---------
 .../collection/FlinkTwitterPostsPipeline.scala     |  63 +++++++---
 .../collection/FlinkTwitterSpritzerPipeline.scala  |  58 ++++++----
 .../FlinkTwitterUserInformationPipeline.scala      |  62 +++++++---
 .../FollowingCollectorFlatMapFunction.scala        |  52 +++++++++
 .../TimelineCollectorFlatMapFunction.scala         |  52 +++++++++
 .../UserInformationCollectorFlatMapFunction.scala  |  48 ++++++++
 .../FlinkTwitterFollowingPipelineFollowersIT.conf  |   3 +-
 .../FlinkTwitterFollowingPipelineFriendsIT.conf    |   1 +
 .../resources/FlinkTwitterPostsPipelineIT.conf     |   1 +
 .../FlinkTwitterUserInformationPipelineIT.conf     |   9 +-
 .../src/test/resources/testng.xml                  |  54 +++++++++
 .../FlinkTwitterFollowingPipelineFollowersIT.scala |   2 +-
 .../FlinkTwitterFollowingPipelineFriendsIT.scala   |   4 +-
 .../twitter/test/FlinkTwitterPostsPipelineIT.scala |   4 +-
 .../FlinkTwitterUserInformationPipelineIT.scala    |   4 +-
 22 files changed, 562 insertions(+), 188 deletions(-)

diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
index 4603803..eb472ea 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
@@ -332,6 +332,8 @@ public class TwitterFollowingProvider implements Callable<Iterator<Follow>>, Str
   }
 
   public boolean isRunning() {
+    LOGGER.debug("providerQueue.isEmpty: {}", providerQueue.isEmpty());
+    LOGGER.debug("providerQueue.size: {}", providerQueue.size());
     LOGGER.debug("executor.isShutdown: {}", executor.isShutdown());
     LOGGER.debug("executor.isTerminated: {}", executor.isTerminated());
     LOGGER.debug("tasks.size(): {}", tasks.size());
@@ -346,7 +348,7 @@ public class TwitterFollowingProvider implements Callable<Iterator<Follow>>, Str
       allTasksComplete = false;
     }
     LOGGER.debug("allTasksComplete: {}", allTasksComplete);
-    boolean finished = allTasksComplete && tasks.size() > 0 && tasks.size() == futures.size() && executor.isShutdown() && executor.isTerminated();
+    boolean finished = tasks.size() > 0 && tasks.size() == futures.size() && executor.isShutdown() && executor.isTerminated() && allTasksComplete && providerQueue.size() == 0;
     LOGGER.debug("finished: {}", finished);
     if ( finished ) {
       running.set(false);
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
index eee81c4..126683c 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
@@ -206,6 +206,7 @@ public class TwitterTimelineProvider implements Callable<Iterator<Tweet>>, Strea
     }
 
     Objects.requireNonNull(providerQueue);
+    Objects.requireNonNull(config);
     Objects.requireNonNull(config.getOauth().getConsumerKey());
     Objects.requireNonNull(config.getOauth().getConsumerSecret());
     Objects.requireNonNull(config.getOauth().getAccessToken());
@@ -244,6 +245,8 @@ public class TwitterTimelineProvider implements Callable<Iterator<Tweet>>, Strea
         )
     );
 
+    Objects.requireNonNull(executor);
+
     submitTimelineThreads(ids, names);
 
     LOGGER.info("tasks: {}", tasks.size());
@@ -323,6 +326,8 @@ public class TwitterTimelineProvider implements Callable<Iterator<Tweet>>, Strea
       running.set(false);
 
       LOGGER.info("Exiting");
+    } else {
+      LOGGER.info("Not Finished Yet...");
     }
 
     return result;
@@ -350,25 +355,37 @@ public class TwitterTimelineProvider implements Callable<Iterator<Tweet>>, Strea
   }
 
   @Override
-  public void cleanUp() {
-    ExecutorUtils.shutdownAndAwaitTermination(executor);
-  }
-
-  @Override
   public boolean isRunning() {
     LOGGER.debug("providerQueue.isEmpty: {}", providerQueue.isEmpty());
     LOGGER.debug("providerQueue.size: {}", providerQueue.size());
+    LOGGER.debug("executor.isShutdown: {}", executor.isShutdown());
     LOGGER.debug("executor.isTerminated: {}", executor.isTerminated());
     LOGGER.debug("tasks.size(): {}", tasks.size());
     LOGGER.debug("futures.size(): {}", futures.size());
-    if ( tasks.size() > 0 && tasks.size() == futures.size() && executor.isShutdown() && executor.isTerminated() ) {
+    boolean allTasksComplete;
+    if( futures.size() > 0) {
+      allTasksComplete = true;
+      for(Future<?> future : futures){
+        allTasksComplete &= future.isDone(); // stays true only while every future seen so far is done
+      }
+    } else {
+      allTasksComplete = false;
+    }
+    LOGGER.debug("allTasksComplete: {}", allTasksComplete);
+    boolean finished = tasks.size() > 0 && tasks.size() == futures.size() && executor.isShutdown() && executor.isTerminated() && allTasksComplete && providerQueue.size() == 0;
+    LOGGER.debug("finished: {}", finished);
+    if ( finished ) {
       running.set(false);
     }
-    LOGGER.debug("isRunning: ", running.get());
     return running.get();
   }
 
   @Override
+  public void cleanUp() {
+    ExecutorUtils.shutdownAndAwaitTermination(executor);
+  }
+
+  @Override
   public Iterator<Tweet> call() {
     prepare(config);
     startStream();
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
index 6ceeb88..dc0d050 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
@@ -40,6 +40,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
+
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.Uninterruptibles;
 import com.typesafe.config.Config;
@@ -56,6 +57,7 @@ import java.io.File;
 import java.io.FileOutputStream;
 import java.io.PrintStream;
 import java.io.Serializable;
+import java.lang.Runnable;
 import java.math.BigInteger;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -64,6 +66,8 @@ import java.util.List;
 import java.util.Objects;
 import java.util.Queue;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReadWriteLock;
@@ -103,14 +107,15 @@ public class TwitterUserInformationProvider implements Callable<Iterator<User>>,
 
   protected Twitter client;
 
-  protected ListeningExecutorService executor;
+  protected ExecutorService executor;
 
   protected DateTime start;
   protected DateTime end;
 
   protected final AtomicBoolean running = new AtomicBoolean();
 
-  private List<ListenableFuture<Object>> futures = new ArrayList<>();
+  private List<Runnable> tasks = new ArrayList<>();
+  private List<Future<Object>> futures = new ArrayList<>();
 
   /**
    * To use from command line:
@@ -165,10 +170,11 @@ public class TwitterUserInformationProvider implements Callable<Iterator<User>>,
           System.err.println(ex.getMessage());
         }
       }
+      outStream.flush();
     }
     while ( provider.isRunning());
     provider.cleanUp();
-    outStream.flush();
+    outStream.close();
   }
 
   // TODO: this should be abstracted out
@@ -199,9 +205,19 @@ public class TwitterUserInformationProvider implements Callable<Iterator<User>>,
   public void prepare(Object configurationObject) {
 
     if ( configurationObject instanceof TwitterFollowingConfiguration ) {
-      config = (TwitterUserInformationConfiguration) configurationObject;
+      this.config = (TwitterUserInformationConfiguration) configurationObject;
     }
 
+    streamsConfiguration = StreamsConfigurator.detectConfiguration(StreamsConfigurator.getConfig());
+
+    try {
+      lock.writeLock().lock();
+      providerQueue = QueueUtils.constructQueue();
+    } finally {
+      lock.writeLock().unlock();
+    }
+
+    Objects.requireNonNull(providerQueue);
     Objects.requireNonNull(config);
     Objects.requireNonNull(config.getOauth());
     Objects.requireNonNull(config.getOauth().getConsumerKey());
@@ -211,10 +227,6 @@ public class TwitterUserInformationProvider implements Callable<Iterator<User>>,
     Objects.requireNonNull(config.getInfo());
     Objects.requireNonNull(config.getThreadsPerProvider());
 
-    streamsConfiguration = StreamsConfigurator.detectConfiguration();
-
-    Objects.requireNonNull(streamsConfiguration.getQueueSize());
-
     try {
       client = getTwitterClient();
     } catch (InstantiationException e) {
@@ -223,15 +235,6 @@ public class TwitterUserInformationProvider implements Callable<Iterator<User>>,
 
     Objects.requireNonNull(client);
 
-    try {
-      lock.writeLock().lock();
-      providerQueue = QueueUtils.constructQueue();
-    } finally {
-      lock.writeLock().unlock();
-    }
-
-    Objects.requireNonNull(providerQueue);
-
     for (String s : config.getInfo()) {
       if (s != null) {
         String potentialScreenName = s.replaceAll("@", "").trim().toLowerCase();
@@ -247,7 +250,7 @@ public class TwitterUserInformationProvider implements Callable<Iterator<User>>,
     }
 
     executor = MoreExecutors.listeningDecorator(
-        ExecutorUtils.newFixedThreadPoolWithQueueSize(
+      ExecutorUtils.newFixedThreadPoolWithQueueSize(
             config.getThreadsPerProvider().intValue(),
             streamsConfiguration.getQueueSize().intValue()
         )
@@ -255,12 +258,38 @@ public class TwitterUserInformationProvider implements Callable<Iterator<User>>,
 
     Objects.requireNonNull(executor);
 
-    // Twitter allows for batches up to 100 per request, but you cannot mix types
     submitUserInformationThreads(ids, names);
+
+    LOGGER.info("tasks: {}", tasks.size());
+    LOGGER.info("futures: {}", futures.size());
+
+  }
+
+  @Override
+  public void startStream() {
+
+    Objects.requireNonNull(executor);
+
+    LOGGER.info("startStream");
+
+    running.set(true);
+
+    LOGGER.info("running: {}", running.get());
+
+    ExecutorUtils.shutdownAndAwaitTermination(executor);
+
+    LOGGER.info("running: {}", running.get());
+
   }
 
   protected void submitUserInformationThreads(List<Long> ids, List<String> names) {
 
+    /*
+    while( idsIndex < ids.size() ) {
+      from = idsIndex
+      to = Math.min( idsIndex + 100, ids.size() - idsIndex
+    }
+     */
     int idsIndex = 0;
     while( idsIndex + 100 < ids.size() ) {
       List<Long> batchIds = ids.subList(idsIndex, idsIndex + 100);
@@ -268,7 +297,8 @@ public class TwitterUserInformationProvider implements Callable<Iterator<User>>,
           this,
           client,
           new UsersLookupRequest().withUserId(batchIds));
-      ListenableFuture future = executor.submit(providerTask);
+      tasks.add(providerTask);
+      Future future = executor.submit((Callable)providerTask);
       futures.add(future);
       LOGGER.info("Thread Submitted: {}", providerTask.request);
       idsIndex += 100;
@@ -279,7 +309,8 @@ public class TwitterUserInformationProvider implements Callable<Iterator<User>>,
           this,
           client,
           new UsersLookupRequest().withUserId(batchIds));
-      ListenableFuture future = executor.submit(providerTask);
+      tasks.add(providerTask);
+      Future future = executor.submit((Callable)providerTask);
       futures.add(future);
       LOGGER.info("Thread Submitted: {}", providerTask.request);
     }
@@ -291,7 +322,8 @@ public class TwitterUserInformationProvider implements Callable<Iterator<User>>,
           this,
           client,
           new UsersLookupRequest().withScreenName(batchNames));
-      ListenableFuture future = executor.submit(providerTask);
+      tasks.add(providerTask);
+      Future future = executor.submit((Callable)providerTask);
       futures.add(future);
       LOGGER.info("Thread Submitted: {}", providerTask.request);
       namesIndex += 100;
@@ -302,7 +334,8 @@ public class TwitterUserInformationProvider implements Callable<Iterator<User>>,
           this,
           client,
           new UsersLookupRequest().withScreenName(batchNames));
-      ListenableFuture future = executor.submit(providerTask);
+      tasks.add(providerTask);
+      Future future = executor.submit((Callable)providerTask);
       futures.add(future);
       LOGGER.info("Thread Submitted: {}", providerTask.request);
     }
@@ -310,27 +343,16 @@ public class TwitterUserInformationProvider implements Callable<Iterator<User>>,
   }
 
   @Override
-  public void startStream() {
-
-    Objects.requireNonNull(executor);
-
-    LOGGER.info("startStream: {} Threads", futures.size());
-
-    running.set(true);
-
-    executor.shutdown();
-  }
-
-  @Override
   public StreamsResultSet readCurrent() {
 
     StreamsResultSet result;
 
+    LOGGER.debug("Providing {} docs", providerQueue.size());
+
     try {
       lock.writeLock().lock();
       result = new StreamsResultSet(providerQueue);
       providerQueue = QueueUtils.constructQueue();
-      LOGGER.debug("readCurrent: {} Documents", result.size());
     } finally {
       lock.writeLock().unlock();
     }
@@ -347,27 +369,38 @@ public class TwitterUserInformationProvider implements Callable<Iterator<User>>,
   @Override
   public StreamsResultSet readRange(DateTime start, DateTime end) {
     LOGGER.debug("{} readRange", STREAMS_ID);
-    this.start = start;
-    this.end = end;
-    readCurrent();
-    return (StreamsResultSet)providerQueue.iterator();
+    throw new NotImplementedException();
   }
 
+  protected Twitter getTwitterClient() throws InstantiationException {
+    return Twitter.getInstance(config);
+  }
 
   @Override
   public boolean isRunning() {
-    if ( providerQueue.isEmpty() && executor.isTerminated() && Futures.allAsList(futures).isDone() ) {
-      LOGGER.info("All Threads Completed");
+    LOGGER.debug("providerQueue.isEmpty: {}", providerQueue.isEmpty());
+    LOGGER.debug("providerQueue.size: {}", providerQueue.size());
+    LOGGER.debug("executor.isTerminated: {}", executor.isTerminated());
+    LOGGER.debug("tasks.size(): {}", tasks.size());
+    LOGGER.debug("futures.size(): {}", futures.size());
+    boolean allTasksComplete;
+    if( futures.size() > 0) {
+      allTasksComplete = true;
+      for(Future<?> future : futures){
+        allTasksComplete &= future.isDone(); // stays true only while every future seen so far is done
+      }
+    } else {
+      allTasksComplete = false;
+    }
+    LOGGER.debug("allTasksComplete: {}", allTasksComplete);
+    boolean finished = allTasksComplete && tasks.size() > 0 && tasks.size() == futures.size() && executor.isShutdown() && executor.isTerminated() && providerQueue.size() == 0;
+    LOGGER.debug("finished: {}", finished);
+    if ( finished ) {
       running.set(false);
-      LOGGER.info("Exiting");
     }
     return running.get();
   }
 
-  protected Twitter getTwitterClient() throws InstantiationException {
-    return Twitter.getInstance(config);
-  }
-
   @Override
   public void cleanUp() {
     ExecutorUtils.shutdownAndAwaitTermination(executor);
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProviderTask.java
index 418ba83..40ebfcf 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProviderTask.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProviderTask.java
@@ -23,6 +23,7 @@ import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.twitter.api.Twitter;
 import org.apache.streams.twitter.api.UsersLookupRequest;
 import org.apache.streams.twitter.converter.TwitterDateTimeFormat;
+import org.apache.streams.twitter.pojo.Tweet;
 import org.apache.streams.twitter.pojo.User;
 import org.apache.streams.util.ComponentUtils;
 
@@ -31,14 +32,17 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.Callable;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 /**
  *  Retrieve recent posts for a single user id.
  */
-public class TwitterUserInformationProviderTask implements Runnable {
+public class TwitterUserInformationProviderTask implements Callable<Iterator<User>>, Runnable {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(TwitterUserInformationProviderTask.class);
 
@@ -47,6 +51,7 @@ public class TwitterUserInformationProviderTask implements Runnable {
   protected TwitterUserInformationProvider provider;
   protected Twitter client;
   protected UsersLookupRequest request;
+  protected List<User> responseList;
 
   /**
    * TwitterTimelineProviderTask constructor.
@@ -60,19 +65,37 @@ public class TwitterUserInformationProviderTask implements Runnable {
     this.request = request;
   }
 
+  int item_count = 0;
+
   @Override
   public void run() {
 
     LOGGER.info("Thread Starting: {}", request.toString());
 
+    responseList = new ArrayList<>();
+
     List<User> users = client.lookup(request);
 
-    for (User user : users) {
-      ComponentUtils.offerUntilSuccess(new StreamsDatum(user), provider.providerQueue);
+    responseList.addAll(users);
+
+    int item_count = users.size(); // number of users returned by this lookup request
+
+    if( users.size() > 0 ) {
+      for (User user : users) {
+        ComponentUtils.offerUntilSuccess(new StreamsDatum(user), provider.providerQueue);
+        LOGGER.debug("User: {}", user.getIdStr());
+      }
     }
 
     LOGGER.info("Thread Finished: {}", request.toString());
 
+    LOGGER.info("item_count: {} ", item_count);
+
   }
 
+  @Override
+  public Iterator<User> call() throws Exception {
+    run();
+    return responseList.iterator();
+  }
 }
diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/pom.xml b/streams-examples/streams-examples-flink/flink-twitter-collection/pom.xml
index f39ae0a..1e2de10 100644
--- a/streams-examples/streams-examples-flink/flink-twitter-collection/pom.xml
+++ b/streams-examples/streams-examples-flink/flink-twitter-collection/pom.xml
@@ -34,7 +34,7 @@
     <properties>
         <testng.version>6.9.10</testng.version>
         <hdfs.version>2.7.0</hdfs.version>
-        <flink.version>1.4.2</flink.version>
+        <flink.version>1.11.1</flink.version>
         <scala.version>2.11.12</scala.version>
         <scala.binary.version>2.11</scala.binary.version>
         <scalapc.version>1.1.0</scalapc.version>
@@ -127,6 +127,18 @@
             </exclusions>
         </dependency>
         <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-hdfs-httpfs</artifactId>
+            <type>war</type>
+            <version>${hdfs.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>commons-logging</groupId>
+                    <artifactId>commons-logging</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-java</artifactId>
             <version>${flink.version}</version>
@@ -433,15 +445,9 @@
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-failsafe-plugin</artifactId>
                 <configuration>
-                    <!-- Run integration test suite rather than individual tests. -->
-                    <excludes>
-                        <exclude>**/*Test.java</exclude>
-                        <exclude>**/*Tests.java</exclude>
-                    </excludes>
-                    <includes>
-                        <include>**/*IT.java</include>
-                        <include>**/*ITs.java</include>
-                    </includes>
+                    <suiteXmlFiles>
+                        <suiteXmlFile>target/test-classes/testng.xml</suiteXmlFile>
+                    </suiteXmlFiles>
                 </configuration>
                 <dependencies>
                     <dependency>
diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/FlinkBase.scala b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/FlinkBase.scala
index 1709d4c..3acca05 100644
--- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/FlinkBase.scala
+++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/FlinkBase.scala
@@ -66,7 +66,7 @@ trait FlinkBase {
     if(StringUtils.isNotEmpty(configUrl)) {
       BASELOGGER.info("StreamsConfigurator.resolveConfig(configUrl): {}", StreamsConfigurator.resolveConfig(configUrl))
       try {
-        typesafe = StreamsConfigurator.resolveConfig(configUrl).withFallback(StreamsConfigurator.getConfig).resolve()
+        typesafe = StreamsConfigurator.resolveConfig(configUrl)
       } catch {
         case mue: MalformedURLException => {
           BASELOGGER.error("Invalid Configuration URL: ", mue)
@@ -77,6 +77,7 @@ trait FlinkBase {
           return false
         }
       }
+      StreamsConfigurator.addConfig(typesafe)
     }
     else {
       typesafe = StreamsConfigurator.getConfig
@@ -177,9 +178,10 @@ trait FlinkBase {
     }
     else if (configObject.getScheme.toString.equals("s3")) {
       inPathBuilder = configObject.getScheme + "://" + configObject.getPath + "/" + configObject.getReaderPath
-    } else {
-      throw new Exception("scheme not recognized: " + configObject.getScheme)
     }
+//    else {
+//      throw new Exception("scheme not recognized: " + configObject.getScheme)
+//    }
     inPathBuilder
   }
 
@@ -193,9 +195,10 @@ trait FlinkBase {
     }
     else if( configObject.getScheme.toString.equals("s3")) {
       outPathBuilder = configObject.getScheme + "://" + configObject.getPath + "/" + configObject.getWriterPath
-    } else {
-      throw new Exception("output scheme not recognized: " + configObject.getScheme)
     }
+//    else {
+//      throw new Exception("output scheme not recognized: " + configObject.getScheme)
+//    }
     outPathBuilder
   }
 
diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala
index bbf83f8..bae988c 100644
--- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala
+++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala
@@ -19,32 +19,28 @@
 package org.apache.streams.examples.flink.twitter.collection
 
 import java.util.Objects
-import java.util.concurrent.TimeUnit
 
 import com.fasterxml.jackson.databind.ObjectMapper
-import com.google.common.util.concurrent.Uninterruptibles
 import org.apache.commons.lang3.StringUtils
-import org.apache.flink.api.common.functions.RichFlatMapFunction
+import org.apache.flink.api.common.JobExecutionResult
+import org.apache.flink.api.common.serialization.SimpleStringEncoder
 import org.apache.flink.api.scala._
 import org.apache.flink.core.fs.FileSystem
+import org.apache.flink.core.fs.Path
 import org.apache.flink.streaming.api.TimeCharacteristic
-import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment}
-import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink
-import org.apache.flink.util.Collector
-import org.apache.streams.config.{ComponentConfigurator, StreamsConfigurator}
+import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
+import org.apache.flink.streaming.api.scala.DataStream
+import org.apache.flink.streaming.api.scala.KeyedStream
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.streams.config.StreamsConfigurator
 import org.apache.streams.core.StreamsDatum
 import org.apache.streams.examples.flink.FlinkBase
 import org.apache.streams.examples.flink.twitter.TwitterFollowingPipelineConfiguration
-import org.apache.streams.flink.{FlinkStreamingConfiguration, StreamsFlinkConfiguration}
-import org.apache.streams.hdfs.{HdfsReaderConfiguration, HdfsWriterConfiguration}
 import org.apache.streams.jackson.StreamsJacksonMapper
-import org.apache.streams.twitter.config.TwitterFollowingConfiguration
 import org.apache.streams.twitter.pojo.Follow
-import org.apache.streams.twitter.provider.TwitterFollowingProvider
 import org.hamcrest.MatcherAssert
-import org.slf4j.{Logger, LoggerFactory}
-
-import scala.collection.JavaConversions._
+import org.slf4j.Logger
+import org.slf4j.LoggerFactory
 
 /**
   * FlinkTwitterFollowingPipeline collects friends or followers of all profiles from a
@@ -58,8 +54,12 @@ object FlinkTwitterFollowingPipeline extends FlinkBase {
   private val MAPPER: ObjectMapper = StreamsJacksonMapper.getInstance()
 
   override def main(args: Array[String]) = {
-    super.main(args)
-    val jobConfig = new ComponentConfigurator[TwitterFollowingPipelineConfiguration](classOf[TwitterFollowingPipelineConfiguration]).detectConfiguration(typesafe)
+    if( args.length > 0 ) {
+      LOGGER.info("Args: {}", args)
+      configUrl = args(0)
+    }
+    if( !setup(configUrl) ) System.exit(1)
+    val jobConfig = new StreamsConfigurator(classOf[TwitterFollowingPipelineConfiguration]).detectCustomConfiguration()
     if( !setup(jobConfig) ) System.exit(1)
     val pipeline: FlinkTwitterFollowingPipeline = new FlinkTwitterFollowingPipeline(jobConfig)
     val thread = new Thread(pipeline)
@@ -119,6 +119,7 @@ class FlinkTwitterFollowingPipeline(config: TwitterFollowingPipelineConfiguratio
 
     val env: StreamExecutionEnvironment = streamEnvironment(config)
 
+    env.setParallelism(streamsConfig.getParallelism().toInt)
     env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
     env.setNumberOfExecutionRetries(0)
 
@@ -126,54 +127,54 @@ class FlinkTwitterFollowingPipeline(config: TwitterFollowingPipelineConfiguratio
 
     val outPath = buildWriterPath(config.getDestination)
 
-    val keyed_ids: KeyedStream[String, Int] = env.readTextFile(inPath).setParallelism(10).keyBy( id => (id.hashCode % 100).abs )
+    val ids: DataStream[String] = env.readTextFile(inPath)
+
+    val keyed_ids: KeyedStream[String, Int] = ids.
+      name("keyed_ids").
+      setParallelism(streamsConfig.getParallelism().toInt).
+      keyBy( id => (id.hashCode % streamsConfig.getParallelism().toInt).abs )
 
     // these datums contain 'Follow' objects
-    val followDatums: DataStream[StreamsDatum] =
-      keyed_ids.flatMap(new FollowingCollectorFlatMapFunction(config.getTwitter)).setParallelism(10)
+    val followDatums: DataStream[StreamsDatum] = keyed_ids.
+      flatMap(new FollowingCollectorFlatMapFunction(streamsConfig, config.getTwitter, streamsFlinkConfiguration)).
+      name("followDatums").
+      setParallelism(streamsConfig.getParallelism().toInt)
 
-    val follows: DataStream[Follow] = followDatums
+    val follows: DataStream[Follow] = followDatums.
+      name("follows")
       .map(datum => datum.getDocument.asInstanceOf[Follow])
 
-    val jsons: DataStream[String] = follows
+    val jsons: DataStream[String] = follows.
+      name("jsons")
       .map(follow => {
         val MAPPER = StreamsJacksonMapper.getInstance
         MAPPER.writeValueAsString(follow)
-      })
-
-    if( config.getTest == false )
-      jsons.addSink(new BucketingSink[String](outPath)).setParallelism(3)
-    else
-      jsons.writeAsText(outPath,FileSystem.WriteMode.OVERWRITE)
-        .setParallelism(env.getParallelism)
+      }).
+      setParallelism(streamsConfig.getParallelism().toInt)
+
+    val keyed_jsons: KeyedStream[String, Int] = jsons.
+      name("keyed_jsons").
+      setParallelism(streamsConfig.getParallelism().toInt).
+      keyBy( id => (id.hashCode % streamsConfig.getParallelism().toInt).abs )
+
+    val fileSink : StreamingFileSink[String] = StreamingFileSink.
+      forRowFormat(new Path(outPath), new SimpleStringEncoder[String]("UTF-8")).
+      build()
+
+    if( config.getTest == true ) {
+      keyed_jsons.writeAsText(outPath,FileSystem.WriteMode.OVERWRITE)
+    } else {
+      keyed_jsons.name("fileSink").addSink(fileSink)
+    }
 
-    // if( test == true ) jsons.print();
+    val result: JobExecutionResult = env.execute("FlinkTwitterFollowingPipeline")
 
-    env.execute(STREAMS_ID)
-  }
+    LOGGER.info("JobExecutionResult: {}", result.getJobExecutionResult)
 
-  class FollowingCollectorFlatMapFunction(
-                                           twitterConfiguration : TwitterFollowingConfiguration = new ComponentConfigurator(classOf[TwitterFollowingConfiguration]).detectConfiguration(),
-                                           flinkConfiguration : StreamsFlinkConfiguration = new ComponentConfigurator(classOf[StreamsFlinkConfiguration]).detectConfiguration()
-                                         ) extends RichFlatMapFunction[String, StreamsDatum] with Serializable {
+    LOGGER.info("JobExecutionResult.getNetRuntime: {}", result.getNetRuntime())
 
-    override def flatMap(input: String, out: Collector[StreamsDatum]): Unit = {
-      collectConnections(input, out)
-    }
+    LOGGER.info("JobExecutionResult.getAllAccumulatorResults: {}", MAPPER.writeValueAsString(result.getAllAccumulatorResults()))
 
-    def collectConnections(id : String, out : Collector[StreamsDatum]) = {
-      val twitProvider: TwitterFollowingProvider =
-        new TwitterFollowingProvider(
-          twitterConfiguration.withInfo(List(toProviderId(id))).asInstanceOf[TwitterFollowingConfiguration]
-        )
-      twitProvider.prepare(twitProvider)
-      twitProvider.startStream()
-      var iterator: Iterator[StreamsDatum] = null
-      do {
-        Uninterruptibles.sleepUninterruptibly(flinkConfiguration.getProviderWaitMs, TimeUnit.MILLISECONDS)
-        twitProvider.readCurrent().iterator().toList.map(out.collect(_))
-      } while( twitProvider.isRunning )
-    }
   }
 
 }
\ No newline at end of file
diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala
index 28d2576..14eebc5 100644
--- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala
+++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala
@@ -24,24 +24,31 @@ import java.util.concurrent.TimeUnit
 import com.fasterxml.jackson.databind.ObjectMapper
 import com.google.common.util.concurrent.Uninterruptibles
 import org.apache.commons.lang3.StringUtils
+import org.apache.flink.api.common.JobExecutionResult
 import org.apache.flink.api.common.functions.RichFlatMapFunction
+import org.apache.flink.api.common.serialization.SimpleStringEncoder
 import org.apache.flink.api.scala._
 import org.apache.flink.core.fs.FileSystem
+import org.apache.flink.core.fs.Path
 import org.apache.flink.streaming.api.TimeCharacteristic
-import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment}
-import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink
+import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
+import org.apache.flink.streaming.api.scala.DataStream
+import org.apache.flink.streaming.api.scala.KeyedStream
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.flink.util.Collector
-import org.apache.streams.config.{ComponentConfigurator, StreamsConfigurator}
+import org.apache.streams.config.ComponentConfigurator
+import org.apache.streams.config.StreamsConfigurator
 import org.apache.streams.core.StreamsDatum
 import org.apache.streams.examples.flink.FlinkBase
 import org.apache.streams.examples.flink.twitter.TwitterPostsPipelineConfiguration
-import org.apache.streams.flink.FlinkStreamingConfiguration
-import org.apache.streams.hdfs.{HdfsReaderConfiguration, HdfsWriterConfiguration}
+import org.apache.streams.hdfs.HdfsReaderConfiguration
+import org.apache.streams.hdfs.HdfsWriterConfiguration
 import org.apache.streams.jackson.StreamsJacksonMapper
 import org.apache.streams.twitter.pojo.Tweet
 import org.apache.streams.twitter.provider.TwitterTimelineProvider
 import org.hamcrest.MatcherAssert
-import org.slf4j.{Logger, LoggerFactory}
+import org.slf4j.Logger
+import org.slf4j.LoggerFactory
 
 import scala.collection.JavaConversions._
 
@@ -57,8 +64,12 @@ object FlinkTwitterPostsPipeline extends FlinkBase {
   private val MAPPER: ObjectMapper = StreamsJacksonMapper.getInstance()
 
   override def main(args: Array[String]) = {
-    super.main(args)
-    val jobConfig = new ComponentConfigurator[TwitterPostsPipelineConfiguration](classOf[TwitterPostsPipelineConfiguration]).detectConfiguration(typesafe)
+    if( args.length > 0 ) {
+      LOGGER.info("Args: {}", args)
+      configUrl = args(0)
+    }
+    if( !setup(configUrl) ) System.exit(1)
+    val jobConfig = new StreamsConfigurator(classOf[TwitterPostsPipelineConfiguration]).detectCustomConfiguration()
     if( !setup(jobConfig) ) System.exit(1)
     val pipeline: FlinkTwitterPostsPipeline = new FlinkTwitterPostsPipeline(jobConfig)
     val thread = new Thread(pipeline)
@@ -118,6 +129,7 @@ class FlinkTwitterPostsPipeline(config: TwitterPostsPipelineConfiguration = new
 
     val env: StreamExecutionEnvironment = streamEnvironment(config)
 
+    env.setParallelism(streamsConfig.getParallelism().toInt)
     env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
     env.setNumberOfExecutionRetries(0)
 
@@ -125,13 +137,15 @@ class FlinkTwitterPostsPipeline(config: TwitterPostsPipelineConfiguration = new
 
     val outPath = buildWriterPath(new ComponentConfigurator(classOf[HdfsWriterConfiguration]).detectConfiguration())
 
-    val ids: DataStream[String] = env.readTextFile(inPath).setParallelism(10).name("ids")
+    val ids: DataStream[String] = env.readTextFile(inPath).name("ids")
 
-    val keyed_ids: KeyedStream[String, Int] = env.readTextFile(inPath).setParallelism(10).name("keyed_ids").keyBy( id => (id.hashCode % 100).abs )
+    val keyed_ids: KeyedStream[String, Int] = ids.name("keyed_ids").
+      setParallelism(streamsConfig.getParallelism().toInt).
+      keyBy( id => (id.hashCode % streamsConfig.getParallelism().toInt).abs )
 
     // these datums contain 'Tweet' objects
     val tweetDatums: DataStream[StreamsDatum] =
-      keyed_ids.flatMap(new postCollectorFlatMapFunction).setParallelism(10).name("tweetDatums")
+      keyed_ids.flatMap(new postCollectorFlatMapFunction).setParallelism(env.getParallelism).name("tweetDatums")
 
     val tweets: DataStream[Tweet] = tweetDatums
       .map(datum => datum.getDocument.asInstanceOf[Tweet]).name("tweets")
@@ -142,15 +156,28 @@ class FlinkTwitterPostsPipeline(config: TwitterPostsPipelineConfiguration = new
         MAPPER.writeValueAsString(tweet)
       }).name("json")
 
-    if( config.getTest == false )
-      jsons.addSink(new BucketingSink[String](outPath)).setParallelism(3).name("hdfs")
-    else
-      jsons.writeAsText(outPath,FileSystem.WriteMode.OVERWRITE)
-        .setParallelism(env.getParallelism)
+    val keyed_jsons: KeyedStream[String, Int] = jsons.
+      setParallelism(streamsConfig.getParallelism().toInt).
+      keyBy( id => (id.hashCode % streamsConfig.getParallelism().toInt).abs )
+
+    val fileSink : StreamingFileSink[String] = StreamingFileSink.
+      forRowFormat(new Path(outPath), new SimpleStringEncoder[String]("UTF-8")).
+      build()
+
+    if( config.getTest == true ) {
+      keyed_jsons.writeAsText(outPath,FileSystem.WriteMode.OVERWRITE)
+    } else {
+      keyed_jsons.addSink(fileSink)
+    }
+
+    val result: JobExecutionResult = env.execute("FlinkTwitterPostsPipeline")
+
+    LOGGER.info("JobExecutionResult: {}", result.getJobExecutionResult)
+
+    LOGGER.info("JobExecutionResult.getNetRuntime: {}", result.getNetRuntime())
 
-    // if( test == true ) jsons.print();
+    LOGGER.info("JobExecutionResult.getAllAccumulatorResults: {}", MAPPER.writeValueAsString(result.getAllAccumulatorResults()))
 
-    env.execute(STREAMS_ID)
   }
 
   class postCollectorFlatMapFunction extends RichFlatMapFunction[String, StreamsDatum] with Serializable {
diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala
index 79c95d6..6e38591 100644
--- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala
+++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala
@@ -25,26 +25,32 @@ import java.util.concurrent.TimeUnit
 import com.fasterxml.jackson.databind.ObjectMapper
 import com.google.common.util.concurrent.Uninterruptibles
 import org.apache.commons.lang3.StringUtils
-import org.apache.flink.api.common.functions.StoppableFunction
+import org.apache.flink.api.common.JobExecutionResult
+import org.apache.flink.api.common.serialization.SimpleStringEncoder
 import org.apache.flink.api.scala._
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.core.fs.FileSystem
+import org.apache.flink.core.fs.Path
 import org.apache.flink.streaming.api.TimeCharacteristic
-import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
-import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
-import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink
-import org.apache.streams.config.{ComponentConfigurator, StreamsConfigurator}
+import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+import org.apache.flink.streaming.api.scala.KeyedStream
+import org.apache.flink.streaming.api.scala.DataStream
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.streams.config.ComponentConfigurator
+import org.apache.streams.config.StreamsConfigurator
 import org.apache.streams.core.StreamsDatum
 import org.apache.streams.examples.flink.FlinkBase
 import org.apache.streams.examples.flink.twitter.TwitterSpritzerPipelineConfiguration
-import org.apache.streams.flink.FlinkStreamingConfiguration
 import org.apache.streams.hdfs.HdfsWriterConfiguration
 import org.apache.streams.jackson.StreamsJacksonMapper
 import org.apache.streams.twitter.config.TwitterStreamConfiguration
 import org.apache.streams.twitter.converter.TwitterDateTimeFormat
 import org.apache.streams.twitter.provider.TwitterStreamProvider
 import org.hamcrest.MatcherAssert
-import org.slf4j.{Logger, LoggerFactory}
+import org.slf4j.Logger
+import org.slf4j.LoggerFactory
 
 import scala.collection.JavaConversions._
 
@@ -122,25 +128,37 @@ class FlinkTwitterSpritzerPipeline(config: TwitterSpritzerPipelineConfiguration
 
     val outPath = buildWriterPath(new ComponentConfigurator(classOf[HdfsWriterConfiguration]).detectConfiguration())
 
-    val streamSource : DataStream[String] = env.addSource(spritzerSource)
+    val jsons : DataStream[String] = env.addSource(spritzerSource)
 
-    if( config.getTest == false )
-      streamSource.addSink(new BucketingSink[String](outPath)).setParallelism(3).name("hdfs")
-    else
-      streamSource.writeAsText(outPath,FileSystem.WriteMode.OVERWRITE)
-        .setParallelism(env.getParallelism)
+    val keyed_jsons: KeyedStream[String, Int] = jsons.
+      setParallelism(streamsConfig.getParallelism().toInt).
+      keyBy( id => (id.hashCode % streamsConfig.getParallelism().toInt).abs )
 
-    // if( test == true ) jsons.print();
+    val fileSink : StreamingFileSink[String] = StreamingFileSink.
+      forRowFormat(new Path(outPath), new SimpleStringEncoder[String]("UTF-8")).
+      build()
 
-    env.execute(STREAMS_ID)
+    if( config.getTest == true ) {
+      keyed_jsons.writeAsText(outPath,FileSystem.WriteMode.OVERWRITE)
+    } else {
+      keyed_jsons.addSink(fileSink)
+    }
+
+    val result: JobExecutionResult = env.execute("FlinkTwitterPostsPipeline")
+
+    LOGGER.info("JobExecutionResult: {}", result.getJobExecutionResult)
+
+    LOGGER.info("JobExecutionResult.getNetRuntime: {}", result.getNetRuntime())
+
+    LOGGER.info("JobExecutionResult.getAllAccumulatorResults: {}", MAPPER.writeValueAsString(result.getAllAccumulatorResults()))
 
   }
 
   def stop(): Unit = {
-    spritzerSource.stop()
+    spritzerSource.cancel()
   }
 
-  class SpritzerSource(sourceConfig: TwitterStreamConfiguration) extends RichSourceFunction[String] with Serializable with StoppableFunction {
+  class SpritzerSource(sourceConfig: TwitterStreamConfiguration) extends RichSourceFunction[String] with Serializable /*with StoppableFunction*/ {
 
     var mapper: ObjectMapper = _
 
@@ -167,9 +185,9 @@ class FlinkTwitterSpritzerPipeline(config: TwitterSpritzerPipelineConfiguration
       close()
     }
 
-    override def stop(): Unit = {
-      close()
-    }
+//    override def stop(): Unit = {
+//      close()
+//    }
   }
 
 
diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala
index 977fdf5..32cf232 100644
--- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala
+++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala
@@ -24,26 +24,34 @@ import java.util.concurrent.TimeUnit
 import com.fasterxml.jackson.databind.ObjectMapper
 import com.google.common.util.concurrent.Uninterruptibles
 import org.apache.commons.lang3.StringUtils
+import org.apache.flink.api.common.JobExecutionResult
 import org.apache.flink.api.common.functions.RichFlatMapFunction
+import org.apache.flink.api.common.serialization.SimpleStringEncoder
 import org.apache.flink.api.scala._
 import org.apache.flink.core.fs.FileSystem
+import org.apache.flink.core.fs.Path
 import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
+import org.apache.flink.streaming.api.scala.DataStream
+import org.apache.flink.streaming.api.scala.KeyedStream
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.scala.WindowedStream
 import org.apache.flink.streaming.api.scala.function.WindowFunction
-import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment, WindowedStream}
 import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
-import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink
 import org.apache.flink.util.Collector
-import org.apache.streams.config.{ComponentConfigurator, StreamsConfigurator}
+import org.apache.streams.config.ComponentConfigurator
+import org.apache.streams.config.StreamsConfigurator
 import org.apache.streams.core.StreamsDatum
 import org.apache.streams.examples.flink.FlinkBase
 import org.apache.streams.examples.flink.twitter.TwitterUserInformationPipelineConfiguration
-import org.apache.streams.flink.FlinkStreamingConfiguration
-import org.apache.streams.hdfs.{HdfsReaderConfiguration, HdfsWriterConfiguration}
+import org.apache.streams.hdfs.HdfsReaderConfiguration
+import org.apache.streams.hdfs.HdfsWriterConfiguration
 import org.apache.streams.jackson.StreamsJacksonMapper
 import org.apache.streams.twitter.pojo.User
 import org.apache.streams.twitter.provider.TwitterUserInformationProvider
 import org.hamcrest.MatcherAssert
-import org.slf4j.{Logger, LoggerFactory}
+import org.slf4j.Logger
+import org.slf4j.LoggerFactory
 
 import scala.collection.JavaConversions._
 
@@ -59,8 +67,12 @@ object FlinkTwitterUserInformationPipeline extends FlinkBase {
   private val MAPPER: ObjectMapper = StreamsJacksonMapper.getInstance()
 
   override def main(args: Array[String]) = {
-    super.main(args)
-    val jobConfig = new ComponentConfigurator[TwitterUserInformationPipelineConfiguration](classOf[TwitterUserInformationPipelineConfiguration]).detectConfiguration(typesafe)
+    if( args.length > 0 ) {
+      LOGGER.info("Args: {}", args)
+      configUrl = args(0)
+    }
+    if( !setup(configUrl) ) System.exit(1)
+    val jobConfig = new StreamsConfigurator(classOf[TwitterUserInformationPipelineConfiguration]).detectCustomConfiguration()
     if( !setup(jobConfig) ) System.exit(1)
     val pipeline: FlinkTwitterUserInformationPipeline = new FlinkTwitterUserInformationPipeline(jobConfig)
     val thread = new Thread(pipeline)
@@ -115,11 +127,13 @@ object FlinkTwitterUserInformationPipeline extends FlinkBase {
 class FlinkTwitterUserInformationPipeline(config: TwitterUserInformationPipelineConfiguration = new StreamsConfigurator[TwitterUserInformationPipelineConfiguration](classOf[TwitterUserInformationPipelineConfiguration]).detectCustomConfiguration()) extends Runnable with java.io.Serializable {
 
   import FlinkTwitterUserInformationPipeline._
+  //import FlinkBase.streamsConfig
 
   override def run(): Unit = {
 
     val env: StreamExecutionEnvironment = streamEnvironment(config)
 
+    env.setParallelism(streamsConfig.getParallelism().toInt)
     env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
     env.setNumberOfExecutionRetries(0)
 
@@ -127,7 +141,7 @@ class FlinkTwitterUserInformationPipeline(config: TwitterUserInformationPipeline
 
     val outPath = buildWriterPath(new ComponentConfigurator(classOf[HdfsWriterConfiguration]).detectConfiguration())
 
-    val ids: DataStream[String] = env.readTextFile(inPath).setParallelism(10).name("ids")
+    val ids: DataStream[String] = env.readTextFile(inPath).setParallelism(env.getParallelism).name("ids")
 
     val keyed_ids: KeyedStream[String, Int] = ids.name("keyed_ids").keyBy( id => (id.hashCode % 100).abs )
 
@@ -135,7 +149,7 @@ class FlinkTwitterUserInformationPipeline(config: TwitterUserInformationPipeline
 
     val idLists: DataStream[List[String]] = idWindows.apply[List[String]] (new idListWindowFunction()).name("idLists")
 
-    val userDatums: DataStream[StreamsDatum] = idLists.flatMap(new profileCollectorFlatMapFunction).setParallelism(10).name("userDatums")
+    val userDatums: DataStream[StreamsDatum] = idLists.flatMap(new profileCollectorFlatMapFunction).setParallelism(env.getParallelism).name("userDatums")
 
     val user: DataStream[User] = userDatums.map(datum => datum.getDocument.asInstanceOf[User]).name("users")
 
@@ -145,15 +159,29 @@ class FlinkTwitterUserInformationPipeline(config: TwitterUserInformationPipeline
         MAPPER.writeValueAsString(user)
       }).name("jsons")
 
-    if( config.getTest == false )
-      jsons.addSink(new BucketingSink[String](outPath)).setParallelism(3).name("hdfs")
-    else
-      jsons.writeAsText(outPath,FileSystem.WriteMode.OVERWRITE)
-        .setParallelism(env.getParallelism)
+    val keyed_jsons: KeyedStream[String, Int] = jsons.
+      setParallelism(streamsConfig.getParallelism().toInt).
+      keyBy( id => (id.hashCode % streamsConfig.getParallelism().toInt).abs )
+
+    val fileSink : StreamingFileSink[String] = StreamingFileSink.
+      forRowFormat(new Path(outPath), new SimpleStringEncoder[String]("UTF-8")).
+      build()
+
+    if( config.getTest == true ) {
+      keyed_jsons.writeAsText(outPath,FileSystem.WriteMode.OVERWRITE)
+    } else {
+      keyed_jsons.addSink(fileSink)
+    }
+
+    val result: JobExecutionResult = env.execute("FlinkTwitterUserInformationPipeline")
+
+    LOGGER.info("JobExecutionResult: {}", result.getJobExecutionResult)
+
+    LOGGER.info("JobExecutionResult.getNetRuntime: {}", result.getNetRuntime())
+
+    LOGGER.info("JobExecutionResult.getAllAccumulatorResults: {}", MAPPER.writeValueAsString(result.getAllAccumulatorResults()))
 
-    LOGGER.info("StreamExecutionEnvironment: {}", env.toString )
 
-    env.execute(STREAMS_ID)
   }
 
   class idListWindowFunction extends WindowFunction[String, List[String], Int, GlobalWindow] {
diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FollowingCollectorFlatMapFunction.scala b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FollowingCollectorFlatMapFunction.scala
new file mode 100644
index 0000000..1223047
--- /dev/null
+++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FollowingCollectorFlatMapFunction.scala
@@ -0,0 +1,52 @@
+package org.apache.streams.examples.flink.twitter.collection
+
+import java.util.concurrent.TimeUnit
+
+import com.google.common.util.concurrent.Uninterruptibles
+import org.apache.flink.api.common.accumulators.IntCounter
+import org.apache.flink.api.common.functions.RichFlatMapFunction
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.util.Collector
+import org.apache.streams.config.ComponentConfigurator
+import org.apache.streams.config.StreamsConfiguration
+import org.apache.streams.core.StreamsDatum
+import org.apache.streams.examples.flink.twitter.collection.FlinkTwitterFollowingPipeline.toProviderId
+import org.apache.streams.flink.StreamsFlinkConfiguration
+import org.apache.streams.twitter.config.TwitterFollowingConfiguration
+import org.apache.streams.twitter.provider.TwitterFollowingProvider
+
+import scala.collection.JavaConversions._
+
+class FollowingCollectorFlatMapFunction(
+                                         streamsConfiguration : StreamsConfiguration,
+                                         twitterConfiguration : TwitterFollowingConfiguration = new ComponentConfigurator(classOf[TwitterFollowingConfiguration]).detectConfiguration(),
+                                         flinkConfiguration : StreamsFlinkConfiguration = new ComponentConfigurator(classOf[StreamsFlinkConfiguration]).detectConfiguration()
+                                       ) extends RichFlatMapFunction[String, StreamsDatum] with Serializable {
+
+  var size : IntCounter = new IntCounter()
+  var counter : IntCounter = new IntCounter()
+
+  override def open(parameters: Configuration): Unit = {
+    getRuntimeContext().addAccumulator("FlinkTwitterFollowingPipeline.size", this.size)
+    getRuntimeContext().addAccumulator("FlinkTwitterFollowingPipeline.counter", this.counter)
+  }
+
+  override def flatMap(input: String, out: Collector[StreamsDatum]): Unit = {
+    size.add(input.size)
+    collectConnections(input, out)
+  }
+
+  def collectConnections(id : String, out : Collector[StreamsDatum]) = {
+    val conf = twitterConfiguration.withInfo(List(toProviderId(id))).asInstanceOf[TwitterFollowingConfiguration]
+    val twitProvider: TwitterFollowingProvider = new TwitterFollowingProvider(conf)
+    twitProvider.prepare(twitProvider)
+    twitProvider.startStream()
+    do {
+      Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getProviderWaitMs, TimeUnit.MILLISECONDS)
+      val current = twitProvider.readCurrent().iterator().toList
+      counter.add(current.size)
+      current.map(out.collect(_))
+    } while( twitProvider.isRunning )
+  }
+
+}
diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/TimelineCollectorFlatMapFunction.scala b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/TimelineCollectorFlatMapFunction.scala
new file mode 100644
index 0000000..8f21145
--- /dev/null
+++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/TimelineCollectorFlatMapFunction.scala
@@ -0,0 +1,52 @@
+package org.apache.streams.examples.flink.twitter.collection
+
+import java.util.concurrent.TimeUnit
+
+import com.google.common.util.concurrent.Uninterruptibles
+import org.apache.flink.api.common.accumulators.IntCounter
+import org.apache.flink.api.common.functions.RichFlatMapFunction
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.util.Collector
+import org.apache.streams.config.StreamsConfiguration
+import org.apache.streams.core.StreamsDatum
+import org.apache.streams.flink.StreamsFlinkConfiguration
+import org.apache.streams.twitter.config.TwitterTimelineProviderConfiguration
+import org.apache.streams.twitter.provider.TwitterTimelineProvider
+
+import scala.collection.JavaConversions._
+
+/**
+  * Created by sblackmon on 6/2/16.
+  */
+class TimelineCollectorFlatMapFunction(
+                                        streamsConfiguration : StreamsConfiguration,
+                                        twitterConfiguration : TwitterTimelineProviderConfiguration,
+                                        streamsFlinkConfiguration : StreamsFlinkConfiguration
+                                      ) extends RichFlatMapFunction[List[String], StreamsDatum] with Serializable {
+  var size : IntCounter = new IntCounter()
+  var counter : IntCounter = new IntCounter()
+  override def open(parameters: Configuration): Unit = {
+    getRuntimeContext().addAccumulator("TimelineCollectorFlatMapFunction.size", this.size)
+    getRuntimeContext().addAccumulator("TimelineCollectorFlatMapFunction.counter", this.counter)
+  }
+  override def flatMap(input: List[String], out: Collector[StreamsDatum]): Unit = {
+    size.add(input.size)
+    collectPosts(input, out)
+  }
+  def collectPosts(ids : List[String], out : Collector[StreamsDatum]) = {
+    try {
+      val conf = twitterConfiguration.withInfo(ids).asInstanceOf[TwitterTimelineProviderConfiguration]
+      val twitProvider: TwitterTimelineProvider = new TwitterTimelineProvider(conf)
+      twitProvider.prepare(conf)
+      twitProvider.startStream()
+      do {
+        Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getProviderWaitMs, TimeUnit.MILLISECONDS)
+        val current = twitProvider.readCurrent().iterator().toList
+        counter.add(current.size)
+        current.map(out.collect(_))
+      } while( twitProvider.isRunning )
+    } finally {
+
+    }
+  }
+}
diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/UserInformationCollectorFlatMapFunction.scala b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/UserInformationCollectorFlatMapFunction.scala
new file mode 100644
index 0000000..46e0d4a
--- /dev/null
+++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/UserInformationCollectorFlatMapFunction.scala
@@ -0,0 +1,48 @@
+package org.apache.streams.examples.flink.twitter.collection
+
+import java.util.concurrent.TimeUnit
+
+import com.google.common.util.concurrent.Uninterruptibles
+import org.apache.flink.api.common.accumulators.IntCounter
+import org.apache.flink.api.common.functions.RichFlatMapFunction
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.util.Collector
+import org.apache.streams.config.StreamsConfiguration
+import org.apache.streams.core.StreamsDatum
+import org.apache.streams.flink.StreamsFlinkConfiguration
+import org.apache.streams.twitter.config.TwitterUserInformationConfiguration
+import org.apache.streams.twitter.provider.TwitterUserInformationProvider
+
+import scala.collection.JavaConversions._
+
+/**
+  * Created by sblackmon on 6/2/16.
+  */
+class UserInformationCollectorFlatMapFunction(
+                                               streamsConfiguration : StreamsConfiguration,
+                                               twitterConfiguration : TwitterUserInformationConfiguration,
+                                               streamsFlinkConfiguration : StreamsFlinkConfiguration
+                                             ) extends RichFlatMapFunction[List[String], StreamsDatum] with Serializable {
+  var size : IntCounter = new IntCounter()
+  var counter : IntCounter = new IntCounter()
+  override def open(parameters: Configuration): Unit = {
+    getRuntimeContext().addAccumulator("UserInformationCollectorFlatMapFunction.size", this.size)
+    getRuntimeContext().addAccumulator("UserInformationCollectorFlatMapFunction.counter", this.counter)
+  }
+  override def flatMap(input: List[String], out: Collector[StreamsDatum]): Unit = {
+    size.add(input.size)
+    collectProfiles(input, out)
+  }
+  def collectProfiles(ids : List[String], out : Collector[StreamsDatum]) = {
+    val conf = twitterConfiguration.withInfo(ids)
+    val twitProvider: TwitterUserInformationProvider = new TwitterUserInformationProvider(conf)
+    twitProvider.prepare(conf)
+    twitProvider.startStream()
+    do {
+      Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getProviderWaitMs, TimeUnit.MILLISECONDS)
+      val current = twitProvider.readCurrent().iterator().toList
+      counter.add(current.size)
+      current.map(out.collect(_))
+    } while( twitProvider.isRunning )
+  }
+}
diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFollowersIT.conf b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFollowersIT.conf
index 67f7c9c..ec4dc4e 100644
--- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFollowersIT.conf
+++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFollowersIT.conf
@@ -32,9 +32,10 @@ org.apache.streams.twitter.config.TwitterFollowingConfiguration {
   max_items = 5000
 }
 org.apache.streams.config.StreamsConfiguration {
+  parallelism = 1
   providerWaitMs = 1000
 }
 org.apache.streams.flink.StreamsFlinkConfiguration {
   local = true
   test = true
-}
\ No newline at end of file
+}
diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFriendsIT.conf b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFriendsIT.conf
index 724cd43..fd4b1db 100644
--- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFriendsIT.conf
+++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFriendsIT.conf
@@ -31,6 +31,7 @@ org.apache.streams.twitter.config.TwitterFollowingConfiguration {
   ids_only = true
 }
 org.apache.streams.config.StreamsConfiguration {
+  parallelism = 1
   providerWaitMs = 1000
 }
 org.apache.streams.flink.StreamsFlinkConfiguration {
diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/resources/FlinkTwitterPostsPipelineIT.conf b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/resources/FlinkTwitterPostsPipelineIT.conf
index 626d8f6..8f5aadf 100644
--- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/resources/FlinkTwitterPostsPipelineIT.conf
+++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/resources/FlinkTwitterPostsPipelineIT.conf
@@ -27,6 +27,7 @@ org.apache.streams.hdfs.HdfsWriterConfiguration {
   writerPath = "FlinkTwitterPostsPipelineIT"
 }
 org.apache.streams.config.StreamsConfiguration {
+  parallelism = 1
   providerWaitMs = 1000
 }
 org.apache.streams.flink.StreamsFlinkConfiguration {
diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/resources/FlinkTwitterUserInformationPipelineIT.conf b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/resources/FlinkTwitterUserInformationPipelineIT.conf
index ea8125a..cbdb650 100644
--- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/resources/FlinkTwitterUserInformationPipelineIT.conf
+++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/resources/FlinkTwitterUserInformationPipelineIT.conf
@@ -28,8 +28,15 @@ org.apache.streams.hdfs.HdfsWriterConfiguration {
 }
 org.apache.streams.config.StreamsConfiguration {
   providerWaitMs = 1000
+  queueSize = 10000
+  batchSize = 1000
+  identifier = "FlinkTwitterUserInformationPipelineIT"
+  parallelism = 1
+  providerTimeoutMs = 60000
+  shutdownCheckDelay = 30000
+  shutdownCheckInterval = 30000
 }
 org.apache.streams.flink.StreamsFlinkConfiguration {
   local = true
   test = true
-}
\ No newline at end of file
+}
diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/resources/testng.xml b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/resources/testng.xml
new file mode 100644
index 0000000..bdb250d
--- /dev/null
+++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/resources/testng.xml
@@ -0,0 +1,54 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE suite SYSTEM "http://testng.org/testng-1.0.dtd" >
+
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+
+<suite name="ExampleFlinkITs" preserve-order="true">
+
+    <test name="FlinkTwitterUserInformationPipelineIT">
+        <classes>
+            <class name="org.apache.streams.examples.flink.twitter.test.FlinkTwitterUserInformationPipelineIT" />
+        </classes>
+    </test>
+
+    <test name="FlinkTwitterPostsPipelineIT">
+        <classes>
+            <class name="org.apache.streams.examples.flink.twitter.test.FlinkTwitterPostsPipelineIT" />
+        </classes>
+    </test>
+
+    <test name="FlinkTwitterFollowingPipelineFriendsIT">
+        <classes>
+            <class name="org.apache.streams.examples.flink.twitter.test.FlinkTwitterFollowingPipelineFriendsIT" />
+        </classes>
+    </test>
+
+    <test name="FlinkTwitterFollowingPipelineFollowersIT">
+        <classes>
+            <class name="org.apache.streams.examples.flink.twitter.test.FlinkTwitterFollowingPipelineFollowersIT" />
+        </classes>
+    </test>
+
+    <test name="FlinkTwitterSpritzerPipelineIT">
+        <classes>
+            <class name="org.apache.streams.examples.flink.twitter.test.FlinkTwitterSpritzerPipelineIT" />
+        </classes>
+    </test>
+
+</suite>  
\ No newline at end of file
diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFollowersIT.scala b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFollowersIT.scala
index 617815e..bfc6940 100644
--- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFollowersIT.scala
+++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFollowersIT.scala
@@ -36,7 +36,7 @@ import scala.io.Source
 /**
   * FlinkTwitterFollowingPipelineFollowersIT is an integration test for FlinkTwitterFollowingPipeline.
   */
-class FlinkTwitterFollowingPipelineFollowersIT extends FlatSpec {
+class FlinkTwitterFollowingPipelineFollowersIT {
 
   private val LOGGER: Logger = LoggerFactory.getLogger(classOf[FlinkTwitterFollowingPipelineFollowersIT])
 
diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFriendsIT.scala b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFriendsIT.scala
index 5ebea94..6d52d44 100644
--- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFriendsIT.scala
+++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFriendsIT.scala
@@ -16,7 +16,7 @@
  * under the License.
  */
 
-package com.peoplepattern.streams.twitter.collection
+package org.apache.streams.examples.flink.twitter.test
 
 import java.io.File
 import java.nio.file.{Files, Paths}
@@ -36,7 +36,7 @@ import scala.io.Source
 /**
   * FlinkTwitterFollowingPipelineFriendsIT is an integration test for FlinkTwitterFollowingPipeline.
   */
-class FlinkTwitterFollowingPipelineFriendsIT extends FlatSpec {
+class FlinkTwitterFollowingPipelineFriendsIT {
 
   private val LOGGER: Logger = LoggerFactory.getLogger(classOf[FlinkTwitterFollowingPipelineFriendsIT])
 
diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterPostsPipelineIT.scala b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterPostsPipelineIT.scala
index ed3edf3..a43c758 100644
--- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterPostsPipelineIT.scala
+++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterPostsPipelineIT.scala
@@ -16,7 +16,7 @@
  * under the License.
  */
 
-package com.peoplepattern.streams.twitter.collection
+package org.apache.streams.examples.flink.twitter.test
 
 import java.io.File
 import java.nio.file.{Files, Paths}
@@ -36,7 +36,7 @@ import scala.io.Source
 /**
   * FlinkTwitterPostsPipelineIT is an integration test for FlinkTwitterPostsPipeline.
   */
-class FlinkTwitterPostsPipelineIT extends FlatSpec  {
+class FlinkTwitterPostsPipelineIT {
 
   private val LOGGER: Logger = LoggerFactory.getLogger(classOf[FlinkTwitterPostsPipelineIT])
 
diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterUserInformationPipelineIT.scala b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterUserInformationPipelineIT.scala
index 0652bc1..48b876a 100644
--- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterUserInformationPipelineIT.scala
+++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterUserInformationPipelineIT.scala
@@ -16,7 +16,7 @@
  * under the License.
  */
 
-package com.peoplepattern.streams.twitter.collection
+package org.apache.streams.examples.flink.twitter.test
 
 import java.io.File
 import java.nio.file.{Files, Paths}
@@ -36,7 +36,7 @@ import scala.io.Source
 /**
   * FlinkTwitterUserInformationPipelineIT is an integration test for FlinkTwitterUserInformationPipeline.
   */
-class FlinkTwitterUserInformationPipelineIT extends FlatSpec {
+class FlinkTwitterUserInformationPipelineIT {
 
   private val LOGGER: Logger = LoggerFactory.getLogger(classOf[FlinkTwitterUserInformationPipelineIT])