You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2017/04/19 19:49:49 UTC

[2/3] incubator-streams git commit: STREAMS-496: Remove twitter4j dependency from streams-provider-twitter

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowersListProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowersListProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowersListProviderTask.java
new file mode 100644
index 0000000..d6e30e4
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowersListProviderTask.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.twitter.provider;
+
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.twitter.api.FollowersListRequest;
+import org.apache.streams.twitter.api.FollowersListResponse;
+import org.apache.streams.twitter.api.Twitter;
+import org.apache.streams.twitter.pojo.Follow;
+import org.apache.streams.twitter.pojo.User;
+import org.apache.streams.util.ComponentUtils;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *  Retrieve friend or follower connections for a single user id.
+ */
+public class TwitterFollowersListProviderTask implements Runnable {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(TwitterFollowersListProviderTask.class);
+
+  private static final ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
+  protected Twitter client;
+  protected TwitterFollowingProvider provider;
+  protected FollowersListRequest request;
+
+  private int count = 0;
+
+  /**
+   * TwitterFollowersListProviderTask constructor.
+   * @param provider TwitterFollowingProvider
+   * @param twitter Twitter
+   * @param request FollowersListRequest
+   */
+  public TwitterFollowersListProviderTask(TwitterFollowingProvider provider, Twitter twitter, FollowersListRequest request) {
+    this.provider = provider;
+    this.client = twitter;
+    this.request = request;
+  }
+
+  @Override
+  public void run() {
+
+    LOGGER.info("Thread Starting: {}", request.toString());
+
+    getFollowersList(request);
+
+    LOGGER.info("Thread Finished: {}", request.toString());
+
+  }
+
+  int last_count = 0;
+  int page_count = 1;
+  int item_count = 0;
+  long cursor = 0;
+
+  private void getFollowersList(FollowersListRequest request) {
+
+    do {
+
+      FollowersListResponse response = client.list(request);
+
+      last_count = response.getUsers().size();
+
+      if (response.getUsers().size() > 0) {
+
+        for (User follower : response.getUsers()) {
+
+          Follow follow = new Follow()
+              .withFollowee(
+                  new User()
+                      .withId(request.getId())
+                      .withScreenName(request.getScreenName()))
+              .withFollower(follower);
+
+          if (item_count < provider.getConfig().getMaxItems()) {
+            ComponentUtils.offerUntilSuccess(new StreamsDatum(follow), provider.providerQueue);
+            item_count++;
+          }
+
+        }
+
+      }
+      page_count++;
+      cursor = response.getNextCursor();
+      request.setCursor(cursor);
+
+    }
+    while (shouldContinuePulling(cursor, last_count, page_count, item_count));
+  }
+
+  public boolean shouldContinuePulling(long cursor, int count, int page_count, int item_count) {
+    return (
+        cursor > 0
+            && count > 0
+            && item_count < provider.getConfig().getMaxItems()
+            && page_count <= provider.getConfig().getMaxPages());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
index 16b6c03..a2be967 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
@@ -26,20 +26,26 @@ import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsResultSet;
 import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.twitter.TwitterFollowingConfiguration;
+import org.apache.streams.twitter.TwitterUserInformationConfiguration;
+import org.apache.streams.twitter.api.FollowersIdsRequest;
+import org.apache.streams.twitter.api.FollowingIdsRequest;
+import org.apache.streams.twitter.api.FriendsIdsRequest;
+import org.apache.streams.twitter.api.Twitter;
 import org.apache.streams.twitter.converter.TwitterDateTimeFormat;
+import org.apache.streams.twitter.pojo.User;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigParseOptions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import twitter4j.Status;
-import twitter4j.Twitter;
 
 import java.io.BufferedOutputStream;
 import java.io.File;
@@ -49,13 +55,18 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
+import java.util.Queue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 /**
  * Retrieve all follow adjacencies from a list of user ids or names.
  */
-public class TwitterFollowingProvider extends TwitterUserInformationProvider {
+public class TwitterFollowingProvider {
 
   public static final String STREAMS_ID = "TwitterFollowingProvider";
   private static final Logger LOGGER = LoggerFactory.getLogger(TwitterFollowingProvider.class);
@@ -64,8 +75,19 @@ public class TwitterFollowingProvider extends TwitterUserInformationProvider {
 
   private TwitterFollowingConfiguration config;
 
+  protected List<String> names = new ArrayList<>();
+  protected List<Long> ids = new ArrayList<>();
+
+  protected Twitter client;
+
+  protected ListeningExecutorService executor;
+
   private List<ListenableFuture<Object>> futures = new ArrayList<>();
 
+  protected final AtomicBoolean running = new AtomicBoolean();
+
+  protected volatile Queue<StreamsDatum> providerQueue;
+
   /**
    * To use from command line:
    *
@@ -139,67 +161,137 @@ public class TwitterFollowingProvider extends TwitterUserInformationProvider {
   }
 
   public TwitterFollowingProvider(TwitterFollowingConfiguration config) {
-    super(config);
     this.config = config;
   }
 
-  @Override
   public void prepare(Object configurationObject) {
-    super.prepare(config);
+
+    Objects.requireNonNull(config);
+    Objects.requireNonNull(config.getOauth());
+    Objects.requireNonNull(config.getOauth().getConsumerKey());
+    Objects.requireNonNull(config.getOauth().getConsumerSecret());
+    Objects.requireNonNull(config.getOauth().getAccessToken());
+    Objects.requireNonNull(config.getOauth().getAccessTokenSecret());
+    Objects.requireNonNull(config.getInfo());
+    Objects.requireNonNull(config.getThreadsPerProvider());
+
+    try {
+      client = getTwitterClient();
+    } catch (InstantiationException e) {
+      LOGGER.error("InstantiationException", e);
+    }
+
+    Objects.requireNonNull(client);
+
+    try {
+      lock.writeLock().lock();
+      providerQueue = constructQueue();
+    } finally {
+      lock.writeLock().unlock();
+    }
+
+    Objects.requireNonNull(providerQueue);
+
+    // abstract this out
+    for (String s : config.getInfo()) {
+      if (s != null) {
+        String potentialScreenName = s.replaceAll("@", "").trim().toLowerCase();
+
+        // See if it is a long, if it is, add it to the user iD list, if it is not, add it to the
+        // screen name list
+        try {
+          ids.add(Long.parseLong(potentialScreenName));
+        } catch (Exception ex) {
+          names.add(potentialScreenName);
+        }
+      }
+    }
+
     Objects.requireNonNull(getConfig().getEndpoint());
+
+    executor = MoreExecutors.listeningDecorator(TwitterUserInformationProvider.newFixedThreadPoolWithQueueSize(config.getThreadsPerProvider().intValue(), ids.size()));
+
     Preconditions.checkArgument(getConfig().getEndpoint().equals("friends") || getConfig().getEndpoint().equals("followers"));
+
+    if( config.getEndpoint().equals("friends")) {
+      submitFriendsThreads(ids, names);
+    } else if( config.getEndpoint().equals("followers")) {
+      submitFollowersThreads(ids, names);
+    }
   }
 
-  @Override
   public void startStream() {
 
     Objects.requireNonNull(executor);
 
-    Preconditions.checkArgument(idsBatches.hasNext() || screenNameBatches.hasNext());
-
     LOGGER.info("startStream");
 
     running.set(true);
 
-    while (idsBatches.hasNext()) {
-      submitFollowingThreads(idsBatches.next());
-    }
-    while (screenNameBatches.hasNext()) {
-      submitFollowingThreads(screenNameBatches.next());
-    }
-
     executor.shutdown();
 
   }
 
-  protected void submitFollowingThreads(Long[] ids) {
-    Twitter client = getTwitterClient();
+  protected void submitFollowersThreads(List<Long> ids, List<String> names) {
+
+    for (Long id : ids) {
+      TwitterFollowersIdsProviderTask providerTask =
+          new TwitterFollowersIdsProviderTask(
+              this,
+              client,
+              (FollowersIdsRequest)new FollowersIdsRequest().withId(id));
+
+      ListenableFuture future = executor.submit(providerTask);
+      futures.add(future);
+      LOGGER.info("Thread Submitted: {}", providerTask.request);
+    }
+
+    for (String name : names) {
+      TwitterFollowersIdsProviderTask providerTask =
+          new TwitterFollowersIdsProviderTask(
+              this,
+              client,
+              (FollowersIdsRequest)new FollowersIdsRequest().withScreenName(name));
 
-    for (int i = 0; i < ids.length; i++) {
-      TwitterFollowingProviderTask providerTask = new TwitterFollowingProviderTask(this, client, ids[i]);
       ListenableFuture future = executor.submit(providerTask);
       futures.add(future);
-      LOGGER.info("submitted {}", ids[i]);
+      LOGGER.info("Thread Submitted: {}", providerTask.request);
     }
   }
 
-  protected void submitFollowingThreads(String[] screenNames) {
-    Twitter client = getTwitterClient();
+  protected void submitFriendsThreads(List<Long> ids, List<String> names) {
+
+    for (Long id : ids) {
+      TwitterFriendsIdsProviderTask providerTask =
+          new TwitterFriendsIdsProviderTask(
+              this,
+              client,
+              (FriendsIdsRequest)new FriendsIdsRequest().withId(id));
+
+      ListenableFuture future = executor.submit(providerTask);
+      futures.add(future);
+      LOGGER.info("Thread Submitted: {}", providerTask.request);
+    }
+
+    for (String name : names) {
+      TwitterFriendsIdsProviderTask providerTask =
+          new TwitterFriendsIdsProviderTask(
+              this,
+              client,
+              (FriendsIdsRequest)new FriendsIdsRequest().withScreenName(name));
 
-    for (int i = 0; i < screenNames.length; i++) {
-      TwitterFollowingProviderTask providerTask = new TwitterFollowingProviderTask(this, client, screenNames[i]);
       ListenableFuture future = executor.submit(providerTask);
       futures.add(future);
-      LOGGER.info("submitted {}", screenNames[i]);
+      LOGGER.info("Thread Submitted: {}", providerTask.request);
     }
+  }
 
+  protected Twitter getTwitterClient() throws InstantiationException {
+    return Twitter.getInstance(config);
   }
 
-  @Override
   public StreamsResultSet readCurrent() {
 
-    LOGGER.info("{}{} - readCurrent", idsBatches, screenNameBatches);
-
     StreamsResultSet result;
 
     try {
@@ -207,7 +299,7 @@ public class TwitterFollowingProvider extends TwitterUserInformationProvider {
       result = new StreamsResultSet(providerQueue);
       result.setCounter(new DatumStatusCounter());
       providerQueue = constructQueue();
-      LOGGER.debug("{}{} - providing {} docs", idsBatches, screenNameBatches, result.size());
+      LOGGER.debug("readCurrent: {} Documents", result.size());
     } finally {
       lock.writeLock().unlock();
     }
@@ -216,17 +308,45 @@ public class TwitterFollowingProvider extends TwitterUserInformationProvider {
 
   }
 
-  public boolean shouldContinuePulling(List<twitter4j.User> users) {
+  public boolean shouldContinuePulling(List<User> users) {
     return (users != null) && (users.size() == config.getPageSize());
   }
 
-  @Override
   public boolean isRunning() {
-    if (providerQueue.isEmpty() && executor.isTerminated() && Futures.allAsList(futures).isDone()) {
-      LOGGER.info("Completed");
+    if ( providerQueue.isEmpty() && executor.isTerminated() && Futures.allAsList(futures).isDone() ) {
+      LOGGER.info("All Threads Completed");
       running.set(false);
       LOGGER.info("Exiting");
     }
     return running.get();
   }
+
+  // abstract this out
+  protected Queue<StreamsDatum> constructQueue() {
+    return new LinkedBlockingQueue<>();
+  }
+
+  // abstract this out
+  void shutdownAndAwaitTermination(ExecutorService pool) {
+    pool.shutdown(); // Disable new tasks from being submitted
+    try {
+      // Wait a while for existing tasks to terminate
+      if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
+        pool.shutdownNow(); // Cancel currently executing tasks
+        // Wait a while for tasks to respond to being cancelled
+        if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
+          System.err.println("Pool did not terminate");
+        }
+      }
+    } catch (InterruptedException ie) {
+      // (Re-)Cancel if current thread also interrupted
+      pool.shutdownNow();
+      // Preserve interrupt status
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  public void cleanUp() {
+    shutdownAndAwaitTermination(executor);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProviderTask.java
deleted file mode 100644
index 313416a..0000000
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProviderTask.java
+++ /dev/null
@@ -1,266 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.twitter.provider;
-
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.twitter.pojo.Follow;
-import org.apache.streams.twitter.pojo.User;
-import org.apache.streams.util.ComponentUtils;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Preconditions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import twitter4j.PagableResponseList;
-import twitter4j.Twitter;
-import twitter4j.TwitterException;
-import twitter4j.TwitterObjectFactory;
-
-import java.util.List;
-import java.util.Objects;
-
-/**
- *  Retrieve friend or follower connections for a single user id.
- */
-public class TwitterFollowingProviderTask implements Runnable {
-
-  private static final Logger LOGGER = LoggerFactory.getLogger(TwitterFollowingProviderTask.class);
-
-  private static final ObjectMapper mapper = StreamsJacksonMapper.getInstance();
-
-  protected TwitterFollowingProvider provider;
-  private Twitter client;
-  protected Long id;
-  private String screenName;
-
-  private int count = 0;
-
-  /**
-   * TwitterFollowingProviderTask constructor.
-   * @param provider TwitterFollowingProvider
-   * @param twitter Twitter
-   * @param id numeric id
-   */
-  public TwitterFollowingProviderTask(TwitterFollowingProvider provider, Twitter twitter, Long id) {
-    this.provider = provider;
-    this.client = twitter;
-    this.id = id;
-  }
-
-  /**
-   * TwitterFollowingProviderTask constructor.
-   * @param provider TwitterFollowingProvider
-   * @param twitter Twitter
-   * @param screenName screenName
-   */
-  public TwitterFollowingProviderTask(TwitterFollowingProvider provider, Twitter twitter, String screenName) {
-    this.provider = provider;
-    this.client = twitter;
-    this.screenName = screenName;
-  }
-
-  int page_count = 0;
-  int item_count = 0;
-
-  @Override
-  public void run() {
-
-    Preconditions.checkArgument(id != null || screenName != null);
-
-    if ( id != null ) {
-      getFollowing(id);
-    } else {
-      getFollowing(screenName);
-    }
-
-    LOGGER.info(id != null ? id.toString() : screenName + " Thread Finished");
-
-  }
-
-  private void getFollowing(Long id) {
-
-    Preconditions.checkArgument(
-        provider.getConfig().getEndpoint().equals("friends")
-        || provider.getConfig().getEndpoint().equals("followers")
-    );
-
-    if ( provider.getConfig().getIdsOnly() ) {
-      collectIds(id);
-    } else {
-      collectUsers(id);
-    }
-  }
-
-  private void getFollowing(String screenName) {
-
-    twitter4j.User user = null;
-    try {
-      user = client.users().showUser(screenName);
-    } catch (TwitterException ex) {
-      LOGGER.error("Failure looking up " + id);
-    }
-    Objects.requireNonNull(user);
-    getFollowing(user.getId());
-  }
-
-  private void collectUsers(Long id) {
-    int keepTrying = 0;
-    List<twitter4j.User> list = null;
-    long curser = -1;
-
-    twitter4j.User user;
-    String userJson;
-    try {
-      user = client.users().showUser(id);
-      userJson = TwitterObjectFactory.getRawJSON(user);
-    } catch (TwitterException ex) {
-      LOGGER.error("Failure looking up " + id);
-      return;
-    }
-
-    do {
-      try {
-
-        PagableResponseList<twitter4j.User> page = null;
-        if ( provider.getConfig().getEndpoint().equals("followers") ) {
-          page = client.friendsFollowers().getFollowersList(id, curser, provider.getConfig().getPageSize().intValue());
-        } else if ( provider.getConfig().getEndpoint().equals("friends") ) {
-          page = client.friendsFollowers().getFriendsList(id, curser, provider.getConfig().getPageSize().intValue());
-        }
-
-        Objects.requireNonNull(list);
-        Preconditions.checkArgument(list.size() > 0);
-
-        for (twitter4j.User other : list) {
-
-          String otherJson = TwitterObjectFactory.getRawJSON(other);
-
-          try {
-            Follow follow = null;
-            if ( provider.getConfig().getEndpoint().equals("followers") ) {
-              follow = new Follow()
-                  .withFollowee(mapper.readValue(userJson, User.class))
-                  .withFollower(mapper.readValue(otherJson, User.class));
-            } else if ( provider.getConfig().getEndpoint().equals("friends") ) {
-              follow = new Follow()
-                  .withFollowee(mapper.readValue(otherJson, User.class))
-                  .withFollower(mapper.readValue(userJson, User.class));
-            }
-
-            Objects.requireNonNull(follow);
-
-            if ( item_count < provider.getConfig().getMaxItems()) {
-              ComponentUtils.offerUntilSuccess(new StreamsDatum(follow), provider.providerQueue);
-              item_count++;
-            }
-
-          } catch (Exception ex) {
-            LOGGER.warn("Exception: {}", ex);
-          }
-        }
-        if ( !page.hasNext() ) {
-          break;
-        }
-        if ( page.getNextCursor() == 0 ) {
-          break;
-        }
-        curser = page.getNextCursor();
-        page_count++;
-      } catch (Exception ex) {
-        keepTrying += TwitterErrorHandler.handleTwitterError(client, null, ex);
-      }
-    }
-    while (provider.shouldContinuePulling(list) && curser != 0 && keepTrying < provider.getConfig().getRetryMax() && count < provider.getConfig().getMaxItems());
-  }
-
-  private void collectIds(Long id) {
-    int keepTrying = 0;
-
-    long curser = -1;
-
-    twitter4j.User user;
-    String userJson;
-    try {
-      user = client.users().showUser(id);
-      userJson = TwitterObjectFactory.getRawJSON(user);
-    } catch (TwitterException ex) {
-      LOGGER.error("Failure looking up " + id);
-      return;
-    }
-
-    do {
-      try {
-        twitter4j.IDs ids = null;
-        if ( provider.getConfig().getEndpoint().equals("followers") ) {
-          ids = client.friendsFollowers().getFollowersIDs(id, curser, provider.getConfig().getMaxItems().intValue());
-        } else if ( provider.getConfig().getEndpoint().equals("friends") ) {
-          ids = client.friendsFollowers().getFriendsIDs(id, curser, provider.getConfig().getMaxItems().intValue());
-        }
-
-        Objects.requireNonNull(ids);
-        Preconditions.checkArgument(ids.getIDs().length > 0);
-
-        for (long otherId : ids.getIDs()) {
-
-          try {
-            Follow follow = null;
-            if ( provider.getConfig().getEndpoint().equals("followers") ) {
-              follow = new Follow()
-                  .withFollowee(new User().withId(id))
-                  .withFollower(new User().withId(otherId));
-            } else if ( provider.getConfig().getEndpoint().equals("friends") ) {
-              follow = new Follow()
-                  .withFollowee(new User().withId(otherId))
-                  .withFollower(new User().withId(id));
-            }
-
-            Objects.requireNonNull(follow);
-
-            if ( item_count < provider.getConfig().getMaxItems()) {
-              ComponentUtils.offerUntilSuccess(new StreamsDatum(follow), provider.providerQueue);
-              item_count++;
-            }
-          } catch (Exception ex) {
-            LOGGER.warn("Exception: {}", ex);
-          }
-        }
-        if ( !ids.hasNext() ) {
-          break;
-        }
-        if ( ids.getNextCursor() == 0 ) {
-          break;
-        }
-        curser = ids.getNextCursor();
-        page_count++;
-      } catch (TwitterException twitterException) {
-        keepTrying += TwitterErrorHandler.handleTwitterError(client, id, twitterException);
-      } catch (Exception ex) {
-        keepTrying += TwitterErrorHandler.handleTwitterError(client, null, ex);
-      }
-    }
-    while (shouldContinuePulling() && curser != 0 && keepTrying < provider.getConfig().getRetryMax() );
-  }
-
-  public boolean shouldContinuePulling() {
-    return ( item_count < provider.getConfig().getMaxItems()
-              && page_count < provider.getConfig().getMaxPages());
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFriendsIdsProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFriendsIdsProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFriendsIdsProviderTask.java
new file mode 100644
index 0000000..84fb789
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFriendsIdsProviderTask.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.twitter.provider;
+
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.twitter.api.FriendsIdsRequest;
+import org.apache.streams.twitter.api.FriendsIdsResponse;
+import org.apache.streams.twitter.api.Twitter;
+import org.apache.streams.twitter.pojo.Follow;
+import org.apache.streams.twitter.pojo.User;
+import org.apache.streams.util.ComponentUtils;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *  Retrieve friend or follower connections for a single user id.
+ */
+public class TwitterFriendsIdsProviderTask implements Runnable {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(TwitterFriendsIdsProviderTask.class);
+
+  private static final ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
+  protected Twitter client;
+  protected TwitterFollowingProvider provider;
+  protected FriendsIdsRequest request;
+
+  private int count = 0;
+
+  /**
+   * TwitterFollowingProviderTask constructor.
+   * @param provider TwitterFollowingProvider
+   * @param twitter Twitter
+   * @param request FriendsIdsRequest
+   */
+  public TwitterFriendsIdsProviderTask(TwitterFollowingProvider provider, Twitter twitter, FriendsIdsRequest request) {
+    this.provider = provider;
+    this.client = twitter;
+    this.request = request;
+  }
+
+  @Override
+  public void run() {
+
+    Preconditions.checkArgument(request.getId() != null || request.getScreenName() != null);
+
+    LOGGER.info("Thread Starting: {}", request.toString());
+
+    getFriendsIds(request);
+
+    LOGGER.info("Thread Finished: {}", request.toString());
+
+  }
+
+  int last_count = 0;
+  int page_count = 1;
+  int item_count = 0;
+  long cursor = 0;
+
+  private void getFriendsIds(FriendsIdsRequest request) {
+
+    do {
+
+      FriendsIdsResponse response = client.ids(request);
+
+      last_count = response.getIds().size();
+
+      if (response.getIds().size() > 0) {
+
+        for (Long id : response.getIds()) {
+
+          Follow follow = new Follow()
+              .withFollowee(
+                  new User()
+                      .withId(id))
+              .withFollower(
+                  new User()
+                      .withId(request.getId())
+                      .withScreenName(request.getScreenName()));
+
+          if (item_count < provider.getConfig().getMaxItems()) {
+            ComponentUtils.offerUntilSuccess(new StreamsDatum(follow), provider.providerQueue);
+            item_count++;
+          }
+
+        }
+
+      }
+      page_count++;
+      cursor = response.getNextCursor();
+      request.setCursor(cursor);
+
+    }
+    while (shouldContinuePulling(cursor, last_count, page_count, item_count));
+  }
+
+  public boolean shouldContinuePulling(long cursor, int count, int page_count, int item_count) {
+    return (
+        cursor > 0
+            && count > 0
+            && item_count < provider.getConfig().getMaxItems()
+            && page_count <= provider.getConfig().getMaxPages());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFriendsListProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFriendsListProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFriendsListProviderTask.java
new file mode 100644
index 0000000..570705d
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFriendsListProviderTask.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.twitter.provider;
+
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.twitter.api.FriendsListRequest;
+import org.apache.streams.twitter.api.FriendsListResponse;
+import org.apache.streams.twitter.api.Twitter;
+import org.apache.streams.twitter.pojo.Follow;
+import org.apache.streams.twitter.pojo.User;
+import org.apache.streams.util.ComponentUtils;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *  Retrieve friend or follower connections for a single user id.
+ */
+public class TwitterFriendsListProviderTask implements Runnable {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(TwitterFriendsListProviderTask.class);
+
+  private static final ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
+  protected Twitter client;
+  protected TwitterFollowingProvider provider;
+  protected FriendsListRequest request;
+
+  private int count = 0;
+
+  /**
+   * TwitterFollowingProviderTask constructor.
+   * @param provider TwitterFollowingProvider
+   * @param twitter Twitter
+   * @param request FriendsListRequest
+   */
+  public TwitterFriendsListProviderTask(TwitterFollowingProvider provider, Twitter twitter, FriendsListRequest request) {
+    this.provider = provider;
+    this.client = twitter;
+    this.request = request;
+  }
+
+  int last_count = 0;
+  int page_count = 1;
+  int item_count = 0;
+  long cursor = 0;
+
+  @Override
+  public void run() {
+
+    Preconditions.checkArgument(request.getId() != null || request.getScreenName() != null);
+
+    LOGGER.info(request.getId() != null ? request.getId().toString() : request.getScreenName() + " Thread Finished");
+
+  }
+
+  private void getFriendsList(FriendsListRequest request) {
+
+    do {
+
+      FriendsListResponse response = client.list(request);
+
+      last_count = response.getUsers().size();
+
+      if (response.getUsers().size() > 0) {
+
+        for (User friend : response.getUsers()) {
+
+          Follow follow = new Follow()
+              .withFollower(friend)
+              .withFollowee(
+                  new User()
+                      .withId(request.getId())
+                      .withScreenName(request.getScreenName()));
+
+          if (item_count < provider.getConfig().getMaxItems()) {
+            ComponentUtils.offerUntilSuccess(new StreamsDatum(follow), provider.providerQueue);
+            item_count++;
+          }
+
+        }
+
+      }
+      page_count++;
+      cursor = response.getNextCursor();
+      request.setCursor(cursor);
+
+    }
+    while (shouldContinuePulling(cursor, last_count, page_count, item_count));
+  }
+
+  public boolean shouldContinuePulling(long cursor, int count, int page_count, int item_count) {
+    return (
+        cursor > 0
+            && count > 0
+            && item_count < provider.getConfig().getMaxItems()
+            && page_count <= provider.getConfig().getMaxPages());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
index 9f76fed..217b3d8 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
@@ -27,7 +27,11 @@ import org.apache.streams.core.StreamsProvider;
 import org.apache.streams.core.StreamsResultSet;
 import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.twitter.TwitterTimelineProviderConfiguration;
+import org.apache.streams.twitter.api.StatusesUserTimelineRequest;
+import org.apache.streams.twitter.api.Twitter;
+import org.apache.streams.twitter.api.UsersLookupRequest;
 import org.apache.streams.twitter.converter.TwitterDateTimeFormat;
+import org.apache.streams.twitter.pojo.User;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -44,12 +48,6 @@ import org.apache.commons.lang.NotImplementedException;
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import twitter4j.Status;
-import twitter4j.Twitter;
-import twitter4j.TwitterException;
-import twitter4j.TwitterFactory;
-import twitter4j.User;
-import twitter4j.conf.ConfigurationBuilder;
 
 import java.io.BufferedOutputStream;
 import java.io.File;
@@ -92,8 +90,8 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
     return config;
   }
 
-  protected Collection<String[]> screenNameBatches;
-  protected Collection<Long> ids;
+  protected List<String> names = new ArrayList<>();
+  protected List<Long> ids = new ArrayList<>();
 
   protected volatile Queue<StreamsDatum> providerQueue;
 
@@ -109,9 +107,6 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
 
   private List<ListenableFuture<Object>> futures = new ArrayList<>();
 
-  Boolean jsonStoreEnabled;
-  Boolean includeEntitiesEnabled;
-
   /**
    * To use from command line:
    *
@@ -207,14 +202,34 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
     Objects.requireNonNull(config.getOauth().getAccessToken());
     Objects.requireNonNull(config.getOauth().getAccessTokenSecret());
     Objects.requireNonNull(config.getInfo());
+    Objects.requireNonNull(config.getThreadsPerProvider());
+
+    try {
+      client = getTwitterClient();
+    } catch (InstantiationException e) {
+      LOGGER.error("InstantiationException", e);
+    }
+
+    Objects.requireNonNull(client);
 
-    consolidateToIDs();
+    for (String s : config.getInfo()) {
+      if (s != null) {
+        String potentialScreenName = s.replaceAll("@", "").trim().toLowerCase();
 
-    if (ids.size() > 1) {
-      executor = MoreExecutors.listeningDecorator(TwitterUserInformationProvider.newFixedThreadPoolWithQueueSize(5, ids.size()));
-    } else {
-      executor = MoreExecutors.listeningDecorator(newSingleThreadExecutor());
+        // See if it is a long, if it is, add it to the user iD list, if it is not, add it to the
+        // screen name list
+        try {
+          ids.add(Long.parseLong(potentialScreenName));
+        } catch (Exception ex) {
+          names.add(potentialScreenName);
+        }
+      }
     }
+
+    executor = MoreExecutors.listeningDecorator(TwitterUserInformationProvider.newFixedThreadPoolWithQueueSize(config.getThreadsPerProvider().intValue(), config.getInfo().size()));
+
+    submitTimelineThreads(ids, names);
+
   }
 
   @Override
@@ -222,42 +237,40 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
 
     LOGGER.debug("{} startStream", STREAMS_ID);
 
-    Preconditions.checkArgument(!ids.isEmpty());
-
     running.set(true);
 
-    submitTimelineThreads(ids.toArray(new Long[0]));
-
     executor.shutdown();
 
   }
 
-  protected void submitTimelineThreads(Long[] ids) {
-
-    Twitter client = getTwitterClient();
-
-    for (int i = 0; i < ids.length; i++) {
-
-      TwitterTimelineProviderTask providerTask = new TwitterTimelineProviderTask(this, client, ids[i]);
+  protected void submitTimelineThreads(List<Long> ids, List<String> names) {
+
+    for (Long id : ids) {
+      StatusesUserTimelineRequest request = new StatusesUserTimelineRequest();
+      request.setUserId(id);
+      request.setCount(config.getPageSize());
+      TwitterTimelineProviderTask providerTask = new TwitterTimelineProviderTask(
+          this,
+          client,
+          request
+      );
       ListenableFuture future = executor.submit(providerTask);
       futures.add(future);
-      LOGGER.info("submitted {}", ids[i]);
+      LOGGER.info("Thread Submitted: {}", providerTask.request);
     }
-
-  }
-
-  private Collection<Long> retrieveIds(String[] screenNames) {
-    Twitter client = getTwitterClient();
-
-    List<Long> ids = new ArrayList<>();
-    try {
-      for (User twitterUser : client.lookupUsers(screenNames)) {
-        ids.add(twitterUser.getId());
-      }
-    } catch (TwitterException ex) {
-      LOGGER.error("Failure retrieving user details.", ex.getMessage());
+    for (String name : names) {
+      StatusesUserTimelineRequest request = new StatusesUserTimelineRequest();
+      request.setScreenName(name);
+      request.setCount(config.getPageSize());
+      TwitterTimelineProviderTask providerTask = new TwitterTimelineProviderTask(
+          this,
+          client,
+          request
+      );
+      ListenableFuture future = executor.submit(providerTask);
+      futures.add(future);
+      LOGGER.info("Thread Submitted: {}", providerTask.request);
     }
-    return ids;
   }
 
   @Override
@@ -302,66 +315,14 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
     throw new NotImplementedException();
   }
 
-
-
-  /**
-   * Using the "info" list that is contained in the configuration, ensure that all
-   * account identifiers are converted to IDs (Longs) instead of screenNames (Strings).
-   */
-  protected void consolidateToIDs() {
-    List<String> screenNames = new ArrayList<>();
-    ids = new ArrayList<>();
-
-    for ( String account : config.getInfo() ) {
-      try {
-        if ( new Long(account) != null ) {
-          ids.add(Long.parseLong(Objects.toString(account, null)));
-        }
-      } catch ( NumberFormatException ex ) {
-        screenNames.add(account);
-      } catch ( Exception ex ) {
-        LOGGER.error("Exception while trying to add ID: {{}}, {}", account, ex);
-      }
-    }
-
-    // Twitter allows for batches up to 100 per request, but you cannot mix types
-    screenNameBatches = new ArrayList<>();
-    while ( screenNames.size() >= 100 ) {
-      screenNameBatches.add(screenNames.subList(0, 100).toArray(new String[0]));
-      screenNames = screenNames.subList(100, screenNames.size());
-    }
-
-    if (screenNames.size() > 0) {
-      screenNameBatches.add(screenNames.toArray(new String[ids.size()]));
-    }
-
-    for ( String[] screenNameBatche : screenNameBatches ) {
-      Collection<Long> batchIds = retrieveIds(screenNameBatche);
-      ids.addAll(batchIds);
-    }
-  }
-
   /**
    * get Twitter Client from TwitterUserInformationConfiguration.
    * @return result
    */
-  public Twitter getTwitterClient() {
-
-    String baseUrl = TwitterProviderUtil.baseUrl(config);
-
-    ConfigurationBuilder builder = new ConfigurationBuilder()
-        .setOAuthConsumerKey(config.getOauth().getConsumerKey())
-        .setOAuthConsumerSecret(config.getOauth().getConsumerSecret())
-        .setOAuthAccessToken(config.getOauth().getAccessToken())
-        .setOAuthAccessTokenSecret(config.getOauth().getAccessTokenSecret())
-        .setIncludeEntitiesEnabled(true)
-        .setJSONStoreEnabled(true)
-        .setAsyncNumThreads(3)
-        .setRestBaseURL(baseUrl)
-        .setIncludeMyRetweetEnabled(Boolean.TRUE)
-        .setPrettyDebugEnabled(Boolean.TRUE);
-
-    return new TwitterFactory(builder.build()).getInstance();
+  public Twitter getTwitterClient() throws InstantiationException {
+
+    return Twitter.getInstance(config);
+
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java
index 8cb2b46..ffb90b7 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java
@@ -20,17 +20,15 @@ package org.apache.streams.twitter.provider;
 
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.twitter.api.StatusesUserTimelineRequest;
+import org.apache.streams.twitter.api.Twitter;
 import org.apache.streams.twitter.converter.TwitterDateTimeFormat;
+import org.apache.streams.twitter.pojo.Tweet;
 import org.apache.streams.util.ComponentUtils;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import twitter4j.Paging;
-import twitter4j.ResponseList;
-import twitter4j.Status;
-import twitter4j.Twitter;
-import twitter4j.TwitterObjectFactory;
 
 import java.util.List;
 import java.util.stream.Collectors;
@@ -47,81 +45,64 @@ public class TwitterTimelineProviderTask implements Runnable {
 
   protected TwitterTimelineProvider provider;
   protected Twitter client;
-  protected Long id;
+  protected StatusesUserTimelineRequest request;
 
   /**
    * TwitterTimelineProviderTask constructor.
    * @param provider TwitterTimelineProvider
    * @param twitter Twitter
-   * @param id Long
+   * @param request StatusesUserTimelineRequest
    */
-  public TwitterTimelineProviderTask(TwitterTimelineProvider provider, Twitter twitter, Long id) {
+  public TwitterTimelineProviderTask(TwitterTimelineProvider provider, Twitter twitter, StatusesUserTimelineRequest request) {
     this.provider = provider;
     this.client = twitter;
-    this.id = id;
+    this.request = request;
   }
 
-  int page_count = 1;
   int item_count = 0;
-  List<Status> lastPage = null;
+  int last_count = 0;
+  int page_count = 1;
 
   @Override
   public void run() {
 
-    Paging paging = new Paging(page_count, provider.getConfig().getPageSize().intValue());
-
-    LOGGER.info(id + " Thread Starting");
+    LOGGER.info("Thread Starting: {}", request.toString());
 
     do {
-      int keepTrying = 0;
-
-      // keep trying to load, give it 5 attempts.
-      //This value was chosen because it seemed like a reasonable number of times
-      //to retry capturing a timeline given the sorts of errors that could potentially
-      //occur (network timeout/interruption, faulty client, etc.)
-      while (keepTrying < 5) {
-
-        try {
-          this.client = provider.getTwitterClient();
 
-          ResponseList<Status> statuses = client.getUserTimeline(id, paging);
+      List<Tweet> statuses = client.userTimeline(request);
 
-          for (Status twitterStatus : statuses) {
+      last_count = statuses.size();
+      if( statuses.size() > 0 ) {
 
-            String json = TwitterObjectFactory.getRawJSON(twitterStatus);
-
-            if ( item_count < provider.getConfig().getMaxItems() ) {
-              try {
-                org.apache.streams.twitter.pojo.Tweet tweet = MAPPER.readValue(json, org.apache.streams.twitter.pojo.Tweet.class);
-                ComponentUtils.offerUntilSuccess(new StreamsDatum(tweet), provider.providerQueue);
-              } catch (Exception exception) {
-                LOGGER.warn("Failed to read document as Tweet ", twitterStatus);
-              }
-              item_count++;
-            }
+        for (Tweet status : statuses) {
 
+          if (item_count < provider.getConfig().getMaxItems()) {
+            ComponentUtils.offerUntilSuccess(new StreamsDatum(status), provider.providerQueue);
+            item_count++;
           }
 
-          lastPage = statuses;
-          page_count = paging.getPage() + 1;
-          paging.setPage(page_count);
-
-          keepTrying = 10;
-        } catch (Exception ex) {
-          keepTrying += TwitterErrorHandler.handleTwitterError(client, id, ex);
         }
+
+        Stream<Long> statusIds = statuses.stream().map(status -> status.getId());
+        long minId = statusIds.reduce(Math::min).get();
+        page_count++;
+        request.setMaxId(minId);
+
       }
+
     }
-    while (shouldContinuePulling());
+    while (shouldContinuePulling(last_count, page_count, item_count));
 
-    LOGGER.info(id + " Thread Finished");
+    LOGGER.info("Thread Finished: {}", request.toString());
 
   }
 
-  public boolean shouldContinuePulling() {
-    return (lastPage != null)
-        && item_count < provider.getConfig().getMaxItems()
-        && page_count <= provider.getConfig().getMaxPages();
+  public boolean shouldContinuePulling(int count, int page_count, int item_count) {
+    return (
+        count > 0
+            && item_count < provider.getConfig().getMaxItems()
+            && page_count <= provider.getConfig().getMaxPages());
   }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
index 214d204..1a7b906 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
@@ -28,6 +28,8 @@ import org.apache.streams.core.StreamsResultSet;
 import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.twitter.TwitterFollowingConfiguration;
 import org.apache.streams.twitter.TwitterUserInformationConfiguration;
+import org.apache.streams.twitter.api.Twitter;
+import org.apache.streams.twitter.api.UsersLookupRequest;
 import org.apache.streams.twitter.converter.TwitterDateTimeFormat;
 import org.apache.streams.twitter.pojo.User;
 import org.apache.streams.util.ComponentUtils;
@@ -35,6 +37,8 @@ import org.apache.streams.util.ComponentUtils;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.Uninterruptibles;
@@ -45,10 +49,6 @@ import org.apache.commons.lang.NotImplementedException;
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import twitter4j.Twitter;
-import twitter4j.TwitterFactory;
-import twitter4j.conf.ConfigurationBuilder;
-import twitter4j.json.DataObjectFactory;
 
 import java.io.BufferedOutputStream;
 import java.io.File;
@@ -88,6 +88,32 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
 
   private TwitterUserInformationConfiguration config;
 
+  protected List<String> names = new ArrayList<>();
+  protected List<Long> ids = new ArrayList<>();
+
+  protected final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+  protected volatile Queue<StreamsDatum> providerQueue;
+
+  public TwitterUserInformationConfiguration getConfig() {
+    return config;
+  }
+
+  public void setConfig(TwitterUserInformationConfiguration config) {
+    this.config = config;
+  }
+
+  protected Twitter client;
+
+  protected ListeningExecutorService executor;
+
+  protected DateTime start;
+  protected DateTime end;
+
+  protected final AtomicBoolean running = new AtomicBoolean();
+
+  private List<ListenableFuture<Object>> futures = new ArrayList<>();
+
   /**
    * To use from command line:
    *
@@ -148,28 +174,6 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
     outStream.flush();
   }
 
-  protected final ReadWriteLock lock = new ReentrantReadWriteLock();
-
-  protected volatile Queue<StreamsDatum> providerQueue;
-
-  public TwitterUserInformationConfiguration getConfig() {
-    return config;
-  }
-
-  public void setConfig(TwitterUserInformationConfiguration config) {
-    this.config = config;
-  }
-
-  protected Iterator<Long[]> idsBatches;
-  protected Iterator<String[]> screenNameBatches;
-
-  protected ListeningExecutorService executor;
-
-  protected DateTime start;
-  protected DateTime end;
-
-  protected final AtomicBoolean running = new AtomicBoolean();
-
   // TODO: this should be abstracted out
   public static ExecutorService newFixedThreadPoolWithQueueSize(int numThreads, int queueSize) {
     return new ThreadPoolExecutor(numThreads, numThreads,
@@ -212,6 +216,15 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
     Objects.requireNonNull(config.getOauth().getAccessToken());
     Objects.requireNonNull(config.getOauth().getAccessTokenSecret());
     Objects.requireNonNull(config.getInfo());
+    Objects.requireNonNull(config.getThreadsPerProvider());
+
+    try {
+      client = getTwitterClient();
+    } catch (InstantiationException e) {
+      LOGGER.error("InstantiationException", e);
+    }
+
+    Objects.requireNonNull(client);
 
     try {
       lock.writeLock().lock();
@@ -222,12 +235,6 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
 
     Objects.requireNonNull(providerQueue);
 
-    List<String> screenNames = new ArrayList<>();
-    List<String[]> screenNameBatches = new ArrayList<>();
-
-    List<Long> ids = new ArrayList<>();
-    List<Long[]> idsBatches = new ArrayList<>();
-
     for (String s : config.getInfo()) {
       if (s != null) {
         String potentialScreenName = s.replaceAll("@", "").trim().toLowerCase();
@@ -237,46 +244,67 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
         try {
           ids.add(Long.parseLong(potentialScreenName));
         } catch (Exception ex) {
-          screenNames.add(potentialScreenName);
-        }
-
-        // Twitter allows for batches up to 100 per request, but you cannot mix types
-
-        if (ids.size() >= 100) {
-          // add the batch
-          idsBatches.add(ids.toArray(new Long[ids.size()]));
-          // reset the Ids
-          ids = new ArrayList<>();
-        }
-
-        if (screenNames.size() >= 100) {
-          // add the batch
-          screenNameBatches.add(screenNames.toArray(new String[ids.size()]));
-          // reset the Ids
-          screenNames = new ArrayList<>();
+          names.add(potentialScreenName);
         }
       }
     }
 
+    executor = MoreExecutors.listeningDecorator(TwitterUserInformationProvider.newFixedThreadPoolWithQueueSize(config.getThreadsPerProvider().intValue(), ids.size()));
 
-    if (ids.size() > 0) {
-      idsBatches.add(ids.toArray(new Long[ids.size()]));
-    }
+    Objects.requireNonNull(executor);
 
-    if (screenNames.size() > 0) {
-      screenNameBatches.add(screenNames.toArray(new String[ids.size()]));
-    }
+    // Twitter allows for batches up to 100 per request, but you cannot mix types
+    submitUserInformationThreads(ids, names);
+  }
 
-    if (ids.size() + screenNames.size() > 0) {
-      executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, (ids.size() + screenNames.size())));
-    } else {
-      executor = MoreExecutors.listeningDecorator(newSingleThreadExecutor());
+  protected void submitUserInformationThreads(List<Long> ids, List<String> names) {
+
+    int idsIndex = 0;
+    while( idsIndex + 100 < ids.size() ) {
+      List<Long> batchIds = ids.subList(idsIndex, idsIndex + 100);
+      TwitterUserInformationProviderTask providerTask = new TwitterUserInformationProviderTask(
+          this,
+          client,
+          new UsersLookupRequest().withUserId(batchIds));
+      ListenableFuture future = executor.submit(providerTask);
+      futures.add(future);
+      LOGGER.info("Thread Submitted: {}", providerTask.request);
+      idsIndex += 100;
+    }
+    if (ids.size() >= idsIndex) {
+      List<Long> batchIds = ids.subList(idsIndex, ids.size());
+      TwitterUserInformationProviderTask providerTask = new TwitterUserInformationProviderTask(
+          this,
+          client,
+          new UsersLookupRequest().withUserId(batchIds));
+      ListenableFuture future = executor.submit(providerTask);
+      futures.add(future);
+      LOGGER.info("Thread Submitted: {}", providerTask.request);
     }
 
-    Objects.requireNonNull(executor);
+    int namesIndex = 0;
+    while( idsIndex + 100 < ids.size() ) {
+      List<String> batchNames = names.subList(namesIndex, namesIndex + 100);
+      TwitterUserInformationProviderTask providerTask = new TwitterUserInformationProviderTask(
+          this,
+          client,
+          new UsersLookupRequest().withScreenName(batchNames));
+      ListenableFuture future = executor.submit(providerTask);
+      futures.add(future);
+      LOGGER.info("Thread Submitted: {}", providerTask.request);
+      namesIndex += 100;
+    }
+    if (names.size() >= idsIndex) {
+      List<Long> batchNames = ids.subList(idsIndex, names.size());
+      TwitterUserInformationProviderTask providerTask = new TwitterUserInformationProviderTask(
+          this,
+          client,
+          new UsersLookupRequest().withUserId(batchNames));
+      ListenableFuture future = executor.submit(providerTask);
+      futures.add(future);
+      LOGGER.info("Thread Submitted: {}", providerTask.request);
+    }
 
-    this.idsBatches = idsBatches.iterator();
-    this.screenNameBatches = screenNameBatches.iterator();
   }
 
   @Override
@@ -284,82 +312,16 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
 
     Objects.requireNonNull(executor);
 
-    Preconditions.checkArgument(idsBatches.hasNext() || screenNameBatches.hasNext());
-
-    LOGGER.info("{}{} - startStream", idsBatches, screenNameBatches);
-
-    while (idsBatches.hasNext()) {
-      loadBatch(idsBatches.next());
-    }
-
-    while (screenNameBatches.hasNext()) {
-      loadBatch(screenNameBatches.next());
-    }
+    LOGGER.info("startStream: {} Threads", futures.size());
 
     running.set(true);
 
     executor.shutdown();
   }
 
-  protected void loadBatch(Long[] ids) {
-    Twitter client = getTwitterClient();
-    int keepTrying = 0;
-
-    // keep trying to load, give it 5 attempts.
-    //while (keepTrying < 10)
-    while (keepTrying < 1) {
-      try {
-        long[] toQuery = new long[ids.length];
-
-        for (int i = 0; i < ids.length; i++) {
-          toQuery[i] = ids[i];
-        }
-
-        for (twitter4j.User twitterUser : client.lookupUsers(toQuery)) {
-          String json = DataObjectFactory.getRawJSON(twitterUser);
-          try {
-            User user = MAPPER.readValue(json, org.apache.streams.twitter.pojo.User.class);
-            ComponentUtils.offerUntilSuccess(new StreamsDatum(user), providerQueue);
-          } catch (Exception exception) {
-            LOGGER.warn("Failed to read document as User ", twitterUser);
-          }
-        }
-        keepTrying = 10;
-      } catch (Exception ex) {
-        keepTrying += TwitterErrorHandler.handleTwitterError(client, ex);
-      }
-    }
-  }
-
-  protected void loadBatch(String[] ids) {
-    Twitter client = getTwitterClient();
-    int keepTrying = 0;
-
-    // keep trying to load, give it 5 attempts.
-    //while (keepTrying < 10)
-    while (keepTrying < 1) {
-      try {
-        for (twitter4j.User twitterUser : client.lookupUsers(ids)) {
-          String json = DataObjectFactory.getRawJSON(twitterUser);
-          try {
-            User user = MAPPER.readValue(json, org.apache.streams.twitter.pojo.User.class);
-            ComponentUtils.offerUntilSuccess(new StreamsDatum(user), providerQueue);
-          } catch (Exception exception) {
-            LOGGER.warn("Failed to read document as User ", twitterUser);
-          }
-        }
-        keepTrying = 10;
-      } catch (Exception ex) {
-        keepTrying += TwitterErrorHandler.handleTwitterError(client, ex);
-      }
-    }
-  }
-
   @Override
   public StreamsResultSet readCurrent() {
 
-    LOGGER.debug("{}{} - readCurrent", idsBatches, screenNameBatches);
-
     StreamsResultSet result;
 
     try {
@@ -367,7 +329,7 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
       result = new StreamsResultSet(providerQueue);
       result.setCounter(new DatumStatusCounter());
       providerQueue = constructQueue();
-      LOGGER.debug("{}{} - providing {} docs", idsBatches, screenNameBatches, result.size());
+      LOGGER.debug("readCurrent: {} Documents", result.size());
     } finally {
       lock.writeLock().unlock();
     }
@@ -394,17 +356,14 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
     return (StreamsResultSet)providerQueue.iterator();
   }
 
+
   @Override
   public boolean isRunning() {
-
-    if ( providerQueue.isEmpty() && executor.isTerminated() ) {
-      LOGGER.info("{}{} - completed", idsBatches, screenNameBatches);
-
+    if ( providerQueue.isEmpty() && executor.isTerminated() && Futures.allAsList(futures).isDone() ) {
+      LOGGER.info("All Threads Completed");
       running.set(false);
-
       LOGGER.info("Exiting");
     }
-
     return running.get();
   }
 
@@ -427,29 +386,8 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
     }
   }
 
-
-  // TODO: abstract out, also appears in TwitterTimelineProvider
-  protected Twitter getTwitterClient() {
-    String baseUrl = TwitterProviderUtil.baseUrl(config);
-
-    ConfigurationBuilder builder = new ConfigurationBuilder()
-        .setOAuthConsumerKey(config.getOauth().getConsumerKey())
-        .setOAuthConsumerSecret(config.getOauth().getConsumerSecret())
-        .setOAuthAccessToken(config.getOauth().getAccessToken())
-        .setOAuthAccessTokenSecret(config.getOauth().getAccessTokenSecret())
-        .setIncludeEntitiesEnabled(true)
-        .setJSONStoreEnabled(true)
-        .setAsyncNumThreads(3)
-        .setRestBaseURL(baseUrl)
-        .setIncludeMyRetweetEnabled(Boolean.TRUE)
-        .setPrettyDebugEnabled(Boolean.TRUE);
-
-    return new TwitterFactory(builder.build()).getInstance();
-  }
-
-  protected void callback() {
-
-
+  protected Twitter getTwitterClient() throws InstantiationException {
+    return Twitter.getInstance(config);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProviderTask.java
new file mode 100644
index 0000000..5dbb784
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProviderTask.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.twitter.provider;
+
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.twitter.api.Twitter;
+import org.apache.streams.twitter.api.UsersLookupRequest;
+import org.apache.streams.twitter.converter.TwitterDateTimeFormat;
+import org.apache.streams.twitter.pojo.User;
+import org.apache.streams.util.ComponentUtils;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ *  Retrieve recent posts for a single user id.
+ */
+public class TwitterUserInformationProviderTask implements Runnable {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(TwitterUserInformationProviderTask.class);
+
+  private static ObjectMapper MAPPER = new StreamsJacksonMapper(Stream.of(TwitterDateTimeFormat.TWITTER_FORMAT).collect(Collectors.toList()));
+
+  protected TwitterUserInformationProvider provider;
+  protected Twitter client;
+  protected UsersLookupRequest request;
+
+  /**
+   * TwitterTimelineProviderTask constructor.
+   * @param provider TwitterUserInformationProvider
+   * @param twitter Twitter
+   * @param request UsersLookupRequest
+   */
+  public TwitterUserInformationProviderTask(TwitterUserInformationProvider provider, Twitter twitter, UsersLookupRequest request) {
+    this.provider = provider;
+    this.client = twitter;
+    this.request = request;
+  }
+
+  @Override
+  public void run() {
+
+    LOGGER.info("Thread Starting: {}", request.toString());
+
+    List<User> users = client.lookup(request);
+
+    for (User user : users) {
+      ComponentUtils.offerUntilSuccess(new StreamsDatum(user), provider.providerQueue);
+    }
+
+    LOGGER.info("Thread Finished: {}", request.toString());
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/TwitterConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/TwitterConfiguration.json b/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/TwitterConfiguration.json
deleted file mode 100644
index 69048d1..0000000
--- a/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/TwitterConfiguration.json
+++ /dev/null
@@ -1,87 +0,0 @@
-{
-    "$schema": "http://json-schema.org/draft-03/schema",
-    "$license": [
-        "http://www.apache.org/licenses/LICENSE-2.0"
-    ],
-    "id": "#",
-    "type": "object",
-    "javaType" : "org.apache.streams.twitter.TwitterConfiguration",
-    "javaInterfaces": ["java.io.Serializable"],
-    "properties": {
-        "protocol": {
-            "type": "string",
-            "description": "The protocol",
-            "default": "https"
-        },
-        "host": {
-            "type": "string",
-            "description": "The host",
-            "default": "api.twitter.com"
-        },
-        "port": {
-            "type": "integer",
-            "description": "The port",
-            "default": 443
-        },
-        "version": {
-            "type": "string",
-            "description": "The version",
-            "default": "1.1"
-        },
-        "endpoint": {
-            "type": "string",
-            "description": "The endpoint"
-        },
-        "jsonStoreEnabled": {
-            "default" : true,
-            "type": "string"
-        },
-        "oauth": {
-            "type": "object",
-            "dynamic": "true",
-            "javaType" : "org.apache.streams.twitter.TwitterOAuthConfiguration",
-            "javaInterfaces": ["java.io.Serializable"],
-            "properties": {
-                "appName": {
-                    "type": "string"
-                },
-                "consumerKey": {
-                    "type": "string"
-                },
-                "consumerSecret": {
-                    "type": "string"
-                },
-                "accessToken": {
-                    "type": "string"
-                },
-                "accessTokenSecret": {
-                    "type": "string"
-                }
-            }
-        },
-        "basicauth": {
-            "type": "object",
-            "dynamic": "true",
-            "javaType" : "org.apache.streams.twitter.TwitterBasicAuthConfiguration",
-            "javaInterfaces": ["java.io.Serializable"],
-            "properties": {
-                "username": {
-                    "type": "string"
-                },
-                "password": {
-                    "type": "string"
-                }
-            }
-        },
-        "retrySleepMs": {
-             "type": "integer",
-             "description": "ms to sleep when hitting a rate limit",
-             "default": 100000
-         },
-         "retryMax": {
-             "type": "integer",
-             "description": "ms to sleep when hitting a rate limit",
-             "default": 10
-        }
-   }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/TwitterFollowingConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/TwitterFollowingConfiguration.json b/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/TwitterFollowingConfiguration.json
deleted file mode 100644
index 89fc7af..0000000
--- a/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/TwitterFollowingConfiguration.json
+++ /dev/null
@@ -1,33 +0,0 @@
-{
-    "$schema": "http://json-schema.org/draft-03/schema",
-    "$license": [
-        "http://www.apache.org/licenses/LICENSE-2.0"
-    ],
-    "id": "#",
-    "type": "object",
-    "javaType" : "org.apache.streams.twitter.TwitterFollowingConfiguration",
-    "extends": {"$ref":"TwitterUserInformationConfiguration.json"},
-    "javaInterfaces": ["java.io.Serializable"],
-    "properties": {
-        "ids_only": {
-            "type": "boolean",
-            "description": "Whether to collect ids only, or full profiles",
-            "default": "true"
-        },
-        "max_items": {
-            "type": "integer",
-            "description": "Max items per user to collect",
-            "default": 50000
-        },
-        "max_pages": {
-            "type": "integer",
-            "description": "Max pages per user to request",
-            "default": 10
-        },
-        "page_size": {
-            "type": "integer",
-            "description": "Max items per page to request",
-            "default": 5000
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/TwitterStreamConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/TwitterStreamConfiguration.json b/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/TwitterStreamConfiguration.json
deleted file mode 100644
index 6fa2a73..0000000
--- a/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/TwitterStreamConfiguration.json
+++ /dev/null
@@ -1,45 +0,0 @@
-{
-    "$schema": "http://json-schema.org/draft-03/schema",
-    "$license": [
-        "http://www.apache.org/licenses/LICENSE-2.0"
-    ],
-    "id": "#",
-    "type": "object",
-    "javaType" : "org.apache.streams.twitter.TwitterStreamConfiguration",
-    "extends": {"$ref":"TwitterConfiguration.json"},
-    "javaInterfaces": ["java.io.Serializable"],
-    "properties": {
-        "includeEntities": {
-            "type": "string"
-        },
-        "truncated": {
-            "type": "boolean"
-        },
-        "filter-level": {
-            "type": "string",
-            "description": "Setting this parameter to one of none, low, or medium will set the minimum value of the filter_level Tweet attribute required to be included in the stream"
-        },
-        "with": {
-            "type": "string",
-            "description": "Typically following or user"
-        },
-        "replies": {
-            "type": "string",
-            "description": "Set to all, to see all @replies"
-        },
-        "follow": {
-            "type": "array",
-            "description": "A list of user IDs, indicating the users whose Tweets should be delivered on the stream",
-            "items": {
-                "type": "integer"
-            }
-        },
-        "track": {
-            "type": "array",
-            "description": "A list of phrases which will be used to determine what Tweets will be delivered on the stream",
-            "items": {
-                "type": "string"
-            }
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/TwitterTimelineProviderConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/TwitterTimelineProviderConfiguration.json b/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/TwitterTimelineProviderConfiguration.json
deleted file mode 100644
index 37ed60e..0000000
--- a/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/TwitterTimelineProviderConfiguration.json
+++ /dev/null
@@ -1,23 +0,0 @@
-{
-    "$schema": "http://json-schema.org/draft-03/schema",
-    "$license": [
-        "http://www.apache.org/licenses/LICENSE-2.0"
-    ],
-    "id": "#",
-    "type": "object",
-    "javaType" : "org.apache.streams.twitter.TwitterTimelineProviderConfiguration",
-    "extends": {"$ref":"TwitterUserInformationConfiguration.json"},
-    "javaInterfaces": ["java.io.Serializable"],
-    "properties": {
-        "max_items": {
-            "type": "integer",
-            "description": "Max items per user to collect",
-            "default": 3200
-        },
-        "max_pages": {
-            "type": "integer",
-            "description": "Max items per page to request",
-            "default": 16
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/TwitterUserInformationConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/TwitterUserInformationConfiguration.json b/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/TwitterUserInformationConfiguration.json
deleted file mode 100644
index 405c87a..0000000
--- a/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/TwitterUserInformationConfiguration.json
+++ /dev/null
@@ -1,25 +0,0 @@
-{
-    "$schema": "http://json-schema.org/draft-03/schema",
-    "$license": [
-        "http://www.apache.org/licenses/LICENSE-2.0"
-    ],
-    "id": "#",
-    "type": "object",
-    "javaType" : "org.apache.streams.twitter.TwitterUserInformationConfiguration",
-    "extends": {"$ref":"TwitterConfiguration.json"},
-    "javaInterfaces": ["java.io.Serializable"],
-    "properties": {
-        "info": {
-            "type": "array",
-            "description": "A list of user IDs, indicating the users whose Tweets should be delivered on the stream",
-            "items": {
-                "type": "string"
-            }
-        },
-        "page_size": {
-            "type": "integer",
-            "description": "Max items per page to request",
-            "default": 200
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/FollowersIdsRequest.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/FollowersIdsRequest.json b/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/FollowersIdsRequest.json
new file mode 100644
index 0000000..2b98689
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/FollowersIdsRequest.json
@@ -0,0 +1,13 @@
+{
+  "$schema": "http://json-schema.org/draft-03/schema",
+  "$license": [
+    "http://www.apache.org/licenses/LICENSE-2.0"
+  ],
+  "id": "#",
+  "type": "object",
+  "javaType" : "org.apache.streams.twitter.api.FollowersIdsRequest",
+  "javaInterfaces": ["java.io.Serializable"],
+  "description": "https://dev.twitter.com/rest/reference/get/followers/ids",
+  "extends": { "$ref": "FollowingIdsRequest.json" },
+  "properties": {}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/FollowersIdsResponse.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/FollowersIdsResponse.json b/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/FollowersIdsResponse.json
new file mode 100644
index 0000000..03dd83c
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/FollowersIdsResponse.json
@@ -0,0 +1,32 @@
+{
+  "$schema": "http://json-schema.org/draft-03/schema",
+  "$license": [
+    "http://www.apache.org/licenses/LICENSE-2.0"
+  ],
+  "id": "#",
+  "javaType": "org.apache.streams.twitter.api.FollowersIdsResponse",
+  "javaInterfaces": [
+    "java.io.Serializable"
+  ],
+  "description": "https://dev.twitter.com/rest/reference/get/followers/ids",
+  "properties": {
+    "ids": {
+      "type": "array",
+      "items": {
+        "type": "integer"
+      }
+    },
+    "previous_cursor": {
+      "type": "integer"
+    },
+    "previous_cursor_str": {
+      "type": "string"
+    },
+    "next_cursor": {
+      "type": "integer"
+    },
+    "next_cursor_str": {
+      "type": "string"
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/FollowersListRequest.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/FollowersListRequest.json b/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/FollowersListRequest.json
new file mode 100644
index 0000000..c588579
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/FollowersListRequest.json
@@ -0,0 +1,13 @@
+{
+  "$schema": "http://json-schema.org/draft-03/schema",
+  "$license": [
+    "http://www.apache.org/licenses/LICENSE-2.0"
+  ],
+  "id": "#",
+  "type": "object",
+  "javaType" : "org.apache.streams.twitter.api.FollowersListRequest",
+  "javaInterfaces": ["java.io.Serializable"],
+  "description": "https://dev.twitter.com/rest/reference/get/followers/list",
+  "extends": { "$ref": "FollowingListRequest.json" },
+  "properties": {}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/FollowersListResponse.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/FollowersListResponse.json b/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/FollowersListResponse.json
new file mode 100644
index 0000000..e8e46a1
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/FollowersListResponse.json
@@ -0,0 +1,33 @@
+{
+  "$schema": "http://json-schema.org/draft-03/schema",
+  "$license": [
+    "http://www.apache.org/licenses/LICENSE-2.0"
+  ],
+  "id": "#",
+  "javaType": "org.apache.streams.twitter.api.FollowersListResponse",
+  "javaInterfaces": [
+    "java.io.Serializable"
+  ],
+  "description": "https://dev.twitter.com/rest/reference/get/followers/list",
+  "properties": {
+    "users": {
+      "type": "array",
+      "items": {
+        "type": "object",
+        "$ref": "../pojo/User.json"
+      }
+    },
+    "previous_cursor": {
+      "type": "integer"
+    },
+    "previous_cursor_str": {
+      "type": "string"
+    },
+    "next_cursor": {
+      "type": "integer"
+    },
+    "next_cursor_str": {
+      "type": "string"
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/FollowingIdsRequest.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/FollowingIdsRequest.json b/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/FollowingIdsRequest.json
new file mode 100644
index 0000000..81804fa
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/FollowingIdsRequest.json
@@ -0,0 +1,36 @@
+{
+  "$schema": "http://json-schema.org/draft-03/schema",
+  "$license": [
+    "http://www.apache.org/licenses/LICENSE-2.0"
+  ],
+  "id": "#",
+  "javaType" : "org.apache.streams.twitter.api.FollowingIdsRequest",
+  "javaInterfaces": ["java.io.Serializable"],
+  "properties": {
+    "id": {
+      "description": "The ID of the user for whom to return results for.",
+      "required": false,
+      "type": "integer"
+    },
+    "screen_name": {
+      "description": "The screen name of the user for whom to return results for.",
+      "required": false,
+      "type": "string"
+    },
+    "cursor": {
+      "description": "Causes the list of connections to be broken into pages of no more than 5000 IDs at a time. The number of IDs returned is not guaranteed to be 5000 as suspended users are filtered out after connections are queried. If no cursor is provided, a value of -1 will be assumed, which is the first page.\nThe response from the API will include a previous_cursor and next_cursor to allow paging back and forth.",
+      "required": false,
+      "type": "integer"
+    },
+    "stringify_ids": {
+      "description": "Many programming environments will not consume our Tweet ids due to their size. Provide this option to have ids returned as strings instead.",
+      "required": false,
+      "type": "boolean"
+    },
+    "count": {
+      "description": "Specifies the number of IDs attempt retrieval of, up to a maximum of 5,000 per distinct request. The value of count is best thought of as a limit to the number of results to return. When using the count parameter with this method, it is wise to use a consistent count value across all requests to the same user\u2019s collection. Usage of this parameter is encouraged in environments where all 5,000 IDs constitutes too large of a response.",
+      "required": false,
+      "type": "integer"
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/FollowingListRequest.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/FollowingListRequest.json b/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/FollowingListRequest.json
new file mode 100644
index 0000000..90295b5
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/FollowingListRequest.json
@@ -0,0 +1,41 @@
+{
+  "$schema": "http://json-schema.org/draft-03/schema",
+  "$license": [
+    "http://www.apache.org/licenses/LICENSE-2.0"
+  ],
+  "id": "#",
+  "javaType" : "org.apache.streams.twitter.api.FollowingListRequest",
+  "javaInterfaces": ["java.io.Serializable"],
+  "properties": {
+    "id": {
+      "description": "The ID of the user for whom to return results for.",
+      "required": false,
+      "type": "integer"
+    },
+    "screen_name": {
+      "description": "The screen name of the user for whom to return results for.",
+      "required": false,
+      "type": "string"
+    },
+    "cursor": {
+      "description": "Causes the list of connections to be broken into pages of no more than 5000 IDs at a time. The number of IDs returned is not guaranteed to be 5000 as suspended users are filtered out after connections are queried. If no cursor is provided, a value of -1 will be assumed, which is the first page.\nThe response from the API will include a previous_cursor and next_cursor to allow paging back and forth.",
+      "required": false,
+      "type": "integer"
+    },
+    "count": {
+      "description": "The number of users to return per page, up to a maximum of 200. Defaults to 20.",
+      "required": false,
+      "type": "integer"
+    },
+    "skip_status": {
+      "description": "When set to either true , t or 1 , statuses will not be included in the returned user objects. If set to any other value, statuses will be included.",
+      "required": false,
+      "type": "boolean"
+    },
+    "include_user_entities": {
+      "description": "The user object entities node will not be included when set to false.",
+      "required": false,
+      "type": "integer"
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/FriendsIdsRequest.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/FriendsIdsRequest.json b/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/FriendsIdsRequest.json
new file mode 100644
index 0000000..fda54f8
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/FriendsIdsRequest.json
@@ -0,0 +1,13 @@
+{
+  "$schema": "http://json-schema.org/draft-03/schema",
+  "$license": [
+    "http://www.apache.org/licenses/LICENSE-2.0"
+  ],
+  "id": "#",
+  "type": "object",
+  "javaType" : "org.apache.streams.twitter.api.FriendsIdsRequest",
+  "javaInterfaces": ["java.io.Serializable"],
+  "description": "https://dev.twitter.com/rest/reference/get/friends/ids",
+  "extends": { "$ref": "FollowingIdsRequest.json" },
+  "properties": {}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/FriendsIdsResponse.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/FriendsIdsResponse.json b/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/FriendsIdsResponse.json
new file mode 100644
index 0000000..fc46841
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/FriendsIdsResponse.json
@@ -0,0 +1,32 @@
+{
+  "$schema": "http://json-schema.org/draft-03/schema",
+  "$license": [
+    "http://www.apache.org/licenses/LICENSE-2.0"
+  ],
+  "id": "#",
+  "javaType": "org.apache.streams.twitter.api.FriendsIdsResponse",
+  "javaInterfaces": [
+    "java.io.Serializable"
+  ],
+  "description": "https://dev.twitter.com/rest/reference/get/friends/ids",
+  "properties": {
+    "ids": {
+      "type": "array",
+      "items": {
+        "type": "integer"
+      }
+    },
+    "previous_cursor": {
+      "type": "integer"
+    },
+    "previous_cursor_str": {
+      "type": "string"
+    },
+    "next_cursor": {
+      "type": "integer"
+    },
+    "next_cursor_str": {
+      "type": "string"
+    }
+  }
+}
\ No newline at end of file