You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2016/11/25 20:25:12 UTC

[31/42] incubator-streams git commit: STREAMS-440: custom checkstyle.xml, address compliance

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookProvider.java b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookProvider.java
index e907082..be59bd7 100644
--- a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookProvider.java
+++ b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookProvider.java
@@ -15,16 +15,9 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.streams.facebook.provider;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.Queues;
-import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.typesafe.config.ConfigRenderOptions;
 import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProvider;
@@ -34,6 +27,16 @@ import org.apache.streams.facebook.IdConfig;
 import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.util.ComponentUtils;
 import org.apache.streams.util.SerializationUtil;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Queues;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.typesafe.config.ConfigRenderOptions;
+
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -44,7 +47,8 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.*;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
@@ -52,103 +56,109 @@ import java.util.concurrent.atomic.AtomicBoolean;
  */
 public abstract class FacebookProvider implements StreamsProvider {
 
-    private final static String STREAMS_ID = "FacebookProvider";
-
-    private static final Logger LOGGER = LoggerFactory.getLogger(FacebookProvider.class);
-    private static final ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
-    private static final int MAX_BATCH_SIZE = 2000;
-
-    protected FacebookConfiguration configuration;
-    protected BlockingQueue<StreamsDatum> datums;
-
-    private AtomicBoolean isComplete;
-    private ListeningExecutorService executor;
-    List<ListenableFuture<Object>> futures = new ArrayList<>();
-
-    private FacebookDataCollector dataCollector;
-
-    public FacebookProvider() {
-        try {
-            this.configuration = MAPPER.readValue(StreamsConfigurator.config.getConfig("facebook").root().render(ConfigRenderOptions.concise()), FacebookConfiguration.class);
-        } catch (IOException ioe) {
-            LOGGER.error("Exception trying to read default config : {}", ioe);
-        }
-    }
-
-    public FacebookProvider(FacebookConfiguration configuration) {
-        this.configuration = (FacebookConfiguration) SerializationUtil.cloneBySerialization(configuration);
-    }
-
-    @Override
-    public String getId() {
-        return STREAMS_ID;
-    }
+  private static final String STREAMS_ID = "FacebookProvider";
 
-    @Override
-    public void startStream() {
-        ListenableFuture future = executor.submit(getDataCollector());
-        futures.add(future);
-        executor.shutdown();
-    }
+  private static final Logger LOGGER = LoggerFactory.getLogger(FacebookProvider.class);
+  private static final ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
+  private static final int MAX_BATCH_SIZE = 2000;
 
-    protected abstract FacebookDataCollector getDataCollector();
-
-    @Override
-    public StreamsResultSet readCurrent() {
-        int batchSize = 0;
-        BlockingQueue<StreamsDatum> batch = Queues.newLinkedBlockingQueue();
-        while(!this.datums.isEmpty() && batchSize < MAX_BATCH_SIZE) {
-            ComponentUtils.offerUntilSuccess(ComponentUtils.pollWhileNotEmpty(this.datums), batch);
-            ++batchSize;
-        }
-        return new StreamsResultSet(batch);
-    }
+  protected FacebookConfiguration configuration;
+  protected BlockingQueue<StreamsDatum> datums;
 
-    @Override
-    public StreamsResultSet readNew(BigInteger sequence) {
-        return null;
-    }
+  private AtomicBoolean isComplete;
+  private ListeningExecutorService executor;
+  List<ListenableFuture<Object>> futures = new ArrayList<>();
 
-    @Override
-    public StreamsResultSet readRange(DateTime start, DateTime end) {
-        return null;
-    }
+  private FacebookDataCollector dataCollector;
 
-    @Override
-    public void prepare(Object configurationObject) {
-        this.datums = Queues.newLinkedBlockingQueue();
-        this.isComplete = new AtomicBoolean(false);
-        this.executor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1));
+  /**
+   * FacebookProvider constructor - resolves FacebookConfiguration from JVM 'facebook'.
+   */
+  public FacebookProvider() {
+    try {
+      this.configuration = MAPPER.readValue(StreamsConfigurator.config.getConfig("facebook").root().render(ConfigRenderOptions.concise()), FacebookConfiguration.class);
+    } catch (IOException ioe) {
+      LOGGER.error("Exception trying to read default config : {}", ioe);
     }
-
-    @Override
-    public void cleanUp() {
-        ComponentUtils.shutdownExecutor(executor, 5, 5);
-        executor = null;
+  }
+
+  /**
+   * FacebookProvider constructor - uses supplied FacebookConfiguration.
+   */
+  public FacebookProvider(FacebookConfiguration configuration) {
+    this.configuration = (FacebookConfiguration) SerializationUtil.cloneBySerialization(configuration);
+  }
+
+  @Override
+  public String getId() {
+    return STREAMS_ID;
+  }
+
+  @Override
+  public void startStream() {
+    ListenableFuture future = executor.submit(getDataCollector());
+    futures.add(future);
+    executor.shutdown();
+  }
+
+  protected abstract FacebookDataCollector getDataCollector();
+
+  @Override
+  public StreamsResultSet readCurrent() {
+    int batchSize = 0;
+    BlockingQueue<StreamsDatum> batch = Queues.newLinkedBlockingQueue();
+    while (!this.datums.isEmpty() && batchSize < MAX_BATCH_SIZE) {
+      ComponentUtils.offerUntilSuccess(ComponentUtils.pollWhileNotEmpty(this.datums), batch);
+      ++batchSize;
     }
-
-    /**
-     * Overrides the ids and addedAfter time in the configuration
-     * @param idsToAfterDate
-     */
-    public void overrideIds(Map<String, DateTime> idsToAfterDate) {
-        Set<IdConfig> ids = Sets.newHashSet();
-        for(String id : idsToAfterDate.keySet()) {
-            IdConfig idConfig = new IdConfig();
-            idConfig.setId(id);
-            idConfig.setAfterDate(idsToAfterDate.get(id));
-            ids.add(idConfig);
-        }
-        this.configuration.setIds(ids);
+    return new StreamsResultSet(batch);
+  }
+
+  @Override
+  public StreamsResultSet readNew(BigInteger sequence) {
+    return null;
+  }
+
+  @Override
+  public StreamsResultSet readRange(DateTime start, DateTime end) {
+    return null;
+  }
+
+  @Override
+  public void prepare(Object configurationObject) {
+    this.datums = Queues.newLinkedBlockingQueue();
+    this.isComplete = new AtomicBoolean(false);
+    this.executor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1));
+  }
+
+  @Override
+  public void cleanUp() {
+    ComponentUtils.shutdownExecutor(executor, 5, 5);
+    executor = null;
+  }
+
+  /**
+   * Overrides the ids and addedAfter time in the configuration.
+   * @param idsToAfterDate idsToAfterDate
+   */
+  public void overrideIds(Map<String, DateTime> idsToAfterDate) {
+    Set<IdConfig> ids = Sets.newHashSet();
+    for (String id : idsToAfterDate.keySet()) {
+      IdConfig idConfig = new IdConfig();
+      idConfig.setId(id);
+      idConfig.setAfterDate(idsToAfterDate.get(id));
+      ids.add(idConfig);
     }
-
-    @Override
-    public boolean isRunning() {
-        if (datums.isEmpty() && executor.isTerminated() && Futures.allAsList(futures).isDone()) {
-            LOGGER.info("Completed");
-            isComplete.set(true);
-            LOGGER.info("Exiting");
-        }
-        return !isComplete.get();
+    this.configuration.setIds(ids);
+  }
+
+  @Override
+  public boolean isRunning() {
+    if (datums.isEmpty() && executor.isTerminated() && Futures.allAsList(futures).isDone()) {
+      LOGGER.info("Completed");
+      isComplete.set(true);
+      LOGGER.info("Exiting");
     }
+    return !isComplete.get();
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookUserInformationProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookUserInformationProvider.java b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookUserInformationProvider.java
index 1262106..3939f23 100644
--- a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookUserInformationProvider.java
+++ b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookUserInformationProvider.java
@@ -18,24 +18,24 @@
 
 package org.apache.streams.facebook.provider;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigRenderOptions;
-import facebook4j.*;
-import facebook4j.conf.ConfigurationBuilder;
 import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProvider;
 import org.apache.streams.core.StreamsResultSet;
 import org.apache.streams.facebook.FacebookUserInformationConfiguration;
 import org.apache.streams.jackson.StreamsJacksonMapper;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigRenderOptions;
+
+import org.apache.commons.lang.NotImplementedException;
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.commons.lang.NotImplementedException;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -44,262 +44,291 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Queue;
-import java.util.concurrent.*;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-public class FacebookUserInformationProvider implements StreamsProvider, Serializable
-{
+import facebook4j.Facebook;
+import facebook4j.FacebookException;
+import facebook4j.FacebookFactory;
+import facebook4j.Friend;
+import facebook4j.Paging;
+import facebook4j.ResponseList;
+import facebook4j.User;
+import facebook4j.conf.ConfigurationBuilder;
 
-    public static final String STREAMS_ID = "FacebookUserInformationProvider";
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(FacebookUserInformationProvider.class);
+public class FacebookUserInformationProvider implements StreamsProvider, Serializable {
 
-    private static final ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+  public static final String STREAMS_ID = "FacebookUserInformationProvider";
 
-    private static final String ALL_PERMISSIONS = "ads_management,ads_read,create_event,create_note,email,export_stream,friends_about_me,friends_actions.books,friends_actions.music,friends_actions.news,friends_actions.video,friends_activities,friends_birthday,friends_education_history,friends_events,friends_games_activity,friends_groups,friends_hometown,friends_interests,friends_likes,friends_location,friends_notes,friends_online_presence,friends_photo_video_tags,friends_photos,friends_questions,friends_relationship_details,friends_relationships,friends_religion_politics,friends_status,friends_subscriptions,friends_videos,friends_website,friends_work_history,manage_friendlists,manage_notifications,manage_pages,photo_upload,publish_actions,publish_stream,read_friendlists,read_insights,read_mailbox,read_page_mailboxes,read_requests,read_stream,rsvp_event,share_item,sms,status_update,user_about_me,user_actions.books,user_actions.music,user_actions.news,user_actions.video,user_activities,user_birthday,user_education_history,user_events,user_friends,user_games_activity,user_groups,user_hometown,user_interests,user_likes,user_location,user_notes,user_online_presence,user_photo_video_tags,user_photos,user_questions,user_relationship_details,user_relationships,user_religion_politics,user_status,user_subscriptions,user_videos,user_website,user_work_history,video_upload,xmpp_login";
-    private FacebookUserInformationConfiguration facebookUserInformationConfiguration;
+  private static final Logger LOGGER = LoggerFactory.getLogger(FacebookUserInformationProvider.class);
 
-    private Class klass;
-    protected volatile Queue<StreamsDatum> providerQueue = new LinkedBlockingQueue<StreamsDatum>();
+  private static final ObjectMapper mapper = StreamsJacksonMapper.getInstance();
 
-    public FacebookUserInformationConfiguration getConfig()              { return facebookUserInformationConfiguration; }
+  private static final String ALL_PERMISSIONS = "ads_management,ads_read,create_event,create_note,email,export_stream,friends_about_me,friends_actions.books,friends_actions.music,friends_actions.news,friends_actions.video,friends_activities,friends_birthday,friends_education_history,friends_events,friends_games_activity,friends_groups,friends_hometown,friends_interests,friends_likes,friends_location,friends_notes,friends_online_presence,friends_photo_video_tags,friends_photos,friends_questions,friends_relationship_details,friends_relationships,friends_religion_politics,friends_status,friends_subscriptions,friends_videos,friends_website,friends_work_history,manage_friendlists,manage_notifications,manage_pages,photo_upload,publish_actions,publish_stream,read_friendlists,read_insights,read_mailbox,read_page_mailboxes,read_requests,read_stream,rsvp_event,share_item,sms,status_update,user_about_me,user_actions.books,user_actions.music,user_actions.news,user_actions.video,user_activities,user_birthday,user_education_history,user_events,user_friends,user_games_activity,user_groups,user_hometown,user_interests,user_likes,user_location,user_notes,user_online_presence,user_photo_video_tags,user_photos,user_questions,user_relationship_details,user_relationships,user_religion_politics,user_status,user_subscriptions,user_videos,user_website,user_work_history,video_upload,xmpp_login";
+  private FacebookUserInformationConfiguration facebookUserInformationConfiguration;
 
-    public void setConfig(FacebookUserInformationConfiguration config)   { this.facebookUserInformationConfiguration = config; }
+  private Class klass;
+  protected volatile Queue<StreamsDatum> providerQueue = new LinkedBlockingQueue<StreamsDatum>();
 
-    protected Iterator<String[]> idsBatches;
+  public FacebookUserInformationConfiguration getConfig() {
+    return facebookUserInformationConfiguration;
+  }
 
-    protected ExecutorService executor;
+  public void setConfig(FacebookUserInformationConfiguration config) {
+    this.facebookUserInformationConfiguration = config;
+  }
 
-    protected DateTime start;
-    protected DateTime end;
+  protected Iterator<String[]> idsBatches;
 
-    protected final AtomicBoolean running = new AtomicBoolean();
+  protected ExecutorService executor;
 
-    private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) {
-        return new ThreadPoolExecutor(nThreads, nThreads,
-                5000L, TimeUnit.MILLISECONDS,
-                new ArrayBlockingQueue<Runnable>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy());
-    }
+  protected DateTime start;
+  protected DateTime end;
 
-    public FacebookUserInformationProvider() {
-        Config config = StreamsConfigurator.config.getConfig("facebook");
-        FacebookUserInformationConfiguration facebookUserInformationConfiguration;
-        try {
-            facebookUserInformationConfiguration = mapper.readValue(config.root().render(ConfigRenderOptions.concise()), FacebookUserInformationConfiguration.class);
-        } catch (IOException e) {
-            e.printStackTrace();
-            return;
-        }
-    }
+  protected final AtomicBoolean running = new AtomicBoolean();
 
-    public FacebookUserInformationProvider(FacebookUserInformationConfiguration config) {
-        this.facebookUserInformationConfiguration = config;
-    }
+  private static ExecutorService newFixedThreadPoolWithQueueSize(int numThreads, int queueSize) {
+    return new ThreadPoolExecutor(numThreads, numThreads,
+        5000L, TimeUnit.MILLISECONDS,
+        new ArrayBlockingQueue<Runnable>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy());
+  }
 
-    public FacebookUserInformationProvider(Class klass) {
-        Config config = StreamsConfigurator.config.getConfig("facebook");
-        FacebookUserInformationConfiguration facebookUserInformationConfiguration;
-        try {
-            facebookUserInformationConfiguration = mapper.readValue(config.root().render(ConfigRenderOptions.concise()), FacebookUserInformationConfiguration.class);
-        } catch (IOException e) {
-            e.printStackTrace();
-            return;
-        }
-        this.klass = klass;
+  /**
+   * FacebookUserInformationProvider constructor - resolves FacebookUserInformationConfiguration from JVM 'facebook'.
+   */
+  public FacebookUserInformationProvider() {
+    Config config = StreamsConfigurator.config.getConfig("facebook");
+    FacebookUserInformationConfiguration facebookUserInformationConfiguration;
+    try {
+      facebookUserInformationConfiguration = mapper.readValue(config.root().render(ConfigRenderOptions.concise()), FacebookUserInformationConfiguration.class);
+    } catch (IOException ex) {
+      ex.printStackTrace();
+      return;
     }
-
-    public FacebookUserInformationProvider(FacebookUserInformationConfiguration config, Class klass) {
-        this.facebookUserInformationConfiguration = config;
-        this.klass = klass;
+  }
+
+  /**
+   * FacebookUserInformationProvider constructor - uses supplied FacebookUserInformationConfiguration.
+   * @param config
+   */
+  public FacebookUserInformationProvider(FacebookUserInformationConfiguration config) {
+    this.facebookUserInformationConfiguration = config;
+  }
+
+  public FacebookUserInformationProvider(Class klass) {
+    Config config = StreamsConfigurator.config.getConfig("facebook");
+    FacebookUserInformationConfiguration facebookUserInformationConfiguration;
+    try {
+      facebookUserInformationConfiguration = mapper.readValue(config.root().render(ConfigRenderOptions.concise()), FacebookUserInformationConfiguration.class);
+    } catch (IOException ex) {
+      ex.printStackTrace();
+      return;
     }
-
-    public Queue<StreamsDatum> getProviderQueue() {
-        return this.providerQueue;
+    this.klass = klass;
+  }
+
+  public FacebookUserInformationProvider(FacebookUserInformationConfiguration config, Class klass) {
+    this.facebookUserInformationConfiguration = config;
+    this.klass = klass;
+  }
+
+  public Queue<StreamsDatum> getProviderQueue() {
+    return this.providerQueue;
+  }
+
+  @Override
+  public String getId() {
+    return STREAMS_ID;
+  }
+
+  @Override
+  public void startStream() {
+    running.set(true);
+  }
+
+  @Override
+  public StreamsResultSet readCurrent() {
+
+    Preconditions.checkArgument(idsBatches.hasNext());
+
+    LOGGER.info("readCurrent");
+
+    Facebook client = getFacebookClient();
+
+    try {
+      User me = client.users().getMe();
+      String json = mapper.writeValueAsString(me);
+      providerQueue.add(
+          new StreamsDatum(json, DateTime.now())
+      );
+    } catch (JsonProcessingException ex) {
+      ex.printStackTrace();
+    } catch (FacebookException ex) {
+      ex.printStackTrace();
     }
 
-    @Override
-    public String getId() {
-        return STREAMS_ID;
-    }
-
-    @Override
-    public void startStream() {
-        running.set(true);
-    }
-
-    public StreamsResultSet readCurrent() {
-
-        Preconditions.checkArgument(idsBatches.hasNext());
-
-        LOGGER.info("readCurrent");
-
-        Facebook client = getFacebookClient();
-
+    if (idsBatches.hasNext()) {
+      while (idsBatches.hasNext()) {
         try {
-            User me = client.users().getMe();
-            String json = mapper.writeValueAsString(me);
-            providerQueue.add(
-                new StreamsDatum(json, DateTime.now())
-            );
-        } catch (JsonProcessingException e) {
-            e.printStackTrace();
-        } catch (FacebookException e) {
-            e.printStackTrace();
-        }
+          List<User> userList = client.users().getUsers(idsBatches.next());
+          for (User user : userList) {
 
-        if( idsBatches.hasNext()) {
-            while (idsBatches.hasNext()) {
-                try {
-                    List<User> userList = client.users().getUsers(idsBatches.next());
-                    for (User user : userList) {
-
-                        try {
-                            String json = mapper.writeValueAsString(user);
-                            providerQueue.add(
-                                    new StreamsDatum(json, DateTime.now())
-                            );
-                        } catch (JsonProcessingException e) {
-                            //                        e.printStackTrace();
-                        }
-                    }
-
-                } catch (FacebookException e) {
-                    e.printStackTrace();
-                }
-            }
-        } else {
             try {
-                ResponseList<Friend> friendResponseList = client.friends().getFriends();
-                Paging<Friend> friendPaging;
-                do {
-
-                    for( Friend friend : friendResponseList ) {
-
-                        String json;
-                        try {
-                            json = mapper.writeValueAsString(friend);
-                            providerQueue.add(
-                                    new StreamsDatum(json)
-                            );
-                        } catch (JsonProcessingException e) {
-//                        e.printStackTrace();
-                        }
-                    }
-                    friendPaging = friendResponseList.getPaging();
-                    friendResponseList = client.fetchNext(friendPaging);
-                } while( friendPaging != null &&
-                        friendResponseList != null );
-            } catch (FacebookException e) {
-                e.printStackTrace();
+              String json = mapper.writeValueAsString(user);
+              providerQueue.add(
+                  new StreamsDatum(json, DateTime.now())
+              );
+            } catch (JsonProcessingException ex) {
+              LOGGER.trace("JsonProcessingException", ex);
             }
+          }
 
+        } catch (FacebookException ex) {
+          ex.printStackTrace();
         }
+      }
+    } else {
+      try {
+        ResponseList<Friend> friendResponseList = client.friends().getFriends();
+        Paging<Friend> friendPaging;
+        do {
 
-        LOGGER.info("Finished.  Cleaning up...");
-
-        LOGGER.info("Providing {} docs", providerQueue.size());
-
-        StreamsResultSet result =  new StreamsResultSet(providerQueue);
-        running.set(false);
+          for ( Friend friend : friendResponseList ) {
 
-        LOGGER.info("Exiting");
-
-        return result;
-
-    }
-
-    public StreamsResultSet readNew(BigInteger sequence) {
-        LOGGER.debug("{} readNew", STREAMS_ID);
-        throw new NotImplementedException();
-    }
-
-    public StreamsResultSet readRange(DateTime start, DateTime end) {
-        LOGGER.debug("{} readRange", STREAMS_ID);
-        this.start = start;
-        this.end = end;
-        readCurrent();
-        StreamsResultSet result = (StreamsResultSet)providerQueue.iterator();
-        return result;
-    }
+            String json;
+            try {
+              json = mapper.writeValueAsString(friend);
+              providerQueue.add(
+                  new StreamsDatum(json)
+              );
+            } catch (JsonProcessingException ex) {
+              LOGGER.trace("JsonProcessingException", ex);
+            }
+          }
+          friendPaging = friendResponseList.getPaging();
+          friendResponseList = client.fetchNext(friendPaging);
+        }
+        while ( friendPaging != null
+                &&
+                friendResponseList != null );
+      } catch (FacebookException ex) {
+        ex.printStackTrace();
+      }
 
-    @Override
-    public boolean isRunning() {
-        return running.get();
     }
 
-    void shutdownAndAwaitTermination(ExecutorService pool) {
-        pool.shutdown(); // Disable new tasks from being submitted
-        try {
-            // Wait a while for existing tasks to terminate
-            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
-                pool.shutdownNow(); // Cancel currently executing tasks
-                // Wait a while for tasks to respond to being cancelled
-                if (!pool.awaitTermination(10, TimeUnit.SECONDS))
-                    System.err.println("Pool did not terminate");
-            }
-        } catch (InterruptedException ie) {
-            // (Re-)Cancel if current thread also interrupted
-            pool.shutdownNow();
-            // Preserve interrupt status
-            Thread.currentThread().interrupt();
+    LOGGER.info("Finished.  Cleaning up...");
+
+    LOGGER.info("Providing {} docs", providerQueue.size());
+
+    StreamsResultSet result =  new StreamsResultSet(providerQueue);
+    running.set(false);
+
+    LOGGER.info("Exiting");
+
+    return result;
+
+  }
+
+  @Override
+  public StreamsResultSet readNew(BigInteger sequence) {
+    LOGGER.debug("{} readNew", STREAMS_ID);
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public StreamsResultSet readRange(DateTime start, DateTime end) {
+    LOGGER.debug("{} readRange", STREAMS_ID);
+    this.start = start;
+    this.end = end;
+    readCurrent();
+    StreamsResultSet result = (StreamsResultSet)providerQueue.iterator();
+    return result;
+  }
+
+  @Override
+  public boolean isRunning() {
+    return running.get();
+  }
+
+  void shutdownAndAwaitTermination(ExecutorService pool) {
+    pool.shutdown(); // Disable new tasks from being submitted
+    try {
+      // Wait a while for existing tasks to terminate
+      if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
+        pool.shutdownNow(); // Cancel currently executing tasks
+        // Wait a while for tasks to respond to being cancelled
+        if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
+          System.err.println("Pool did not terminate");
         }
+      }
+    } catch (InterruptedException ie) {
+      // (Re-)Cancel if current thread also interrupted
+      pool.shutdownNow();
+      // Preserve interrupt status
+      Thread.currentThread().interrupt();
     }
+  }
 
-    @Override
-    public void prepare(Object o) {
-
-        executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20));
+  @Override
+  public void prepare(Object configurationObject) {
 
-        Preconditions.checkNotNull(providerQueue);
-        Preconditions.checkNotNull(this.klass);
-        Preconditions.checkNotNull(facebookUserInformationConfiguration.getOauth().getAppId());
-        Preconditions.checkNotNull(facebookUserInformationConfiguration.getOauth().getAppSecret());
-        Preconditions.checkNotNull(facebookUserInformationConfiguration.getOauth().getUserAccessToken());
-        Preconditions.checkNotNull(facebookUserInformationConfiguration.getInfo());
+    executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20));
 
-        List<String> ids = new ArrayList<String>();
-        List<String[]> idsBatches = new ArrayList<String[]>();
+    Preconditions.checkNotNull(providerQueue);
+    Preconditions.checkNotNull(this.klass);
+    Preconditions.checkNotNull(facebookUserInformationConfiguration.getOauth().getAppId());
+    Preconditions.checkNotNull(facebookUserInformationConfiguration.getOauth().getAppSecret());
+    Preconditions.checkNotNull(facebookUserInformationConfiguration.getOauth().getUserAccessToken());
+    Preconditions.checkNotNull(facebookUserInformationConfiguration.getInfo());
 
-        for(String s : facebookUserInformationConfiguration.getInfo()) {
-            if(s != null)
-            {
-                ids.add(s);
+    List<String> ids = new ArrayList<String>();
+    List<String[]> idsBatches = new ArrayList<String[]>();
 
-                if(ids.size() >= 100) {
-                    // add the batch
-                    idsBatches.add(ids.toArray(new String[ids.size()]));
-                    // reset the Ids
-                    ids = new ArrayList<String>();
-                }
+    for (String s : facebookUserInformationConfiguration.getInfo()) {
+      if (s != null) {
+        ids.add(s);
 
-            }
+        if (ids.size() >= 100) {
+          // add the batch
+          idsBatches.add(ids.toArray(new String[ids.size()]));
+          // reset the Ids
+          ids = new ArrayList<String>();
         }
 
-        if(ids.size() > 0)
-            idsBatches.add(ids.toArray(new String[ids.size()]));
-
-        this.idsBatches = idsBatches.iterator();
+      }
     }
 
-    protected Facebook getFacebookClient()
-    {
-        ConfigurationBuilder cb = new ConfigurationBuilder();
-        cb.setDebugEnabled(true)
-            .setOAuthAppId(facebookUserInformationConfiguration.getOauth().getAppId())
-            .setOAuthAppSecret(facebookUserInformationConfiguration.getOauth().getAppSecret())
-            .setOAuthAccessToken(facebookUserInformationConfiguration.getOauth().getUserAccessToken())
-            .setOAuthPermissions(ALL_PERMISSIONS)
-            .setJSONStoreEnabled(true)
-            .setClientVersion("v1.0");
-
-        FacebookFactory ff = new FacebookFactory(cb.build());
-        Facebook facebook = ff.getInstance();
-
-        return facebook;
+    if (ids.size() > 0) {
+      idsBatches.add(ids.toArray(new String[ids.size()]));
     }
 
-    @Override
-    public void cleanUp() {
-        shutdownAndAwaitTermination(executor);
-    }
+    this.idsBatches = idsBatches.iterator();
+  }
+
+  protected Facebook getFacebookClient() {
+    ConfigurationBuilder cb = new ConfigurationBuilder();
+    cb.setDebugEnabled(true)
+        .setOAuthAppId(facebookUserInformationConfiguration.getOauth().getAppId())
+        .setOAuthAppSecret(facebookUserInformationConfiguration.getOauth().getAppSecret())
+        .setOAuthAccessToken(facebookUserInformationConfiguration.getOauth().getUserAccessToken())
+        .setOAuthPermissions(ALL_PERMISSIONS)
+        .setJSONStoreEnabled(true)
+        .setClientVersion("v1.0");
+
+    FacebookFactory ff = new FacebookFactory(cb.build());
+    Facebook facebook = ff.getInstance();
+
+    return facebook;
+  }
+
+  @Override
+  public void cleanUp() {
+    shutdownAndAwaitTermination(executor);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookUserstreamProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookUserstreamProvider.java b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookUserstreamProvider.java
index 0f2121a..b292d30 100644
--- a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookUserstreamProvider.java
+++ b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookUserstreamProvider.java
@@ -18,18 +18,6 @@
 
 package org.apache.streams.facebook.provider;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Queues;
-import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigRenderOptions;
-import facebook4j.*;
-import facebook4j.Post;
-import facebook4j.conf.ConfigurationBuilder;
-import facebook4j.json.DataObjectFactory;
 import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.core.DatumStatusCounter;
 import org.apache.streams.core.StreamsDatum;
@@ -39,10 +27,20 @@ import org.apache.streams.facebook.FacebookUserInformationConfiguration;
 import org.apache.streams.facebook.FacebookUserstreamConfiguration;
 import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.util.ComponentUtils;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Queues;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigRenderOptions;
+
+import org.apache.commons.lang.NotImplementedException;
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.commons.lang.NotImplementedException;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -51,276 +49,306 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Queue;
 import java.util.Set;
-import java.util.concurrent.*;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import facebook4j.Facebook;
+import facebook4j.FacebookException;
+import facebook4j.FacebookFactory;
+import facebook4j.Post;
+import facebook4j.ResponseList;
+import facebook4j.conf.ConfigurationBuilder;
+import facebook4j.json.DataObjectFactory;
+
 public class FacebookUserstreamProvider implements StreamsProvider, Serializable {
 
-    public static final String STREAMS_ID = "FacebookUserstreamProvider";
+  public static final String STREAMS_ID = "FacebookUserstreamProvider";
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(FacebookUserstreamProvider.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(FacebookUserstreamProvider.class);
 
-    private static final ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+  private static final ObjectMapper mapper = StreamsJacksonMapper.getInstance();
 
-    private static final String ALL_PERMISSIONS = "read_stream";
-    private FacebookUserstreamConfiguration configuration;
+  private static final String ALL_PERMISSIONS = "read_stream";
+  private FacebookUserstreamConfiguration configuration;
 
-    private Class klass;
-    protected final ReadWriteLock lock = new ReentrantReadWriteLock();
+  private Class klass;
+  protected final ReadWriteLock lock = new ReentrantReadWriteLock();
 
-    protected volatile Queue<StreamsDatum> providerQueue = new LinkedBlockingQueue<StreamsDatum>();
+  protected volatile Queue<StreamsDatum> providerQueue = new LinkedBlockingQueue<StreamsDatum>();
 
-    public FacebookUserstreamConfiguration getConfig() {
-        return configuration;
-    }
+  public FacebookUserstreamConfiguration getConfig() {
+    return configuration;
+  }
 
-    public void setConfig(FacebookUserstreamConfiguration config) {
-        this.configuration = config;
-    }
+  public void setConfig(FacebookUserstreamConfiguration config) {
+    this.configuration = config;
+  }
 
-    protected ListeningExecutorService executor;
+  protected ListeningExecutorService executor;
 
-    protected DateTime start;
-    protected DateTime end;
+  protected DateTime start;
+  protected DateTime end;
 
-    protected final AtomicBoolean running = new AtomicBoolean();
+  protected final AtomicBoolean running = new AtomicBoolean();
 
-    private DatumStatusCounter countersCurrent = new DatumStatusCounter();
-    private DatumStatusCounter countersTotal = new DatumStatusCounter();
+  private DatumStatusCounter countersCurrent = new DatumStatusCounter();
+  private DatumStatusCounter countersTotal = new DatumStatusCounter();
 
-    protected Facebook client;
+  protected Facebook client;
 
-    private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) {
-        return new ThreadPoolExecutor(nThreads, nThreads,
-                5000L, TimeUnit.MILLISECONDS,
-                new ArrayBlockingQueue<Runnable>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy());
-    }
+  private static ExecutorService newFixedThreadPoolWithQueueSize(int numThreads, int queueSize) {
+    return new ThreadPoolExecutor(numThreads, numThreads,
+        5000L, TimeUnit.MILLISECONDS,
+        new ArrayBlockingQueue<Runnable>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy());
+  }
 
-    public FacebookUserstreamProvider() {
-        Config config = StreamsConfigurator.config.getConfig("facebook");
-        FacebookUserInformationConfiguration facebookUserInformationConfiguration;
-        try {
-            facebookUserInformationConfiguration = mapper.readValue(config.root().render(ConfigRenderOptions.concise()), FacebookUserInformationConfiguration.class);
-        } catch (IOException e) {
-            e.printStackTrace();
-            return;
-        }
+  /**
+   * FacebookUserstreamProvider constructor.
+   */
+  public FacebookUserstreamProvider() {
+    Config config = StreamsConfigurator.config.getConfig("facebook");
+    FacebookUserInformationConfiguration facebookUserInformationConfiguration;
+    try {
+      facebookUserInformationConfiguration = mapper.readValue(config.root().render(ConfigRenderOptions.concise()), FacebookUserInformationConfiguration.class);
+    } catch (IOException ex) {
+      ex.printStackTrace();
+      return;
     }
-
-    public FacebookUserstreamProvider(FacebookUserstreamConfiguration config) {
-        this.configuration = config;
+  }
+
+  /**
+   * FacebookUserstreamProvider constructor.
+   * @param config config
+   */
+  public FacebookUserstreamProvider(FacebookUserstreamConfiguration config) {
+    this.configuration = config;
+  }
+
+  /**
+   * FacebookUserstreamProvider constructor.
+   * @param klass output Class
+   */
+  public FacebookUserstreamProvider(Class klass) {
+    Config config = StreamsConfigurator.config.getConfig("facebook");
+    FacebookUserInformationConfiguration facebookUserInformationConfiguration;
+    try {
+      facebookUserInformationConfiguration = mapper.readValue(config.root().render(ConfigRenderOptions.concise()), FacebookUserInformationConfiguration.class);
+    } catch (IOException ex) {
+      ex.printStackTrace();
+      return;
     }
-
-    public FacebookUserstreamProvider(Class klass) {
-        Config config = StreamsConfigurator.config.getConfig("facebook");
-        FacebookUserInformationConfiguration facebookUserInformationConfiguration;
-        try {
-            facebookUserInformationConfiguration = mapper.readValue(config.root().render(ConfigRenderOptions.concise()), FacebookUserInformationConfiguration.class);
-        } catch (IOException e) {
-            e.printStackTrace();
-            return;
-        }
-        this.klass = klass;
+    this.klass = klass;
+  }
+
+  public FacebookUserstreamProvider(FacebookUserstreamConfiguration config, Class klass) {
+    this.configuration = config;
+    this.klass = klass;
+  }
+
+  public Queue<StreamsDatum> getProviderQueue() {
+    return this.providerQueue;
+  }
+
+  @Override
+  public String getId() {
+    return STREAMS_ID;
+  }
+
+  @Override
+  public void startStream() {
+
+    client = getFacebookClient();
+
+    if ( configuration.getInfo() != null
+         &&
+         configuration.getInfo().size() > 0 ) {
+      for ( String id : configuration.getInfo()) {
+        executor.submit(new FacebookFeedPollingTask(this, id));
+      }
+      running.set(true);
+    } else {
+      try {
+        String id = client.getMe().getId();
+        executor.submit(new FacebookFeedPollingTask(this, id));
+        running.set(true);
+      } catch (FacebookException ex) {
+        LOGGER.error(ex.getMessage());
+        running.set(false);
+      }
     }
+  }
 
-    public FacebookUserstreamProvider(FacebookUserstreamConfiguration config, Class klass) {
-        this.configuration = config;
-        this.klass = klass;
-    }
+  @Override
+  public StreamsResultSet readCurrent() {
 
-    public Queue<StreamsDatum> getProviderQueue() {
-        return this.providerQueue;
-    }
+    StreamsResultSet current;
 
-    @Override
-    public String getId() {
-        return STREAMS_ID;
+    synchronized (FacebookUserstreamProvider.class) {
+      current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(providerQueue));
+      current.setCounter(new DatumStatusCounter());
+      current.getCounter().add(countersCurrent);
+      countersTotal.add(countersCurrent);
+      countersCurrent = new DatumStatusCounter();
+      providerQueue.clear();
     }
 
-    @Override
-    public void startStream() {
-
-        client = getFacebookClient();
-
-        if( configuration.getInfo() != null &&
-            configuration.getInfo().size() > 0 ) {
-            for( String id : configuration.getInfo()) {
-                executor.submit(new FacebookFeedPollingTask(this, id));
-            }
-            running.set(true);
-        } else {
-            try {
-                String id = client.getMe().getId();
-                executor.submit(new FacebookFeedPollingTask(this, id));
-                running.set(true);
-            } catch (FacebookException e) {
-                LOGGER.error(e.getMessage());
-                running.set(false);
-            }
+    return current;
+
+  }
+
+  @Override
+  public StreamsResultSet readNew(BigInteger sequence) {
+    LOGGER.debug("{} readNew", STREAMS_ID);
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public StreamsResultSet readRange(DateTime start, DateTime end) {
+    LOGGER.debug("{} readRange", STREAMS_ID);
+    this.start = start;
+    this.end = end;
+    readCurrent();
+    StreamsResultSet result = (StreamsResultSet) providerQueue.iterator();
+    return result;
+  }
+
+  @Override
+  public boolean isRunning() {
+    return running.get();
+  }
+
+  void shutdownAndAwaitTermination(ExecutorService pool) {
+    pool.shutdown(); // Disable new tasks from being submitted
+    try {
+      // Wait a while for existing tasks to terminate
+      if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
+        pool.shutdownNow(); // Cancel currently executing tasks
+        // Wait a while for tasks to respond to being cancelled
+        if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
+          System.err.println("Pool did not terminate");
         }
+      }
+    } catch (InterruptedException ie) {
+      // (Re-)Cancel if current thread also interrupted
+      pool.shutdownNow();
+      // Preserve interrupt status
+      Thread.currentThread().interrupt();
     }
+  }
 
-    public StreamsResultSet readCurrent() {
+  @Override
+  public void prepare(Object configurationObject) {
 
-        StreamsResultSet current;
+    executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20));
 
-        synchronized (FacebookUserstreamProvider.class) {
-            current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(providerQueue));
-            current.setCounter(new DatumStatusCounter());
-            current.getCounter().add(countersCurrent);
-            countersTotal.add(countersCurrent);
-            countersCurrent = new DatumStatusCounter();
-            providerQueue.clear();
-        }
+    Preconditions.checkNotNull(providerQueue);
+    Preconditions.checkNotNull(this.klass);
+    Preconditions.checkNotNull(configuration.getOauth().getAppId());
+    Preconditions.checkNotNull(configuration.getOauth().getAppSecret());
+    Preconditions.checkNotNull(configuration.getOauth().getUserAccessToken());
 
-        return current;
+    client = getFacebookClient();
 
-    }
+    if ( configuration.getInfo() != null
+         &&
+         configuration.getInfo().size() > 0 ) {
 
-    public StreamsResultSet readNew(BigInteger sequence) {
-        LOGGER.debug("{} readNew", STREAMS_ID);
-        throw new NotImplementedException();
-    }
+      List<String> ids = new ArrayList<String>();
+      List<String[]> idsBatches = new ArrayList<String[]>();
 
-    public StreamsResultSet readRange(DateTime start, DateTime end) {
-        LOGGER.debug("{} readRange", STREAMS_ID);
-        this.start = start;
-        this.end = end;
-        readCurrent();
-        StreamsResultSet result = (StreamsResultSet) providerQueue.iterator();
-        return result;
-    }
+      for (String s : configuration.getInfo()) {
+        if (s != null) {
+          ids.add(s);
 
-    @Override
-    public boolean isRunning() {
-        return running.get();
-    }
+          if (ids.size() >= 100) {
+            // add the batch
+            idsBatches.add(ids.toArray(new String[ids.size()]));
+            // reset the Ids
+            ids = new ArrayList<String>();
+          }
 
-    void shutdownAndAwaitTermination(ExecutorService pool) {
-        pool.shutdown(); // Disable new tasks from being submitted
-        try {
-            // Wait a while for existing tasks to terminate
-            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
-                pool.shutdownNow(); // Cancel currently executing tasks
-                // Wait a while for tasks to respond to being cancelled
-                if (!pool.awaitTermination(10, TimeUnit.SECONDS))
-                    System.err.println("Pool did not terminate");
-            }
-        } catch (InterruptedException ie) {
-            // (Re-)Cancel if current thread also interrupted
-            pool.shutdownNow();
-            // Preserve interrupt status
-            Thread.currentThread().interrupt();
         }
+      }
     }
+  }
 
-    @Override
-    public void prepare(Object o) {
-
-        executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20));
+  protected Facebook getFacebookClient() {
+    ConfigurationBuilder cb = new ConfigurationBuilder();
+    cb.setDebugEnabled(true)
+        .setOAuthAppId(configuration.getOauth().getAppId())
+        .setOAuthAppSecret(configuration.getOauth().getAppSecret())
+        .setOAuthAccessToken(configuration.getOauth().getUserAccessToken())
+        .setOAuthPermissions(ALL_PERMISSIONS)
+        .setJSONStoreEnabled(true);
 
-        Preconditions.checkNotNull(providerQueue);
-        Preconditions.checkNotNull(this.klass);
-        Preconditions.checkNotNull(configuration.getOauth().getAppId());
-        Preconditions.checkNotNull(configuration.getOauth().getAppSecret());
-        Preconditions.checkNotNull(configuration.getOauth().getUserAccessToken());
+    FacebookFactory ff = new FacebookFactory(cb.build());
+    Facebook facebook = ff.getInstance();
 
-        client = getFacebookClient();
+    return facebook;
+  }
 
-        if( configuration.getInfo() != null &&
-            configuration.getInfo().size() > 0 ) {
+  @Override
+  public void cleanUp() {
+    shutdownAndAwaitTermination(executor);
+  }
 
-            List<String> ids = new ArrayList<String>();
-            List<String[]> idsBatches = new ArrayList<String[]>();
+  private class FacebookFeedPollingTask implements Runnable {
 
-            for (String s : configuration.getInfo()) {
-                if (s != null) {
-                    ids.add(s);
+    FacebookUserstreamProvider provider;
+    Facebook client;
+    String id;
 
-                    if (ids.size() >= 100) {
-                        // add the batch
-                        idsBatches.add(ids.toArray(new String[ids.size()]));
-                        // reset the Ids
-                        ids = new ArrayList<String>();
-                    }
+    private Set<Post> priorPollResult = Sets.newHashSet();
 
-                }
-            }
-        }
+    public FacebookFeedPollingTask(FacebookUserstreamProvider facebookUserstreamProvider) {
+      this.provider = facebookUserstreamProvider;
     }
 
-    protected Facebook getFacebookClient() {
-        ConfigurationBuilder cb = new ConfigurationBuilder();
-        cb.setDebugEnabled(true)
-                .setOAuthAppId(configuration.getOauth().getAppId())
-                .setOAuthAppSecret(configuration.getOauth().getAppSecret())
-                .setOAuthAccessToken(configuration.getOauth().getUserAccessToken())
-                .setOAuthPermissions(ALL_PERMISSIONS)
-                .setJSONStoreEnabled(true);
-
-        FacebookFactory ff = new FacebookFactory(cb.build());
-        Facebook facebook = ff.getInstance();
-
-        return facebook;
+    public FacebookFeedPollingTask(FacebookUserstreamProvider facebookUserstreamProvider, String id) {
+      this.provider = facebookUserstreamProvider;
+      this.client = provider.client;
+      this.id = id;
     }
 
     @Override
-    public void cleanUp() {
-        shutdownAndAwaitTermination(executor);
-    }
-
-    private class FacebookFeedPollingTask implements Runnable {
-
-        FacebookUserstreamProvider provider;
-        Facebook client;
-        String id;
-
-        private Set<Post> priorPollResult = Sets.newHashSet();
-
-        public FacebookFeedPollingTask(FacebookUserstreamProvider facebookUserstreamProvider) {
-            this.provider = facebookUserstreamProvider;
-        }
-
-        public FacebookFeedPollingTask(FacebookUserstreamProvider facebookUserstreamProvider, String id) {
-            this.provider = facebookUserstreamProvider;
-            this.client = provider.client;
-            this.id = id;
-        }
-        @Override
-        public void run() {
-            while (provider.isRunning()) {
-                ResponseList<Post> postResponseList;
-                try {
-                    postResponseList = client.getFeed(id);
-
-                    Set<Post> update = Sets.newHashSet(postResponseList);
-                    Set<Post> repeats = Sets.intersection(priorPollResult, Sets.newHashSet(update));
-                    Set<Post> entrySet = Sets.difference(update, repeats);
-                    LOGGER.debug(this.id + " response: " + update.size() + " previous: " + repeats.size() + " new: " + entrySet.size());
-                    for (Post item : entrySet) {
-                        String json = DataObjectFactory.getRawJSON(item);
-                        org.apache.streams.facebook.Post post = mapper.readValue(json, org.apache.streams.facebook.Post.class);
-                        try {
-                            lock.readLock().lock();
-                            ComponentUtils.offerUntilSuccess(new StreamsDatum(post), providerQueue);
-                            countersCurrent.incrementAttempt();
-                        } finally {
-                            lock.readLock().unlock();
-                        }
-                    }
-                    priorPollResult = update;
-                } catch (Exception e) {
-                    e.printStackTrace();
-                } finally {
-                    try {
-                        Thread.sleep(configuration.getPollIntervalMillis());
-                    } catch (InterruptedException e) {
-                        Thread.currentThread().interrupt();
-                    }
-                }
+    public void run() {
+      while (provider.isRunning()) {
+        ResponseList<Post> postResponseList;
+        try {
+          postResponseList = client.getFeed(id);
+
+          Set<Post> update = Sets.newHashSet(postResponseList);
+          Set<Post> repeats = Sets.intersection(priorPollResult, Sets.newHashSet(update));
+          Set<Post> entrySet = Sets.difference(update, repeats);
+          LOGGER.debug(this.id + " response: " + update.size() + " previous: " + repeats.size() + " new: " + entrySet.size());
+          for (Post item : entrySet) {
+            String json = DataObjectFactory.getRawJSON(item);
+            org.apache.streams.facebook.Post post = mapper.readValue(json, org.apache.streams.facebook.Post.class);
+            try {
+              lock.readLock().lock();
+              ComponentUtils.offerUntilSuccess(new StreamsDatum(post), providerQueue);
+              countersCurrent.incrementAttempt();
+            } finally {
+              lock.readLock().unlock();
             }
+          }
+          priorPollResult = update;
+        } catch (Exception ex) {
+          ex.printStackTrace();
+        } finally {
+          try {
+            Thread.sleep(configuration.getPollIntervalMillis());
+          } catch (InterruptedException interrupt) {
+            Thread.currentThread().interrupt();
+          }
         }
+      }
     }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/page/FacebookPageDataCollector.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/page/FacebookPageDataCollector.java b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/page/FacebookPageDataCollector.java
index 0e88dd4..68d8f06 100644
--- a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/page/FacebookPageDataCollector.java
+++ b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/page/FacebookPageDataCollector.java
@@ -20,7 +20,6 @@ package org.apache.streams.facebook.provider.page;
 
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.facebook.FacebookConfiguration;
-import org.apache.streams.facebook.FacebookPageProviderConfiguration;
 import org.apache.streams.facebook.IdConfig;
 import org.apache.streams.facebook.provider.FacebookDataCollector;
 import org.apache.streams.jackson.StreamsJacksonMapper;
@@ -28,7 +27,6 @@ import org.apache.streams.jackson.StreamsJacksonMapper;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
 
-import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -36,11 +34,10 @@ import java.util.concurrent.BlockingQueue;
 
 import facebook4j.FacebookException;
 import facebook4j.Page;
-import facebook4j.Reading;
 import facebook4j.json.DataObjectFactory;
 
 /**
- * Collects the page data from public Facebook pages
+ * Collects the page data from public Facebook pages.
  */
 public class FacebookPageDataCollector extends FacebookDataCollector {
 
@@ -48,11 +45,8 @@ public class FacebookPageDataCollector extends FacebookDataCollector {
   private static final int MAX_ATTEMPTS = 5;
   private static final ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
 
-  private String fields;
-
-  public FacebookPageDataCollector(BlockingQueue<StreamsDatum> queue, FacebookPageProviderConfiguration configuration) {
+  public FacebookPageDataCollector(BlockingQueue<StreamsDatum> queue, FacebookConfiguration configuration) {
     super(configuration, queue);
-    fields = StringUtils.join(configuration.getFields(), ',');
   }
 
   @Override
@@ -70,7 +64,7 @@ public class FacebookPageDataCollector extends FacebookDataCollector {
     while (attempt < MAX_ATTEMPTS) {
       ++attempt;
       try {
-        Page page = getNextFacebookClient().getPage(pageId, new Reading().fields(fields));
+        Page page = getNextFacebookClient().getPage(pageId);
         return page;
       } catch (FacebookException fe) {
         LOGGER.error("Facebook returned an exception : {}", fe);

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/page/FacebookPageProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/page/FacebookPageProvider.java b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/page/FacebookPageProvider.java
index d11a486..e7bbcfa 100644
--- a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/page/FacebookPageProvider.java
+++ b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/page/FacebookPageProvider.java
@@ -80,9 +80,8 @@ public class FacebookPageProvider extends FacebookProvider {
 
   private FacebookPageProviderConfiguration configuration;
 
-  public FacebookPageProvider(FacebookPageProviderConfiguration facebookConfiguration) {
+  public FacebookPageProvider(FacebookConfiguration facebookConfiguration) {
     super(facebookConfiguration);
-    configuration = facebookConfiguration;
   }
 
   @VisibleForTesting
@@ -92,7 +91,7 @@ public class FacebookPageProvider extends FacebookProvider {
 
   @Override
   protected FacebookDataCollector getDataCollector() {
-    return new FacebookPageDataCollector(super.datums, configuration);
+    return new FacebookPageDataCollector(super.datums, super.configuration);
   }
 
   /**
@@ -115,7 +114,7 @@ public class FacebookPageProvider extends FacebookProvider {
     Config typesafe  = conf.withFallback(reference).resolve();
 
     StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe);
-    FacebookPageProviderConfiguration config = new ComponentConfigurator<>(FacebookPageProviderConfiguration.class).detectConfiguration(typesafe, "facebook");
+    FacebookConfiguration config = new ComponentConfigurator<>(FacebookConfiguration.class).detectConfiguration(typesafe, "facebook");
     FacebookPageProvider provider = new FacebookPageProvider(config);
 
     PrintStream outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile)));

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/pagefeed/FacebookPageFeedDataCollector.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/pagefeed/FacebookPageFeedDataCollector.java b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/pagefeed/FacebookPageFeedDataCollector.java
index c2ba700..f509170 100644
--- a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/pagefeed/FacebookPageFeedDataCollector.java
+++ b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/pagefeed/FacebookPageFeedDataCollector.java
@@ -15,116 +15,124 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.streams.facebook.provider.pagefeed;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import facebook4j.*;
-import facebook4j.json.DataObjectFactory;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.facebook.FacebookConfiguration;
 import org.apache.streams.facebook.IdConfig;
 import org.apache.streams.facebook.provider.FacebookDataCollector;
 import org.apache.streams.jackson.StreamsJacksonMapper;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.concurrent.BlockingQueue;
 
+import facebook4j.FacebookException;
+import facebook4j.Paging;
+import facebook4j.Post;
+import facebook4j.Reading;
+import facebook4j.ResponseList;
+import facebook4j.json.DataObjectFactory;
+
 /**
- * Collects the page feed data from public Facebook pages
+ * Collects the page feed data from public Facebook pages.
  */
 public class FacebookPageFeedDataCollector extends FacebookDataCollector {
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(FacebookPageFeedDataCollector.class);
-    private static final int MAX_ATTEMPTS = 5;
-    private static final ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
-    private static final int LIMIT = 100;
+  private static final Logger LOGGER = LoggerFactory.getLogger(FacebookPageFeedDataCollector.class);
+  private static final int MAX_ATTEMPTS = 5;
+  private static final ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
+  private static final int LIMIT = 100;
 
-    public FacebookPageFeedDataCollector(BlockingQueue<StreamsDatum> queue, FacebookConfiguration configuration) {
-        super(configuration, queue);
-    }
+  public FacebookPageFeedDataCollector(BlockingQueue<StreamsDatum> queue, FacebookConfiguration configuration) {
+    super(configuration, queue);
+  }
+
+  @Override
+  protected void getData(IdConfig id) throws Exception {
+    boolean exit = false;
 
-    @Override
-    protected void getData(IdConfig id) throws Exception {
-        boolean exit = false;
+    ResponseList<Post> facebookPosts = getPosts(id.getId());
+    LOGGER.debug("Post received : {}", facebookPosts.size());
+    backOff.reset();
+    do {
+      for (Post post : facebookPosts) {
+        if (id.getBeforeDate() != null && id.getAfterDate() != null) {
+          if (id.getBeforeDate().isAfter(post.getCreatedTime().getTime())
+              && id.getAfterDate().isBefore(post.getCreatedTime().getTime())) {
+            super.outputData(MAPPER.readValue(DataObjectFactory.getRawJSON(post), org.apache.streams.facebook.Post.class), post.getId());
 
-        ResponseList<Post> facebookPosts = getPosts(id.getId());
-        LOGGER.debug("Post received : {}", facebookPosts.size());
+          }
+        } else if (id.getBeforeDate() != null && id.getBeforeDate().isAfter(post.getCreatedTime().getTime())) {
+          super.outputData(MAPPER.readValue(DataObjectFactory.getRawJSON(post), org.apache.streams.facebook.Post.class), post.getId());
+        } else if (id.getAfterDate() != null && id.getAfterDate().isBefore(post.getCreatedTime().getTime())) {
+          super.outputData(MAPPER.readValue(DataObjectFactory.getRawJSON(post), org.apache.streams.facebook.Post.class), post.getId());
+        } else if (id.getBeforeDate() == null && id.getAfterDate() == null) {
+          super.outputData(MAPPER.readValue(DataObjectFactory.getRawJSON(post), org.apache.streams.facebook.Post.class), post.getId());
+        } else {
+          exit = true;
+          LOGGER.debug("Breaking on post, {}, with createdAtDate {}", post.getId(), post.getCreatedTime());
+          break;
+        }
+      }
+      if (facebookPosts.getPaging() != null && !exit) {
+        LOGGER.debug("Paging. . .");
+        facebookPosts = getPosts(facebookPosts.getPaging());
         backOff.reset();
-        do {
-            for(Post post : facebookPosts) {
-                if(id.getBeforeDate() != null && id.getAfterDate() != null) {
-                    if(id.getBeforeDate().isAfter(post.getCreatedTime().getTime())
-                            && id.getAfterDate().isBefore(post.getCreatedTime().getTime())) {
-                        super.outputData(MAPPER.readValue(DataObjectFactory.getRawJSON(post), org.apache.streams.facebook.Post.class), post.getId());
-
-                    }
-                } else if(id.getBeforeDate() != null && id.getBeforeDate().isAfter(post.getCreatedTime().getTime())) {
-                    super.outputData(MAPPER.readValue(DataObjectFactory.getRawJSON(post), org.apache.streams.facebook.Post.class), post.getId());
-                } else if(id.getAfterDate() != null && id.getAfterDate().isBefore(post.getCreatedTime().getTime())) {
-                    super.outputData(MAPPER.readValue(DataObjectFactory.getRawJSON(post), org.apache.streams.facebook.Post.class), post.getId());
-                } else if(id.getBeforeDate() == null && id.getAfterDate() == null) {
-                    super.outputData(MAPPER.readValue(DataObjectFactory.getRawJSON(post), org.apache.streams.facebook.Post.class), post.getId());
-                } else {
-                    exit = true;
-                    LOGGER.debug("Breaking on post, {}, with createdAtDate {}", post.getId(), post.getCreatedTime());
-                    break;
-                }
-            }
-            if(facebookPosts.getPaging() != null && !exit) {
-                LOGGER.debug("Paging. . .");
-                facebookPosts = getPosts(facebookPosts.getPaging());
-                backOff.reset();
-                LOGGER.debug("Paging received {} posts*", facebookPosts.size());
-            } else {
-                LOGGER.debug("No more paging.");
-                facebookPosts = null;
-            }
-        } while(facebookPosts != null && facebookPosts.size() != 0);
+        LOGGER.debug("Paging received {} posts*", facebookPosts.size());
+      } else {
+        LOGGER.debug("No more paging.");
+        facebookPosts = null;
+      }
+    }
+    while (facebookPosts != null && facebookPosts.size() != 0);
 
+  }
 
-    }
+  private ResponseList<Post> getPosts(Paging<Post> paging) throws Exception {
+    return getPosts(null, paging);
+  }
 
-    private ResponseList<Post> getPosts(Paging<Post> paging) throws Exception{
-        return getPosts(null, paging);
-    }
+  private ResponseList<Post> getPosts(String pageId) throws Exception {
+    return getPosts(pageId, null);
+  }
 
-    private ResponseList<Post> getPosts(String pageId) throws Exception {
-        return getPosts(pageId, null);
-    }
+  /**
+   * Queries facebook.  Attempts requests up to 5 times, backing off when facebook reports a rate-limiting error.
+   * @param pageId pageId
+   * @param paging paging
+   * @return ResponseList of {@link facebook4j.Post}
+   * @throws Exception Exception
+   */
+  private ResponseList<Post> getPosts(String pageId, Paging<Post> paging) throws Exception {
+    int attempt = 0;
+    while (attempt < MAX_ATTEMPTS) {
+      ++attempt;
+      try {
+        if (pageId != null) {
+          Reading reading = new Reading();
+          reading.limit(LIMIT);
+          return getNextFacebookClient().getPosts(pageId, reading);
+        } else {
+          return getNextFacebookClient().fetchNext(paging);
+        }
+      } catch (FacebookException fe) {
+        LOGGER.error("Facebook returned an exception : {}", fe);
+        LOGGER.error("Facebook returned an exception while trying to get feed for page, {} : {}", pageId, fe.getMessage());
+        //TODO Rate limit exceptions with facebook4j unclear http://facebook4j.org/oldjavadocs/1.1.12-2.0.0/2.0.0/index.html?facebook4j/internal/http/HttpResponseCode.html
+        // back off at all exceptions until figured out.
+        int errorCode = fe.getErrorCode();
 
-    /**
-     * Queries facebook.  Attempts requests up to 5 times and backs off on each facebook exception.
-     * @param pageId
-     * @param paging
-     * @return
-     * @throws Exception
-     */
-    private ResponseList<Post> getPosts(String pageId, Paging<Post> paging) throws Exception {
-        int attempt = 0;
-        while(attempt < MAX_ATTEMPTS) {
-            ++attempt;
-            try {
-                if (pageId != null) {
-                    Reading reading = new Reading();
-                    reading.limit(LIMIT);
-                    return getNextFacebookClient().getPosts(pageId, reading);
-                }
-                else
-                    return getNextFacebookClient().fetchNext(paging);
-            } catch (FacebookException fe) {
-                LOGGER.error("Facebook returned an exception : {}", fe);
-                LOGGER.error("Facebook returned an exception while trying to get feed for page, {} : {}", pageId, fe.getMessage());
-                //TODO Rate limit exceptions with facebook4j unclear http://facebook4j.org/oldjavadocs/1.1.12-2.0.0/2.0.0/index.html?facebook4j/internal/http/HttpResponseCode.html
-                // back off at all exceptions until figured out.
-                int errorCode = fe.getErrorCode();
-
-                //Some sort of rate limiting
-                if(errorCode == 17 || errorCode == 4 || errorCode == 341) {
-                    super.backOff.backOff();
-                }
-            }
+        //Some sort of rate limiting
+        if (errorCode == 17 || errorCode == 4 || errorCode == 341) {
+          super.backOff.backOff();
         }
-        throw new Exception("Failed to get data from facebook after "+MAX_ATTEMPTS);
+      }
     }
+    throw new Exception("Failed to get data from facebook after " + MAX_ATTEMPTS);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/pagefeed/FacebookPageFeedProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/pagefeed/FacebookPageFeedProvider.java b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/pagefeed/FacebookPageFeedProvider.java
index 308b129..5d977e0 100644
--- a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/pagefeed/FacebookPageFeedProvider.java
+++ b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/pagefeed/FacebookPageFeedProvider.java
@@ -15,15 +15,9 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.streams.facebook.provider.pagefeed;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.Uninterruptibles;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import com.typesafe.config.ConfigParseOptions;
 import org.apache.streams.config.ComponentConfigurator;
 import org.apache.streams.config.StreamsConfiguration;
 import org.apache.streams.config.StreamsConfigurator;
@@ -33,6 +27,15 @@ import org.apache.streams.facebook.provider.FacebookDataCollector;
 import org.apache.streams.facebook.provider.FacebookProvider;
 import org.apache.streams.facebook.provider.page.FacebookPageProvider;
 import org.apache.streams.jackson.StreamsJacksonMapper;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.Uninterruptibles;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigParseOptions;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -44,65 +47,71 @@ import java.util.Iterator;
 import java.util.concurrent.TimeUnit;
 
 /**
- *
+ * FacebookPageFeedProvider provides content from the feeds of public facebook pages.
  */
 public class FacebookPageFeedProvider extends FacebookProvider {
 
-    public static final String STREAMS_ID = "FacebookPageFeedProvider";
-
-    private static final Logger LOGGER = LoggerFactory.getLogger(FacebookPageProvider.class);
-
-    private static ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
-
-    public FacebookPageFeedProvider() {
-        super();
-    }
-
-    public FacebookPageFeedProvider(FacebookConfiguration config) {
-        super(config);
-    }
-
-    @Override
-    protected FacebookDataCollector getDataCollector() {
-        return new FacebookPageFeedDataCollector(super.datums, super.configuration);
-    }
-
-    public static void main(String[] args) throws Exception {
-
-        Preconditions.checkArgument(args.length >= 2);
-
-        String configfile = args[0];
-        String outfile = args[1];
-
-        Config reference = ConfigFactory.load();
-        File conf_file = new File(configfile);
-        assert(conf_file.exists());
-        Config conf = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
-
-        Config typesafe  = conf.withFallback(reference).resolve();
-
-        StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe);
-        FacebookConfiguration config = new ComponentConfigurator<>(FacebookConfiguration.class).detectConfiguration(typesafe, "facebook");
-        FacebookPageFeedProvider provider = new FacebookPageFeedProvider(config);
-
-        PrintStream outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile)));
-        provider.prepare(config);
-        provider.startStream();
-        do {
-            Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS);
-            Iterator<StreamsDatum> iterator = provider.readCurrent().iterator();
-            while(iterator.hasNext()) {
-                StreamsDatum datum = iterator.next();
-                String json;
-                try {
-                    json = MAPPER.writeValueAsString(datum.getDocument());
-                    outStream.println(json);
-                } catch (JsonProcessingException e) {
-                    System.err.println(e.getMessage());
-                }
-            }
-        } while( provider.isRunning());
-        provider.cleanUp();
-        outStream.flush();
+  public static final String STREAMS_ID = "FacebookPageFeedProvider";
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(FacebookPageProvider.class);
+
+  private static ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
+
+  public FacebookPageFeedProvider() {
+    super();
+  }
+
+  public FacebookPageFeedProvider(FacebookConfiguration config) {
+    super(config);
+  }
+
+  @Override
+  protected FacebookDataCollector getDataCollector() {
+    return new FacebookPageFeedDataCollector(super.datums, super.configuration);
+  }
+
+  /**
+   * Run FacebookPageFeedProvider from command line.
+   * @param args configfile outfile
+   * @throws Exception Exception
+   */
+  public static void main(String[] args) throws Exception {
+
+    Preconditions.checkArgument(args.length >= 2);
+
+    String configfile = args[0];
+    String outfile = args[1];
+
+    Config reference = ConfigFactory.load();
+    File confFile = new File(configfile);
+    assert (confFile.exists());
+    Config conf = ConfigFactory.parseFileAnySyntax(confFile, ConfigParseOptions.defaults().setAllowMissing(false));
+
+    Config typesafe  = conf.withFallback(reference).resolve();
+
+    StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe);
+    FacebookConfiguration config = new ComponentConfigurator<>(FacebookConfiguration.class).detectConfiguration(typesafe, "facebook");
+    FacebookPageFeedProvider provider = new FacebookPageFeedProvider(config);
+
+    PrintStream outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile)));
+    provider.prepare(config);
+    provider.startStream();
+    do {
+      Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS);
+      Iterator<StreamsDatum> iterator = provider.readCurrent().iterator();
+      while (iterator.hasNext()) {
+        StreamsDatum datum = iterator.next();
+        String json;
+        try {
+          json = MAPPER.writeValueAsString(datum.getDocument());
+          outStream.println(json);
+        } catch (JsonProcessingException ex) {
+          System.err.println(ex.getMessage());
+        }
+      }
     }
+    while ( provider.isRunning());
+    provider.cleanUp();
+    outStream.flush();
+  }
 }