You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2016/11/25 20:25:13 UTC

[32/42] incubator-streams git commit: STREAMS-440: custom checkstyle.xml, address compliance

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-processor-regex/src/main/java/org/apache/streams/regex/RegexUtils.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-regex/src/main/java/org/apache/streams/regex/RegexUtils.java b/streams-contrib/streams-processor-regex/src/main/java/org/apache/streams/regex/RegexUtils.java
index 6b4f28a..d1936d1 100644
--- a/streams-contrib/streams-processor-regex/src/main/java/org/apache/streams/regex/RegexUtils.java
+++ b/streams-contrib/streams-processor-regex/src/main/java/org/apache/streams/regex/RegexUtils.java
@@ -21,6 +21,7 @@ package org.apache.streams.regex;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -30,73 +31,73 @@ import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 /**
- * Provides utilities for extracting matches from content
+ * Provides utilities for extracting matches from content.
  */
 public class RegexUtils {
 
-    private static final Map<String, Pattern> patternCache = Maps.newConcurrentMap();
-    private final static Logger LOGGER = LoggerFactory.getLogger(RegexUtils.class);
+  private static final Map<String, Pattern> patternCache = Maps.newConcurrentMap();
+  private static final Logger LOGGER = LoggerFactory.getLogger(RegexUtils.class);
 
-    private RegexUtils() {}
+  private RegexUtils() {}
 
-    /**
-     * Extracts matches of the given pattern in the content and returns them as a list.
-     * @param pattern the pattern for the substring to match.  For example, [0-9]* matches 911 in Emergency number is 911.
-     * @param content the complete content to find matches in.
-     * @return a non-null list of matches.
-     */
-    public static Map<String, List<Integer>> extractMatches(String pattern, String content) {
-        return getMatches(pattern, content, -1);
-    }
+  /**
+   * Extracts matches of the given pattern in the content and returns them as a list.
+   * @param pattern the pattern for the substring to match.  For example, [0-9]* matches 911 in Emergency number is 911.
+   * @param content the complete content to find matches in.
+   * @return a non-null list of matches.
+   */
+  public static Map<String, List<Integer>> extractMatches(String pattern, String content) {
+    return getMatches(pattern, content, -1);
+  }
 
-    /**
-     * Extracts matches of the given pattern that are bounded by separation characters and returns them as a list.
-     * @param pattern the pattern for the substring to match.  For example, [0-9]* matches 911 in Emergency number is 911.
-     * @param content the complete content to find matches in.
-     * @return a non-null list of matches.
-     */
-    public static Map<String, List<Integer>> extractWordMatches(String pattern, String content) {
-        pattern = "(^|\\s)(" + pattern + ")([\\s!\\.;,?]|$)";
-        return getMatches(pattern, content, 2);
-    }
+  /**
+   * Extracts matches of the given pattern that are bounded by separation characters and returns them as a list.
+   * @param pattern the pattern for the substring to match.  For example, [0-9]* matches 911 in Emergency number is 911.
+   * @param content the complete content to find matches in.
+   * @return a non-null list of matches.
+   */
+  public static Map<String, List<Integer>> extractWordMatches(String pattern, String content) {
+    pattern = "(^|\\s)(" + pattern + ")([\\s!\\.;,?]|$)";
+    return getMatches(pattern, content, 2);
+  }
 
-    protected static Map<String, List<Integer>> getMatches(String pattern, String content, int capture) {
-        try {
-            Map<String, List<Integer>> matches = Maps.newHashMap();
-            if(content == null) {
-                return matches;
-            }
+  protected static Map<String, List<Integer>> getMatches(String pattern, String content, int capture) {
+    try {
+      Map<String, List<Integer>> matches = Maps.newHashMap();
+      if (content == null) {
+        return matches;
+      }
 
-            Matcher m = getPattern(pattern).matcher(content);
-            while (m.find()) {
-                String group = capture > 0 ? m.group(capture) : m.group();
-                if (group != null && !group.equals("")) {
-                    List<Integer> indices;
-                    if (matches.containsKey(group)) {
-                        indices = matches.get(group);
-                    } else {
-                        indices = Lists.newArrayList();
-                        matches.put(group, indices);
-                    }
-                    indices.add(m.start());
-                }
-            }
-            return matches;
-        } catch (Throwable e) {
-            LOGGER.error("Throwable process {}", e);
-            e.printStackTrace();
-            throw new RuntimeException(e);
+      Matcher matcher = getPattern(pattern).matcher(content);
+      while (matcher.find()) {
+        String group = capture > 0 ? matcher.group(capture) : matcher.group();
+        if (group != null && !group.equals("")) {
+          List<Integer> indices;
+          if (matches.containsKey(group)) {
+            indices = matches.get(group);
+          } else {
+            indices = Lists.newArrayList();
+            matches.put(group, indices);
+          }
+          indices.add(matcher.start());
         }
+      }
+      return matches;
+    } catch (Throwable ex) {
+      LOGGER.error("Throwable process {}", ex);
+      ex.printStackTrace();
+      throw new RuntimeException(ex);
     }
+  }
 
-    private static Pattern getPattern(String pattern) {
-        Pattern p;
-        if (patternCache.containsKey(pattern)) {
-            p = patternCache.get(pattern);
-        } else {
-            p = Pattern.compile(pattern);
-            patternCache.put(pattern, p);
-        }
-        return p;
+  private static Pattern getPattern(String patternString) {
+    Pattern pattern;
+    if (patternCache.containsKey(patternString)) {
+      pattern = patternCache.get(patternString);
+    } else {
+      pattern = Pattern.compile(patternString);
+      patternCache.put(patternString, pattern);
     }
+    return pattern;
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexHashtagExtractorTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexHashtagExtractorTest.java b/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexHashtagExtractorTest.java
index 2de4aa8..6e17de8 100644
--- a/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexHashtagExtractorTest.java
+++ b/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexHashtagExtractorTest.java
@@ -19,7 +19,6 @@
 
 package org.apache.streams.regex;
 
-
 import com.google.common.collect.Sets;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.pojo.extensions.ExtensionUtil;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexMentionExtractorTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexMentionExtractorTest.java b/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexMentionExtractorTest.java
index c7778a8..66f7aa5 100644
--- a/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexMentionExtractorTest.java
+++ b/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexMentionExtractorTest.java
@@ -19,7 +19,6 @@
 
 package org.apache.streams.regex;
 
-
 import com.google.common.collect.Sets;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.pojo.extensions.ExtensionUtil;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexUrlExtractorTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexUrlExtractorTest.java b/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexUrlExtractorTest.java
index 344bf98..d5d8d9b 100644
--- a/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexUrlExtractorTest.java
+++ b/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexUrlExtractorTest.java
@@ -19,7 +19,6 @@
 
 package org.apache.streams.regex;
 
-
 import com.google.common.collect.Sets;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.pojo.json.Activity;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexUtilsTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexUtilsTest.java b/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexUtilsTest.java
index fc2b9f6..a156f3a 100644
--- a/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexUtilsTest.java
+++ b/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexUtilsTest.java
@@ -19,7 +19,6 @@
 
 package org.apache.streams.regex;
 
-
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -33,7 +32,6 @@ import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertThat;
 
-
 @RunWith(Parameterized.class)
 public class RegexUtilsTest {
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/api/FacebookPageActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/api/FacebookPageActivitySerializer.java b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/api/FacebookPageActivitySerializer.java
index 7a6648a..1216c38 100644
--- a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/api/FacebookPageActivitySerializer.java
+++ b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/api/FacebookPageActivitySerializer.java
@@ -18,15 +18,17 @@
 
 package org.apache.streams.facebook.api;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.commons.lang.NotImplementedException;
 import org.apache.streams.data.ActivitySerializer;
 import org.apache.streams.exceptions.ActivitySerializerException;
+import org.apache.streams.facebook.Page;
 import org.apache.streams.facebook.serializer.FacebookActivityUtil;
 import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.facebook.Page;
 import org.apache.streams.pojo.json.Activity;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.apache.commons.lang.NotImplementedException;
+
 import java.util.List;
 
 /**
@@ -35,32 +37,32 @@ import java.util.List;
  */
 public class FacebookPageActivitySerializer implements ActivitySerializer<Page> {
 
-    public static ObjectMapper mapper;
-    static {
-        mapper = StreamsJacksonMapper.getInstance();
-    }
+  public static ObjectMapper mapper;
+  static {
+    mapper = StreamsJacksonMapper.getInstance();
+  }
 
-    @Override
-    public String serializationFormat() {
-        return "facebook_post_json_v1";
-    }
+  @Override
+  public String serializationFormat() {
+    return "facebook_post_json_v1";
+  }
 
-    @Override
-    public Page serialize(Activity deserialized) throws ActivitySerializerException {
-        throw new NotImplementedException("Not currently supported by this deserializer");
-    }
+  @Override
+  public Page serialize(Activity deserialized) throws ActivitySerializerException {
+    throw new NotImplementedException("Not currently supported by this deserializer");
+  }
 
-    @Override
-    public Activity deserialize(Page page) throws ActivitySerializerException {
-        Activity activity = new Activity();
+  @Override
+  public Activity deserialize(Page page) throws ActivitySerializerException {
+    Activity activity = new Activity();
 
-        FacebookActivityUtil.updateActivity(page, activity);
+    FacebookActivityUtil.updateActivity(page, activity);
 
-        return activity;
-    }
+    return activity;
+  }
 
-    @Override
-    public List<Activity> deserializeAll(List<Page> serializedList) {
-        throw new NotImplementedException("Not currently supported by this deserializer");
-    }
+  @Override
+  public List<Activity> deserializeAll(List<Page> serializedList) {
+    throw new NotImplementedException("Not currently supported by this deserializer");
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/api/FacebookPostActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/api/FacebookPostActivitySerializer.java b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/api/FacebookPostActivitySerializer.java
index 4326fb1..306fecc 100644
--- a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/api/FacebookPostActivitySerializer.java
+++ b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/api/FacebookPostActivitySerializer.java
@@ -15,16 +15,19 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.streams.facebook.api;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.commons.lang.NotImplementedException;
 import org.apache.streams.data.ActivitySerializer;
 import org.apache.streams.exceptions.ActivitySerializerException;
 import org.apache.streams.facebook.Post;
 import org.apache.streams.facebook.serializer.FacebookActivityUtil;
 import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.pojo.json.Activity;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.apache.commons.lang.NotImplementedException;
 import org.joda.time.format.DateTimeFormat;
 import org.joda.time.format.DateTimeFormatter;
 import org.joda.time.format.ISODateTimeFormat;
@@ -33,37 +36,34 @@ import java.util.List;
 
 public class FacebookPostActivitySerializer implements ActivitySerializer<org.apache.streams.facebook.Post> {
 
-    public static final DateTimeFormatter FACEBOOK_FORMAT = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ssZ");
-    public static final DateTimeFormatter ACTIVITY_FORMAT = ISODateTimeFormat.basicDateTime();
+  public static final DateTimeFormatter FACEBOOK_FORMAT = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ssZ");
+  public static final DateTimeFormatter ACTIVITY_FORMAT = ISODateTimeFormat.basicDateTime();
 
-    public static final String PROVIDER_NAME = "Facebook";
+  public static final String PROVIDER_NAME = "Facebook";
 
-    public static ObjectMapper mapper;
-    static {
-        mapper = StreamsJacksonMapper.getInstance();
-    }
+  public static ObjectMapper mapper = StreamsJacksonMapper.getInstance();
 
-    @Override
-    public String serializationFormat() {
-        return "facebook_post_json_v1";
-    }
+  @Override
+  public String serializationFormat() {
+    return "facebook_post_json_v1";
+  }
 
-    @Override
-    public Post serialize(Activity deserialized) throws ActivitySerializerException {
-        throw new NotImplementedException("Not currently supported by this deserializer");
-    }
+  @Override
+  public Post serialize(Activity deserialized) throws ActivitySerializerException {
+    throw new NotImplementedException("Not currently supported by this deserializer");
+  }
 
-    @Override
-    public Activity deserialize(Post post) throws ActivitySerializerException {
-        Activity activity = new Activity();
+  @Override
+  public Activity deserialize(Post post) throws ActivitySerializerException {
+    Activity activity = new Activity();
 
-        FacebookActivityUtil.updateActivity(post, activity);
+    FacebookActivityUtil.updateActivity(post, activity);
 
-        return activity;
-    }
+    return activity;
+  }
 
-    @Override
-    public List<Activity> deserializeAll(List<Post> serializedList) {
-        throw new NotImplementedException("Not currently supported by this deserializer");
-    }
+  @Override
+  public List<Activity> deserializeAll(List<Post> serializedList) {
+    throw new NotImplementedException("Not currently supported by this deserializer");
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/processor/FacebookTypeConverter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/processor/FacebookTypeConverter.java b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/processor/FacebookTypeConverter.java
index 762b6c0..92cf333 100644
--- a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/processor/FacebookTypeConverter.java
+++ b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/processor/FacebookTypeConverter.java
@@ -18,12 +18,6 @@
 
 package org.apache.streams.facebook.processor;
 
-import com.fasterxml.jackson.core.JsonParseException;
-import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.collect.Lists;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProcessor;
 import org.apache.streams.exceptions.ActivitySerializerException;
@@ -34,6 +28,14 @@ import org.apache.streams.facebook.api.FacebookPostActivitySerializer;
 import org.apache.streams.facebook.provider.FacebookEventClassifier;
 import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.pojo.json.Activity;
+
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Lists;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -41,172 +43,187 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Queue;
 
+/**
+ * FacebookTypeConverter converts facebook data to activity streams types.
+ */
 public class FacebookTypeConverter implements StreamsProcessor {
 
-    public final static String STREAMS_ID = "FacebookTypeConverter";
-
-    private final static Logger LOGGER = LoggerFactory.getLogger(FacebookTypeConverter.class);
-
-    private ObjectMapper mapper;
-
-    private Queue<StreamsDatum> inQueue;
-    private Queue<StreamsDatum> outQueue;
-
-    private Class inClass;
-    private Class outClass;
-
-    private FacebookPostActivitySerializer facebookPostActivitySerializer;
-    private FacebookPageActivitySerializer facebookPageActivitySerializer;
-
-    private int count = 0;
-
-    public final static String TERMINATE = new String("TERMINATE");
-
-    public FacebookTypeConverter(Class inClass, Class outClass) {
-        this.inClass = inClass;
-        this.outClass = outClass;
-    }
-
-    public Queue<StreamsDatum> getProcessorOutputQueue() {
-        return outQueue;
-    }
-
-    public void setProcessorInputQueue(Queue<StreamsDatum> inputQueue) {
-        inQueue = inputQueue;
+  public static final String STREAMS_ID = "FacebookTypeConverter";
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(FacebookTypeConverter.class);
+
+  private ObjectMapper mapper;
+
+  private Queue<StreamsDatum> inQueue;
+  private Queue<StreamsDatum> outQueue;
+
+  private Class inClass;
+  private Class outClass;
+
+  private FacebookPostActivitySerializer facebookPostActivitySerializer;
+  private FacebookPageActivitySerializer facebookPageActivitySerializer;
+
+  private int count = 0;
+
+  public static final String TERMINATE = new String("TERMINATE");
+
+  public FacebookTypeConverter(Class inClass, Class outClass) {
+    this.inClass = inClass;
+    this.outClass = outClass;
+  }
+
+  public Queue<StreamsDatum> getProcessorOutputQueue() {
+    return outQueue;
+  }
+
+  public void setProcessorInputQueue(Queue<StreamsDatum> inputQueue) {
+    inQueue = inputQueue;
+  }
+
+  /**
+   * convert.
+   * @param event event
+   * @param inClass inClass
+   * @param outClass outClass
+   * @return Object
+   * @throws ActivitySerializerException ActivitySerializerException
+   * @throws JsonProcessingException JsonProcessingException
+   */
+  public Object convert(ObjectNode event, Class inClass, Class outClass) throws ActivitySerializerException, JsonProcessingException {
+
+    Object result = null;
+
+    if ( outClass.equals( Activity.class )) {
+      LOGGER.debug("ACTIVITY");
+      if (inClass.equals(Post.class)) {
+        LOGGER.debug("POST");
+        result = facebookPostActivitySerializer.deserialize(mapper.convertValue(event, Post.class));
+      } else if (inClass.equals(Page.class)) {
+        LOGGER.debug("PAGE");
+        result = facebookPageActivitySerializer.deserialize(mapper.convertValue(event, Page.class));
+      }
+    } else if ( outClass.equals( Post.class )) {
+      LOGGER.debug("POST");
+      result = mapper.convertValue(event, Post.class);
+    } else if ( outClass.equals(Page.class)) {
+      LOGGER.debug("PAGE");
+      result = mapper.convertValue(event, Page.class);
+    } else if ( outClass.equals( ObjectNode.class )) {
+      LOGGER.debug("OBJECTNODE");
+      result = mapper.convertValue(event, ObjectNode.class);
     }
 
-    public Object convert(ObjectNode event, Class inClass, Class outClass) throws ActivitySerializerException, JsonProcessingException {
-
-        Object result = null;
-
-        if( outClass.equals( Activity.class )) {
-            LOGGER.debug("ACTIVITY");
-            if(inClass.equals(Post.class)) {
-                LOGGER.debug("POST");
-                result = facebookPostActivitySerializer.deserialize(mapper.convertValue(event, Post.class));
-            } else if(inClass.equals(Page.class)) {
-                LOGGER.debug("PAGE");
-                result = facebookPageActivitySerializer.deserialize(mapper.convertValue(event, Page.class));
-            }
-        } else if( outClass.equals( Post.class )) {
-            LOGGER.debug("POST");
-            result = mapper.convertValue(event, Post.class);
-        } else if( outClass.equals(Page.class)) {
-            LOGGER.debug("PAGE");
-            result = mapper.convertValue(event, Page.class);
-        } else if( outClass.equals( ObjectNode.class )) {
-            LOGGER.debug("OBJECTNODE");
-            result = mapper.convertValue(event, ObjectNode.class);
-        }
-
-        // no supported conversion were applied
-        if( result != null ) {
-            count ++;
-            return result;
-        }
-
-        LOGGER.debug("CONVERT FAILED");
-
-        return null;
+    // no supported conversion were applied
+    if ( result != null ) {
+      count ++;
+      return result;
     }
 
-    public boolean validate(Object document, Class klass) {
-
-        // TODO
-        return true;
+    LOGGER.debug("CONVERT FAILED");
+
+    return null;
+  }
+
+  // TODO: use standard validation
+  public boolean validate(Object document, Class klass) {
+    return true;
+  }
+
+  // TODO: replace with standard validation
+  public boolean isValidJSON(final String json) {
+    boolean valid = false;
+    try {
+      final JsonParser parser = new ObjectMapper().getJsonFactory()
+          .createJsonParser(json);
+      while (parser.nextToken() != null) {
+      }
+      valid = true;
+    } catch (JsonParseException jpe) {
+      LOGGER.warn("validate: {}", jpe);
+    } catch (IOException ioe) {
+      LOGGER.warn("validate: {}", ioe);
     }
 
-    public boolean isValidJSON(final String json) {
-        boolean valid = false;
-        try {
-            final JsonParser parser = new ObjectMapper().getJsonFactory()
-                    .createJsonParser(json);
-            while (parser.nextToken() != null) {
-            }
-            valid = true;
-        } catch (JsonParseException jpe) {
-            LOGGER.warn("validate: {}", jpe);
-        } catch (IOException ioe) {
-            LOGGER.warn("validate: {}", ioe);
-        }
-
-        return valid;
-    }
+    return valid;
+  }
 
-    @Override
-    public String getId() {
-        return STREAMS_ID;
-    }
+  @Override
+  public String getId() {
+    return STREAMS_ID;
+  }
 
-    @Override
-    public List<StreamsDatum> process(StreamsDatum entry) {
+  @Override
+  public List<StreamsDatum> process(StreamsDatum entry) {
 
-        StreamsDatum result = null;
+    StreamsDatum result = null;
 
-        try {
-            Object item = entry.getDocument();
-            ObjectNode node;
+    try {
+      Object item = entry.getDocument();
+      ObjectNode node;
 
-            LOGGER.debug("{} processing {}", STREAMS_ID, item.getClass());
+      LOGGER.debug("{} processing {}", STREAMS_ID, item.getClass());
 
-            if( item instanceof String ) {
+      if ( item instanceof String ) {
 
-                // if the target is string, just pass-through
-                if( String.class.equals(outClass)) {
-                    result = entry;
-                }
-                else {
-                    // first check for valid json
-                    node = (ObjectNode)mapper.readTree((String)item);
+        // if the target is string, just pass-through
+        if ( String.class.equals(outClass)) {
+          result = entry;
+        } else {
+          // first check for valid json
+          node = (ObjectNode)mapper.readTree((String)item);
 
-                    // since data is coming from outside provider, we don't know what type the events are
-                    // for now we'll assume post
-                    Class inClass = FacebookEventClassifier.detectClass((String) item);
+          // since data is coming from outside provider, we don't know what type the events are
+          // for now we'll assume post
+          Class inClass = FacebookEventClassifier.detectClass((String) item);
 
-                    Object out = convert(node, inClass, outClass);
+          Object out = convert(node, inClass, outClass);
 
-                    if( out != null && validate(out, outClass))
-                        result = new StreamsDatum(out);
-                }
+          if ( out != null && validate(out, outClass)) {
+            result = new StreamsDatum(out);
+          }
+        }
 
-            } else if( item instanceof ObjectNode) {
+      } else if ( item instanceof ObjectNode) {
 
-                // first check for valid json
-                node = (ObjectNode)mapper.valueToTree(item);
+        // first check for valid json
+        node = (ObjectNode)mapper.valueToTree(item);
 
-                Class inClass = FacebookEventClassifier.detectClass(mapper.writeValueAsString(item));
+        Class inClass = FacebookEventClassifier.detectClass(mapper.writeValueAsString(item));
 
-                Object out = convert(node, inClass, outClass);
+        Object out = convert(node, inClass, outClass);
 
-                if( out != null && validate(out, outClass))
-                    result = new StreamsDatum(out);
-            } else if(item instanceof Post || item instanceof Page) {
-                Object out = convert(mapper.convertValue(item, ObjectNode.class), inClass, outClass);
+        if ( out != null && validate(out, outClass)) {
+          result = new StreamsDatum(out);
+        }
+      } else if (item instanceof Post || item instanceof Page) {
+        Object out = convert(mapper.convertValue(item, ObjectNode.class), inClass, outClass);
 
-                if( out != null && validate(out, outClass))
-                    result = new StreamsDatum(out);
-            }
-        }  catch (Exception e) {
-            LOGGER.error("Exception switching types : {}", e);
-            if(e instanceof InterruptedException) {
-                Thread.currentThread().interrupt();
-            }
+        if ( out != null && validate(out, outClass)) {
+          result = new StreamsDatum(out);
         }
+      }
+    } catch (Exception ex) {
+      LOGGER.error("Exception switching types : {}", ex);
+      if (ex instanceof InterruptedException) {
+        Thread.currentThread().interrupt();
+      }
+    }
 
-        if( result != null )
-            return Lists.newArrayList(result);
-        else
-            return Lists.newArrayList();
+    if ( result != null ) {
+      return Lists.newArrayList(result);
+    } else {
+      return Lists.newArrayList();
     }
+  }
 
-    @Override
-    public void prepare(Object o) {
-        mapper = StreamsJacksonMapper.getInstance();
+  @Override
+  public void prepare(Object configurationObject) {
+    mapper = StreamsJacksonMapper.getInstance();
 
-        facebookPageActivitySerializer = new FacebookPageActivitySerializer();
-        facebookPostActivitySerializer = new FacebookPostActivitySerializer();
-    }
+    facebookPageActivitySerializer = new FacebookPageActivitySerializer();
+    facebookPostActivitySerializer = new FacebookPostActivitySerializer();
+  }
 
-    @Override
-    public void cleanUp() {}
+  @Override
+  public void cleanUp() {}
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookDataCollector.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookDataCollector.java b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookDataCollector.java
index 33ee9dc..617bfab 100644
--- a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookDataCollector.java
+++ b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookDataCollector.java
@@ -15,130 +15,139 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.streams.facebook.provider;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Strings;
-import facebook4j.Facebook;
-import facebook4j.FacebookFactory;
-import facebook4j.conf.ConfigurationBuilder;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.facebook.FacebookConfiguration;
 import org.apache.streams.facebook.IdConfig;
 import org.apache.streams.util.api.requests.backoff.BackOffStrategy;
 import org.apache.streams.util.api.requests.backoff.impl.ExponentialBackOffStrategy;
 import org.apache.streams.util.oauth.tokens.tokenmanager.SimpleTokenManager;
-import org.apache.streams.util.oauth.tokens.tokenmanager.impl.BasicTokenManger;
+import org.apache.streams.util.oauth.tokens.tokenmanager.impl.BasicTokenManager;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Strings;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import facebook4j.Facebook;
+import facebook4j.FacebookFactory;
+import facebook4j.conf.ConfigurationBuilder;
+
 /**
  * Abstract data collector for Facebook.  Iterates over ids and queues data to be output
  * by a {@link org.apache.streams.core.StreamsProvider}
  */
 public abstract class FacebookDataCollector implements Runnable {
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(FacebookDataCollector.class);
-    private static final String READ_ONLY = "read_streams";
-
-    @VisibleForTesting
-    protected AtomicBoolean isComplete;
-    protected BackOffStrategy backOff;
-
-    private FacebookConfiguration config;
-    private BlockingQueue<StreamsDatum> queue;
-    private SimpleTokenManager<String> authTokens;
-
-
-    public FacebookDataCollector(FacebookConfiguration config, BlockingQueue<StreamsDatum> queue) {
-        this.config = config;
-        this.queue = queue;
-        this.isComplete = new AtomicBoolean(false);
-        this.backOff = new ExponentialBackOffStrategy(5);
-        this.authTokens = new BasicTokenManger<String>();
-        if(config.getUserAccessTokens() != null) {
-            for(String token : config.getUserAccessTokens()) {
-                this.authTokens.addTokenToPool(token);
-            }
-        }
-    }
+  private static final Logger LOGGER = LoggerFactory.getLogger(FacebookDataCollector.class);
+  private static final String READ_ONLY = "read_streams";
 
-    /**
-     * Returns true when the collector has finished querying facebook and has queued all data
-     * for the provider
-     * @return
-     */
-    public boolean isComplete(){
-        return this.isComplete.get();
-    }
+  @VisibleForTesting
+  protected AtomicBoolean isComplete;
+  protected BackOffStrategy backOff;
+
+  private FacebookConfiguration config;
+  private BlockingQueue<StreamsDatum> queue;
+  private SimpleTokenManager<String> authTokens;
 
-    /**
-     * Queues facebook data
-     * @param data
-     * @param id
-     */
-    protected void outputData(Object data, String id) {
-        try {
-            this.queue.put(new StreamsDatum(data, id));
-        } catch (InterruptedException ie) {
-            Thread.currentThread().interrupt();
-        }
+  /**
+   * FacebookDataCollector constructor.
+   * @param config config
+   * @param queue queue
+   */
+  public FacebookDataCollector(FacebookConfiguration config, BlockingQueue<StreamsDatum> queue) {
+    this.config = config;
+    this.queue = queue;
+    this.isComplete = new AtomicBoolean(false);
+    this.backOff = new ExponentialBackOffStrategy(5);
+    this.authTokens = new BasicTokenManager<String>();
+    if (config.getUserAccessTokens() != null) {
+      for (String token : config.getUserAccessTokens()) {
+        this.authTokens.addTokenToPool(token);
+      }
     }
+  }
 
-    /**
-     * Gets a Facebook client.  If multiple authenticated users for this app are available
-     * it will rotate through the users oauth credentials
-     * @return
-     */
-    protected Facebook getNextFacebookClient() {
-            ConfigurationBuilder cb = new ConfigurationBuilder();
-            cb.setDebugEnabled(true);
-            cb.setOAuthPermissions(READ_ONLY);
-            cb.setOAuthAppId(this.config.getOauth().getAppId());
-            cb.setOAuthAppSecret(this.config.getOauth().getAppSecret());
-            if(this.authTokens.numAvailableTokens() > 0)
-                cb.setOAuthAccessToken(this.authTokens.getNextAvailableToken());
-            else {
-                cb.setOAuthAccessToken(this.config.getOauth().getAppAccessToken());
-                LOGGER.debug("appAccessToken : {}", this.config.getOauth().getAppAccessToken());
-            }
-            cb.setJSONStoreEnabled(true);
-            if(!Strings.isNullOrEmpty(config.getVersion()))
-                cb.setRestBaseURL("https://graph.facebook.com/" + config.getVersion() + "/");
-            LOGGER.debug("appId : {}", this.config.getOauth().getAppId());
-            LOGGER.debug("appSecret: {}", this.config.getOauth().getAppSecret());
-            FacebookFactory ff = new FacebookFactory(cb.build());
-            return  ff.getInstance();
+  /**
+   * Returns true when the collector has finished querying facebook and has queued all data
+   * for the provider.
+   * @return isComplete
+   */
+  public boolean isComplete() {
+    return this.isComplete.get();
+  }
+
+  /**
+   * Queues facebook data.
+   * @param data data
+   * @param id id
+   */
+  protected void outputData(Object data, String id) {
+    try {
+      this.queue.put(new StreamsDatum(data, id));
+    } catch (InterruptedException ie) {
+      Thread.currentThread().interrupt();
     }
+  }
 
-    /**
-     * Queries facebook and queues the resulting data
-     * @param id
-     * @throws Exception
-     */
-    protected abstract void getData(IdConfig id) throws Exception;
-
-
-    @Override
-    public void run() {
-        for( IdConfig id : this.config.getIds()) {
-            try {
-                getData(id);
-            } catch (InterruptedException ie) {
-                Thread.currentThread().interrupt();
-            } catch (Exception e) {
-                LOGGER.error("Caught Exception while trying to poll data for page : {}", id);
-                LOGGER.error("Exception while getting page feed data: {}", e);
-            }
-        }
-        this.isComplete.set(true);
+  /**
+   * Gets a Facebook client.  If multiple authenticated users for this app are available
+   * it will rotate through the users oauth credentials
+   * @return client
+   */
+  protected Facebook getNextFacebookClient() {
+    ConfigurationBuilder cb = new ConfigurationBuilder();
+    cb.setDebugEnabled(true);
+    cb.setOAuthPermissions(READ_ONLY);
+    cb.setOAuthAppId(this.config.getOauth().getAppId());
+    cb.setOAuthAppSecret(this.config.getOauth().getAppSecret());
+    if (this.authTokens.numAvailableTokens() > 0) {
+      cb.setOAuthAccessToken(this.authTokens.getNextAvailableToken());
+    } else {
+      cb.setOAuthAccessToken(this.config.getOauth().getAppAccessToken());
+      LOGGER.debug("appAccessToken : {}", this.config.getOauth().getAppAccessToken());
+    }
+    cb.setJSONStoreEnabled(true);
+    if (!Strings.isNullOrEmpty(config.getVersion())) {
+      cb.setRestBaseURL("https://graph.facebook.com/" + config.getVersion() + "/");
     }
+    LOGGER.debug("appId : {}", this.config.getOauth().getAppId());
+    LOGGER.debug("appSecret: {}", this.config.getOauth().getAppSecret());
+    FacebookFactory ff = new FacebookFactory(cb.build());
+    return  ff.getInstance();
+  }
+
+  /**
+   * Queries facebook and queues the resulting data.
+   * @param id id
+   * @throws Exception Exception
+   */
+  protected abstract void getData(IdConfig id) throws Exception;
 
-    @VisibleForTesting
-    protected BlockingQueue<StreamsDatum> getQueue() {
-        return queue;
+
+  @Override
+  public void run() {
+    for ( IdConfig id : this.config.getIds()) {
+      try {
+        getData(id);
+      } catch (InterruptedException ie) {
+        Thread.currentThread().interrupt();
+      } catch (Exception ex) {
+        LOGGER.error("Caught Exception while trying to poll data for page : {}", id);
+        LOGGER.error("Exception while getting page feed data: {}", ex);
+      }
     }
+    this.isComplete.set(true);
+  }
+
+  @VisibleForTesting
+  protected BlockingQueue<StreamsDatum> getQueue() {
+    return queue;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookEventClassifier.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookEventClassifier.java b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookEventClassifier.java
index 16e2a25..47c2afb 100644
--- a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookEventClassifier.java
+++ b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookEventClassifier.java
@@ -18,40 +18,50 @@
 
 package org.apache.streams.facebook.provider;
 
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.base.Preconditions;
-import org.apache.commons.lang.StringUtils;
-
-import java.io.IOException;
-
 import org.apache.streams.facebook.Page;
 import org.apache.streams.facebook.Post;
 import org.apache.streams.jackson.StreamsJacksonMapper;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Preconditions;
+
+import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+
+/**
+ * FacebookEventClassifier classifies facebook events.
+ */
 public class FacebookEventClassifier {
 
-    private final static Logger LOGGER = LoggerFactory.getLogger(FacebookEventClassifier.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(FacebookEventClassifier.class);
 
-    public static Class detectClass( String json ) {
+  /**
+   * detectClass from json string.
+   * @param json json string
+   * @return detected Class
+   */
+  public static Class detectClass( String json ) {
 
-        Preconditions.checkNotNull(json);
-        Preconditions.checkArgument(StringUtils.isNotEmpty(json));
+    Preconditions.checkNotNull(json);
+    Preconditions.checkArgument(StringUtils.isNotEmpty(json));
 
-        ObjectNode objectNode;
-        try {
-            objectNode = (ObjectNode) StreamsJacksonMapper.getInstance().readTree(json);
-        } catch (IOException e) {
-            LOGGER.error("Exception while trying to detect class: {}", e.getMessage());
-            return null;
-        }
+    ObjectNode objectNode;
+    try {
+      objectNode = (ObjectNode) StreamsJacksonMapper.getInstance().readTree(json);
+    } catch (IOException ex) {
+      LOGGER.error("Exception while trying to detect class: {}", ex.getMessage());
+      return null;
+    }
 
-        if( objectNode.findValue("about") != null)
-            return Page.class;
-        else if( objectNode.findValue("statusType") != null )
-            return Post.class;
-        else
-            return Post.class;
+    if ( objectNode.findValue("about") != null) {
+      return Page.class;
+    } else if ( objectNode.findValue("statusType") != null ) {
+      return Post.class;
+    } else {
+      return Post.class;
     }
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookFriendFeedProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookFriendFeedProvider.java b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookFriendFeedProvider.java
index 231ee4f..3253479 100644
--- a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookFriendFeedProvider.java
+++ b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookFriendFeedProvider.java
@@ -18,15 +18,6 @@
 
 package org.apache.streams.facebook.provider;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Queues;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigRenderOptions;
-import facebook4j.*;
-import facebook4j.conf.ConfigurationBuilder;
-import facebook4j.json.DataObjectFactory;
 import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.core.DatumStatusCounter;
 import org.apache.streams.core.StreamsDatum;
@@ -36,253 +27,295 @@ import org.apache.streams.facebook.FacebookUserInformationConfiguration;
 import org.apache.streams.facebook.FacebookUserstreamConfiguration;
 import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.util.ComponentUtils;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Queues;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigRenderOptions;
+
+import org.apache.commons.lang.NotImplementedException;
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.commons.lang.NotImplementedException;
 
 import java.io.IOException;
 import java.io.Serializable;
 import java.math.BigInteger;
 import java.util.Iterator;
 import java.util.Queue;
-import java.util.concurrent.*;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
-public class FacebookFriendFeedProvider implements StreamsProvider, Serializable
-{
-
-    public static final String STREAMS_ID = "FacebookFriendFeedProvider";
-
-    private static final Logger LOGGER = LoggerFactory.getLogger(FacebookFriendFeedProvider.class);
-
-    private static final ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+import facebook4j.Facebook;
+import facebook4j.FacebookException;
+import facebook4j.FacebookFactory;
+import facebook4j.Friend;
+import facebook4j.Paging;
+import facebook4j.Post;
+import facebook4j.ResponseList;
+import facebook4j.conf.ConfigurationBuilder;
+import facebook4j.json.DataObjectFactory;
 
-    private static final String ALL_PERMISSIONS = "ads_management,ads_read,create_event,create_note,email,export_stream,friends_about_me,friends_actions.books,friends_actions.music,friends_actions.news,friends_actions.video,friends_activities,friends_birthday,friends_education_history,friends_events,friends_games_activity,friends_groups,friends_hometown,friends_interests,friends_likes,friends_location,friends_notes,friends_online_presence,friends_photo_video_tags,friends_photos,friends_questions,friends_relationship_details,friends_relationships,friends_religion_politics,friends_status,friends_subscriptions,friends_videos,friends_website,friends_work_history,manage_friendlists,manage_notifications,manage_pages,photo_upload,publish_actions,publish_stream,read_friendlists,read_insights,read_mailbox,read_page_mailboxes,read_requests,read_stream,rsvp_event,share_item,sms,status_update,user_about_me,user_actions.books,user_actions.music,user_actions.news,user_actions.video,user_activitie
 s,user_birthday,user_education_history,user_events,user_friends,user_games_activity,user_groups,user_hometown,user_interests,user_likes,user_location,user_notes,user_online_presence,user_photo_video_tags,user_photos,user_questions,user_relationship_details,user_relationships,user_religion_politics,user_status,user_subscriptions,user_videos,user_website,user_work_history,video_upload,xmpp_login";
-    private FacebookUserstreamConfiguration configuration;
+public class FacebookFriendFeedProvider implements StreamsProvider, Serializable {
 
-    private Class klass;
-    protected final ReadWriteLock lock = new ReentrantReadWriteLock();
+  public static final String STREAMS_ID = "FacebookFriendFeedProvider";
 
-    protected volatile Queue<StreamsDatum> providerQueue = new LinkedBlockingQueue<StreamsDatum>();
+  private static final Logger LOGGER = LoggerFactory.getLogger(FacebookFriendFeedProvider.class);
 
-    public FacebookUserstreamConfiguration getConfig()              { return configuration; }
+  private static final ObjectMapper mapper = StreamsJacksonMapper.getInstance();
 
-    public void setConfig(FacebookUserstreamConfiguration config)   { this.configuration = config; }
+  private static final String ALL_PERMISSIONS = "ads_management,ads_read,create_event,create_note,email,export_stream,friends_about_me,friends_actions.books,friends_actions.music,friends_actions.news,friends_actions.video,friends_activities,friends_birthday,friends_education_history,friends_events,friends_games_activity,friends_groups,friends_hometown,friends_interests,friends_likes,friends_location,friends_notes,friends_online_presence,friends_photo_video_tags,friends_photos,friends_questions,friends_relationship_details,friends_relationships,friends_religion_politics,friends_status,friends_subscriptions,friends_videos,friends_website,friends_work_history,manage_friendlists,manage_notifications,manage_pages,photo_upload,publish_actions,publish_stream,read_friendlists,read_insights,read_mailbox,read_page_mailboxes,read_requests,read_stream,rsvp_event,share_item,sms,status_update,user_about_me,user_actions.books,user_actions.music,user_actions.news,user_actions.video,user_activities,
 user_birthday,user_education_history,user_events,user_friends,user_games_activity,user_groups,user_hometown,user_interests,user_likes,user_location,user_notes,user_online_presence,user_photo_video_tags,user_photos,user_questions,user_relationship_details,user_relationships,user_religion_politics,user_status,user_subscriptions,user_videos,user_website,user_work_history,video_upload,xmpp_login";
+  private FacebookUserstreamConfiguration configuration;
 
-    protected Iterator<String[]> idsBatches;
+  private Class klass;
+  protected final ReadWriteLock lock = new ReentrantReadWriteLock();
 
-    protected ExecutorService executor;
+  protected volatile Queue<StreamsDatum> providerQueue = new LinkedBlockingQueue<StreamsDatum>();
 
-    protected DateTime start;
-    protected DateTime end;
+  public FacebookUserstreamConfiguration getConfig() {
+    return configuration;
+  }
 
-    protected final AtomicBoolean running = new AtomicBoolean();
+  public void setConfig(FacebookUserstreamConfiguration config) {
+    this.configuration = config;
+  }
 
-    private DatumStatusCounter countersCurrent = new DatumStatusCounter();
-    private DatumStatusCounter countersTotal = new DatumStatusCounter();
+  protected Iterator<String[]> idsBatches;
 
-    private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) {
-        return new ThreadPoolExecutor(nThreads, nThreads,
-                5000L, TimeUnit.MILLISECONDS,
-                new ArrayBlockingQueue<Runnable>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy());
-    }
+  protected ExecutorService executor;
 
-    public FacebookFriendFeedProvider() {
-        Config config = StreamsConfigurator.config.getConfig("facebook");
-        FacebookUserInformationConfiguration configuration;
-        try {
-            configuration = mapper.readValue(config.root().render(ConfigRenderOptions.concise()), FacebookUserInformationConfiguration.class);
-        } catch (IOException e) {
-            e.printStackTrace();
-            return;
-        }
-    }
+  protected DateTime start;
+  protected DateTime end;
 
-    public FacebookFriendFeedProvider(FacebookUserstreamConfiguration config) {
-        this.configuration = config;
-    }
+  protected final AtomicBoolean running = new AtomicBoolean();
 
-    public FacebookFriendFeedProvider(Class klass) {
-        Config config = StreamsConfigurator.config.getConfig("facebook");
-        FacebookUserInformationConfiguration configuration;
-        try {
-            configuration = mapper.readValue(config.root().render(ConfigRenderOptions.concise()), FacebookUserInformationConfiguration.class);
-        } catch (IOException e) {
-            e.printStackTrace();
-            return;
-        }
-        this.klass = klass;
-    }
+  private DatumStatusCounter countersCurrent = new DatumStatusCounter();
+  private DatumStatusCounter countersTotal = new DatumStatusCounter();
 
-    public FacebookFriendFeedProvider(FacebookUserstreamConfiguration config, Class klass) {
-        this.configuration = config;
-        this.klass = klass;
-    }
+  private static ExecutorService newFixedThreadPoolWithQueueSize(int numThreads, int queueSize) {
+    return new ThreadPoolExecutor(numThreads, numThreads,
+        5000L, TimeUnit.MILLISECONDS,
+        new ArrayBlockingQueue<Runnable>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy());
+  }
 
-    public Queue<StreamsDatum> getProviderQueue() {
-        return this.providerQueue;
+  /**
+   * FacebookFriendFeedProvider constructor - resolves FacebookUserInformationConfiguration from JVM 'facebook'.
+   */
+  public FacebookFriendFeedProvider() {
+    Config config = StreamsConfigurator.config.getConfig("facebook");
+    FacebookUserInformationConfiguration configuration;
+    try {
+      configuration = mapper.readValue(config.root().render(ConfigRenderOptions.concise()), FacebookUserInformationConfiguration.class);
+    } catch (IOException ex) {
+      ex.printStackTrace();
+      return;
     }
-
-    @Override
-    public String getId() {
-        return STREAMS_ID;
+  }
+
+  /**
+   * FacebookFriendFeedProvider constructor - uses supplied FacebookUserInformationConfiguration.
+   */
+  public FacebookFriendFeedProvider(FacebookUserstreamConfiguration config) {
+    this.configuration = config;
+  }
+
+  /**
+   * FacebookFriendFeedProvider constructor - output supplied Class.
+   * @param klass Class
+   */
+  public FacebookFriendFeedProvider(Class klass) {
+    Config config = StreamsConfigurator.config.getConfig("facebook");
+    FacebookUserInformationConfiguration configuration;
+    try {
+      configuration = mapper.readValue(config.root().render(ConfigRenderOptions.concise()), FacebookUserInformationConfiguration.class);
+    } catch (IOException ex) {
+      ex.printStackTrace();
+      return;
     }
-
-    @Override
-    public void startStream() {
-        shutdownAndAwaitTermination(executor);
-        running.set(true);
+    this.klass = klass;
+  }
+
+  public FacebookFriendFeedProvider(FacebookUserstreamConfiguration config, Class klass) {
+    this.configuration = config;
+    this.klass = klass;
+  }
+
+  public Queue<StreamsDatum> getProviderQueue() {
+    return this.providerQueue;
+  }
+
+  @Override
+  public String getId() {
+    return STREAMS_ID;
+  }
+
+  @Override
+  public void startStream() {
+    shutdownAndAwaitTermination(executor);
+    running.set(true);
+  }
+
+  @Override
+  public StreamsResultSet readCurrent() {
+
+    StreamsResultSet current;
+
+    synchronized (FacebookUserstreamProvider.class) {
+      current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(providerQueue));
+      current.setCounter(new DatumStatusCounter());
+      current.getCounter().add(countersCurrent);
+      countersTotal.add(countersCurrent);
+      countersCurrent = new DatumStatusCounter();
+      providerQueue.clear();
     }
 
-    public StreamsResultSet readCurrent() {
-
-        StreamsResultSet current;
-
-        synchronized (FacebookUserstreamProvider.class) {
-            current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(providerQueue));
-            current.setCounter(new DatumStatusCounter());
-            current.getCounter().add(countersCurrent);
-            countersTotal.add(countersCurrent);
-            countersCurrent = new DatumStatusCounter();
-            providerQueue.clear();
+    return current;
+
+  }
+
+  @Override
+  public StreamsResultSet readNew(BigInteger sequence) {
+    LOGGER.debug("{} readNew", STREAMS_ID);
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public StreamsResultSet readRange(DateTime start, DateTime end) {
+    LOGGER.debug("{} readRange", STREAMS_ID);
+    this.start = start;
+    this.end = end;
+    readCurrent();
+    StreamsResultSet result = (StreamsResultSet)providerQueue.iterator();
+    return result;
+  }
+
+  @Override
+  public boolean isRunning() {
+    return running.get();
+  }
+
+  void shutdownAndAwaitTermination(ExecutorService pool) {
+    pool.shutdown(); // Disable new tasks from being submitted
+    try {
+      // Wait a while for existing tasks to terminate
+      if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
+        pool.shutdownNow(); // Cancel currently executing tasks
+        // Wait a while for tasks to respond to being cancelled
+        if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
+          System.err.println("Pool did not terminate");
         }
+      }
+    } catch (InterruptedException ie) {
+      // (Re-)Cancel if current thread also interrupted
+      pool.shutdownNow();
+      // Preserve interrupt status
+      Thread.currentThread().interrupt();
+    }
+  }
 
-        return current;
+  @Override
+  public void prepare(Object configurationObject) {
 
-    }
+    executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20));
 
-    public StreamsResultSet readNew(BigInteger sequence) {
-        LOGGER.debug("{} readNew", STREAMS_ID);
-        throw new NotImplementedException();
-    }
+    Preconditions.checkNotNull(providerQueue);
+    Preconditions.checkNotNull(this.klass);
+    Preconditions.checkNotNull(configuration.getOauth().getAppId());
+    Preconditions.checkNotNull(configuration.getOauth().getAppSecret());
+    Preconditions.checkNotNull(configuration.getOauth().getUserAccessToken());
 
-    public StreamsResultSet readRange(DateTime start, DateTime end) {
-        LOGGER.debug("{} readRange", STREAMS_ID);
-        this.start = start;
-        this.end = end;
-        readCurrent();
-        StreamsResultSet result = (StreamsResultSet)providerQueue.iterator();
-        return result;
-    }
+    Facebook client = getFacebookClient();
 
-    @Override
-    public boolean isRunning() {
-        return running.get();
-    }
+    try {
+      ResponseList<Friend> friendResponseList = client.friends().getFriends();
+      Paging<Friend> friendPaging;
+      do {
 
-    void shutdownAndAwaitTermination(ExecutorService pool) {
-        pool.shutdown(); // Disable new tasks from being submitted
-        try {
-            // Wait a while for existing tasks to terminate
-            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
-                pool.shutdownNow(); // Cancel currently executing tasks
-                // Wait a while for tasks to respond to being cancelled
-                if (!pool.awaitTermination(10, TimeUnit.SECONDS))
-                    System.err.println("Pool did not terminate");
-            }
-        } catch (InterruptedException ie) {
-            // (Re-)Cancel if current thread also interrupted
-            pool.shutdownNow();
-            // Preserve interrupt status
-            Thread.currentThread().interrupt();
+        for ( Friend friend : friendResponseList ) {
+          executor.submit(new FacebookFriendFeedTask(this, friend.getId()));
         }
+        friendPaging = friendResponseList.getPaging();
+        friendResponseList = client.fetchNext(friendPaging);
+      }
+      while ( friendPaging != null
+              &&
+              friendResponseList != null );
+    } catch (FacebookException ex) {
+      ex.printStackTrace();
     }
 
-    @Override
-    public void prepare(Object o) {
+  }
 
-        executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20));
+  protected Facebook getFacebookClient() {
 
-        Preconditions.checkNotNull(providerQueue);
-        Preconditions.checkNotNull(this.klass);
-        Preconditions.checkNotNull(configuration.getOauth().getAppId());
-        Preconditions.checkNotNull(configuration.getOauth().getAppSecret());
-        Preconditions.checkNotNull(configuration.getOauth().getUserAccessToken());
+    ConfigurationBuilder cb = new ConfigurationBuilder();
+    cb.setDebugEnabled(true)
+        .setOAuthAppId(configuration.getOauth().getAppId())
+        .setOAuthAppSecret(configuration.getOauth().getAppSecret())
+        .setOAuthAccessToken(configuration.getOauth().getUserAccessToken())
+        .setOAuthPermissions(ALL_PERMISSIONS)
+        .setJSONStoreEnabled(true)
+        .setClientVersion("v1.0");
 
-        Facebook client = getFacebookClient();
+    FacebookFactory ff = new FacebookFactory(cb.build());
+    Facebook facebook = ff.getInstance();
 
-        try {
-            ResponseList<Friend> friendResponseList = client.friends().getFriends();
-            Paging<Friend> friendPaging;
-            do {
+    return facebook;
+  }
 
-                for( Friend friend : friendResponseList ) {
+  @Override
+  public void cleanUp() {
+    shutdownAndAwaitTermination(executor);
+  }
 
-                    executor.submit(new FacebookFriendFeedTask(this, friend.getId()));
-                }
-                friendPaging = friendResponseList.getPaging();
-                friendResponseList = client.fetchNext(friendPaging);
-            } while( friendPaging != null &&
-                    friendResponseList != null );
-        } catch (FacebookException e) {
-            e.printStackTrace();
-        }
+  private class FacebookFriendFeedTask implements Runnable {
 
-    }
+    FacebookFriendFeedProvider provider;
+    Facebook client;
+    String id;
 
-    protected Facebook getFacebookClient()
-    {
-        ConfigurationBuilder cb = new ConfigurationBuilder();
-        cb.setDebugEnabled(true)
-            .setOAuthAppId(configuration.getOauth().getAppId())
-            .setOAuthAppSecret(configuration.getOauth().getAppSecret())
-            .setOAuthAccessToken(configuration.getOauth().getUserAccessToken())
-            .setOAuthPermissions(ALL_PERMISSIONS)
-            .setJSONStoreEnabled(true)
-            .setClientVersion("v1.0");
-
-        FacebookFactory ff = new FacebookFactory(cb.build());
-        Facebook facebook = ff.getInstance();
-
-        return facebook;
+    public FacebookFriendFeedTask(FacebookFriendFeedProvider provider, String id) {
+      this.provider = provider;
+      this.id = id;
     }
 
     @Override
-    public void cleanUp() {
-        shutdownAndAwaitTermination(executor);
-    }
-
-    private class FacebookFriendFeedTask implements Runnable {
-
-        FacebookFriendFeedProvider provider;
-        Facebook client;
-        String id;
-
-        public FacebookFriendFeedTask(FacebookFriendFeedProvider provider, String id) {
-            this.provider = provider;
-            this.id = id;
+    public void run() {
+      client = provider.getFacebookClient();
+      try {
+        ResponseList<Post> postResponseList = client.getFeed(id);
+        Paging<Post> postPaging;
+        do {
+
+          for (Post item : postResponseList) {
+            String json = DataObjectFactory.getRawJSON(item);
+            org.apache.streams.facebook.Post post = mapper.readValue(json, org.apache.streams.facebook.Post.class);
+            try {
+              lock.readLock().lock();
+              ComponentUtils.offerUntilSuccess(new StreamsDatum(post), providerQueue);
+              countersCurrent.incrementAttempt();
+            } finally {
+              lock.readLock().unlock();
+            }
+          }
+          postPaging = postResponseList.getPaging();
+          postResponseList = client.fetchNext(postPaging);
         }
+        while ( postPaging != null
+                &&
+                postResponseList != null );
 
-        @Override
-        public void run() {
-            client = provider.getFacebookClient();
-                try {
-                    ResponseList<Post> postResponseList = client.getFeed(id);
-                    Paging<Post> postPaging;
-                    do {
-
-                        for (Post item : postResponseList) {
-                            String json = DataObjectFactory.getRawJSON(item);
-                            org.apache.streams.facebook.Post post = mapper.readValue(json, org.apache.streams.facebook.Post.class);
-                            try {
-                                lock.readLock().lock();
-                                ComponentUtils.offerUntilSuccess(new StreamsDatum(post), providerQueue);
-                                countersCurrent.incrementAttempt();
-                            } finally {
-                                lock.readLock().unlock();
-                            }
-                        }
-                        postPaging = postResponseList.getPaging();
-                        postResponseList = client.fetchNext(postPaging);
-                    } while( postPaging != null &&
-                            postResponseList != null );
-
-                } catch (Exception e) {
-                    e.printStackTrace();
-                }
-        }
+      } catch (Exception ex) {
+        ex.printStackTrace();
+      }
     }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookFriendUpdatesProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookFriendUpdatesProvider.java b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookFriendUpdatesProvider.java
index cda868e..50ac64a 100644
--- a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookFriendUpdatesProvider.java
+++ b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookFriendUpdatesProvider.java
@@ -18,15 +18,6 @@
 
 package org.apache.streams.facebook.provider;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigRenderOptions;
-import facebook4j.*;
-import facebook4j.conf.ConfigurationBuilder;
-import facebook4j.json.DataObjectFactory;
 import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.core.DatumStatusCounter;
 import org.apache.streams.core.StreamsDatum;
@@ -36,10 +27,18 @@ import org.apache.streams.facebook.FacebookUserInformationConfiguration;
 import org.apache.streams.facebook.FacebookUserstreamConfiguration;
 import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.util.ComponentUtils;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigRenderOptions;
+
+import org.apache.commons.lang.NotImplementedException;
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.commons.lang.NotImplementedException;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -47,246 +46,290 @@ import java.math.BigInteger;
 import java.util.Iterator;
 import java.util.Queue;
 import java.util.Set;
-import java.util.concurrent.*;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
-public class FacebookFriendUpdatesProvider implements StreamsProvider, Serializable
-{
-
-    public static final String STREAMS_ID = "FacebookFriendPostsProvider";
+import facebook4j.Facebook;
+import facebook4j.FacebookException;
+import facebook4j.FacebookFactory;
+import facebook4j.Friend;
+import facebook4j.Paging;
+import facebook4j.Post;
+import facebook4j.ResponseList;
+import facebook4j.conf.ConfigurationBuilder;
+import facebook4j.json.DataObjectFactory;
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(FacebookFriendUpdatesProvider.class);
+/**
+ * FacebookFriendUpdatesProvider provides updates from friend feed.
+ */
+public class FacebookFriendUpdatesProvider implements StreamsProvider, Serializable {
 
-    private static final ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+  public static final String STREAMS_ID = "FacebookFriendPostsProvider";
 
-    private static final String ALL_PERMISSIONS = "ads_management,ads_read,create_event,create_note,email,export_stream,friends_about_me,friends_actions.books,friends_actions.music,friends_actions.news,friends_actions.video,friends_activities,friends_birthday,friends_education_history,friends_events,friends_games_activity,friends_groups,friends_hometown,friends_interests,friends_likes,friends_location,friends_notes,friends_online_presence,friends_photo_video_tags,friends_photos,friends_questions,friends_relationship_details,friends_relationships,friends_religion_politics,friends_status,friends_subscriptions,friends_videos,friends_website,friends_work_history,manage_friendlists,manage_notifications,manage_pages,photo_upload,publish_actions,publish_stream,read_friendlists,read_insights,read_mailbox,read_page_mailboxes,read_requests,read_stream,rsvp_event,share_item,sms,status_update,user_about_me,user_actions.books,user_actions.music,user_actions.news,user_actions.video,user_activitie
 s,user_birthday,user_education_history,user_events,user_friends,user_games_activity,user_groups,user_hometown,user_interests,user_likes,user_location,user_notes,user_online_presence,user_photo_video_tags,user_photos,user_questions,user_relationship_details,user_relationships,user_religion_politics,user_status,user_subscriptions,user_videos,user_website,user_work_history,video_upload,xmpp_login";
-    private FacebookUserstreamConfiguration configuration;
+  private static final Logger LOGGER = LoggerFactory.getLogger(FacebookFriendUpdatesProvider.class);
 
-    private Class klass;
-    protected final ReadWriteLock lock = new ReentrantReadWriteLock();
+  private static final ObjectMapper mapper = StreamsJacksonMapper.getInstance();
 
-    protected volatile Queue<StreamsDatum> providerQueue = new LinkedBlockingQueue<StreamsDatum>();
+  private static final String ALL_PERMISSIONS =
+      "ads_management,ads_read,create_event,create_note,email,export_stream,friends_about_me,friends_actions.books,friends_actions.music,friends_actions.news,friends_actions.video,friends_activities,friends_birthday,friends_education_history,friends_events,friends_games_activity,friends_groups,friends_hometown,friends_interests,friends_likes,friends_location,friends_notes,friends_online_presence,friends_photo_video_tags,friends_photos,friends_questions,friends_relationship_details,friends_relationships,friends_religion_politics,friends_status,friends_subscriptions,friends_videos,friends_website,friends_work_history,manage_friendlists,manage_notifications,manage_pages,photo_upload,publish_actions,publish_stream,read_friendlists,read_insights,read_mailbox,read_page_mailboxes,read_requests,read_stream,rsvp_event,share_item,sms,status_update,user_about_me,user_actions.books,user_actions.music,user_actions.news,user_actions.video,user_activities,user_birthday,user_education_history,user_
 events,user_friends,user_games_activity,user_groups,user_hometown,user_interests,user_likes,user_location,user_notes,user_online_presence,user_photo_video_tags,user_photos,user_questions,user_relationship_details,user_relationships,user_religion_politics,user_status,user_subscriptions,user_videos,user_website,user_work_history,video_upload,xmpp_login";
 
-    public FacebookUserstreamConfiguration getConfig()              { return configuration; }
+  private FacebookUserstreamConfiguration configuration;
 
-    public void setConfig(FacebookUserstreamConfiguration config)   { this.configuration = config; }
+  private Class klass;
+  protected final ReadWriteLock lock = new ReentrantReadWriteLock();
 
-    protected Iterator<String[]> idsBatches;
+  protected volatile Queue<StreamsDatum> providerQueue = new LinkedBlockingQueue<StreamsDatum>();
 
-    protected ExecutorService executor;
+  public FacebookUserstreamConfiguration getConfig() {
+    return configuration;
+  }
 
-    protected DateTime start;
-    protected DateTime end;
+  public void setConfig(FacebookUserstreamConfiguration config) {
+    this.configuration = config;
+  }
 
-    protected final AtomicBoolean running = new AtomicBoolean();
+  protected Iterator<String[]> idsBatches;
 
-    private DatumStatusCounter countersCurrent = new DatumStatusCounter();
-    private DatumStatusCounter countersTotal = new DatumStatusCounter();
+  protected ExecutorService executor;
 
-    private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) {
-        return new ThreadPoolExecutor(nThreads, nThreads,
-                5000L, TimeUnit.MILLISECONDS,
-                new ArrayBlockingQueue<Runnable>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy());
-    }
+  protected DateTime start;
+  protected DateTime end;
 
-    public FacebookFriendUpdatesProvider() {
-        Config config = StreamsConfigurator.config.getConfig("facebook");
-        FacebookUserInformationConfiguration configuration;
-        try {
-            configuration = mapper.readValue(config.root().render(ConfigRenderOptions.concise()), FacebookUserInformationConfiguration.class);
-        } catch (IOException e) {
-            e.printStackTrace();
-            return;
-        }
-    }
+  protected final AtomicBoolean running = new AtomicBoolean();
 
-    public FacebookFriendUpdatesProvider(FacebookUserstreamConfiguration config) {
-        this.configuration = config;
-    }
+  private DatumStatusCounter countersCurrent = new DatumStatusCounter();
+  private DatumStatusCounter countersTotal = new DatumStatusCounter();
 
-    public FacebookFriendUpdatesProvider(Class klass) {
-        Config config = StreamsConfigurator.config.getConfig("facebook");
-        FacebookUserInformationConfiguration configuration;
-        try {
-            configuration = mapper.readValue(config.root().render(ConfigRenderOptions.concise()), FacebookUserInformationConfiguration.class);
-        } catch (IOException e) {
-            e.printStackTrace();
-            return;
-        }
-        this.klass = klass;
-    }
+  // TODO: factor this out.
+  private static ExecutorService newFixedThreadPoolWithQueueSize(int numThreads, int queueSize) {
+    return new ThreadPoolExecutor(numThreads, numThreads,
+        5000L, TimeUnit.MILLISECONDS,
+        new ArrayBlockingQueue<Runnable>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy());
+  }
 
-    public FacebookFriendUpdatesProvider(FacebookUserstreamConfiguration config, Class klass) {
-        this.configuration = config;
-        this.klass = klass;
+  /**
+   * FacebookFriendUpdatesProvider constructor - resolves FacebookUserInformationConfiguration from JVM 'facebook'.
+   */
+  public FacebookFriendUpdatesProvider() {
+    Config config = StreamsConfigurator.config.getConfig("facebook");
+    FacebookUserInformationConfiguration configuration;
+    try {
+      configuration = mapper.readValue(config.root().render(ConfigRenderOptions.concise()), FacebookUserInformationConfiguration.class);
+    } catch (IOException ex) {
+      ex.printStackTrace();
+      return;
     }
-
-    public Queue<StreamsDatum> getProviderQueue() {
-        return this.providerQueue;
+  }
+
+  /**
+   * FacebookFriendUpdatesProvider constructor - uses supplied FacebookUserstreamConfiguration.
+   */
+  public FacebookFriendUpdatesProvider(FacebookUserstreamConfiguration config) {
+    this.configuration = config;
+  }
+
+  /**
+   * FacebookFriendUpdatesProvider constructor.
+   * uses supplied output Class.
+   * resolves FacebookUserInformationConfiguration from JVM 'facebook.
+   */
+  public FacebookFriendUpdatesProvider(Class klass) {
+    Config config = StreamsConfigurator.config.getConfig("facebook");
+    FacebookUserInformationConfiguration configuration;
+    try {
+      configuration = mapper.readValue(config.root().render(ConfigRenderOptions.concise()), FacebookUserInformationConfiguration.class);
+    } catch (IOException ex) {
+      ex.printStackTrace();
+      return;
     }
-
-    @Override
-    public String getId() {
-        return STREAMS_ID;
-    }
-
-    @Override
-    public void startStream() {
-        running.set(true);
+    this.klass = klass;
+  }
+
+  /**
+   * FacebookFriendUpdatesProvider constructor.
+   * uses supplied FacebookUserstreamConfiguration.
+   * uses supplied output Class.
+   */
+  public FacebookFriendUpdatesProvider(FacebookUserstreamConfiguration config, Class klass) {
+    this.configuration = config;
+    this.klass = klass;
+  }
+
+  public Queue<StreamsDatum> getProviderQueue() {
+    return this.providerQueue;
+  }
+
+  @Override
+  public String getId() {
+    return STREAMS_ID;
+  }
+
+  @Override
+  public void startStream() {
+    running.set(true);
+  }
+
+  @Override
+  public StreamsResultSet readCurrent() {
+
+    Preconditions.checkArgument(idsBatches.hasNext());
+
+    LOGGER.info("readCurrent");
+
+    // return stuff
+
+    LOGGER.info("Finished.  Cleaning up...");
+
+    LOGGER.info("Providing {} docs", providerQueue.size());
+
+    StreamsResultSet result =  new StreamsResultSet(providerQueue);
+    running.set(false);
+
+    LOGGER.info("Exiting");
+
+    return result;
+
+  }
+
+  @Override
+  public StreamsResultSet readNew(BigInteger sequence) {
+    LOGGER.debug("{} readNew", STREAMS_ID);
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public StreamsResultSet readRange(DateTime start, DateTime end) {
+    LOGGER.debug("{} readRange", STREAMS_ID);
+    this.start = start;
+    this.end = end;
+    readCurrent();
+    StreamsResultSet result = (StreamsResultSet)providerQueue.iterator();
+    return result;
+  }
+
+  @Override
+  public boolean isRunning() {
+    return running.get();
+  }
+
+  void shutdownAndAwaitTermination(ExecutorService pool) {
+    pool.shutdown(); // Disable new tasks from being submitted
+    try {
+      // Wait a while for existing tasks to terminate
+      if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
+        pool.shutdownNow(); // Cancel currently executing tasks
+        // Wait a while for tasks to respond to being cancelled
+        if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
+          System.err.println("Pool did not terminate");
+        }
+      }
+    } catch (InterruptedException ie) {
+      // (Re-)Cancel if current thread also interrupted
+      pool.shutdownNow();
+      // Preserve interrupt status
+      Thread.currentThread().interrupt();
     }
+  }
 
-    public StreamsResultSet readCurrent() {
-
-        Preconditions.checkArgument(idsBatches.hasNext());
-
-        LOGGER.info("readCurrent");
-
-        // return stuff
+  @Override
+  public void prepare(Object configurationObject) {
 
-        LOGGER.info("Finished.  Cleaning up...");
+    executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20));
 
-        LOGGER.info("Providing {} docs", providerQueue.size());
+    Preconditions.checkNotNull(providerQueue);
+    Preconditions.checkNotNull(this.klass);
+    Preconditions.checkNotNull(configuration.getOauth().getAppId());
+    Preconditions.checkNotNull(configuration.getOauth().getAppSecret());
+    Preconditions.checkNotNull(configuration.getOauth().getUserAccessToken());
 
-        StreamsResultSet result =  new StreamsResultSet(providerQueue);
-        running.set(false);
+    Facebook client = getFacebookClient();
 
-        LOGGER.info("Exiting");
-
-        return result;
+    try {
+      ResponseList<Friend> friendResponseList = client.friends().getFriends();
+      Paging<Friend> friendPaging;
+      do {
 
+        for ( Friend friend : friendResponseList ) {
+          // client.rawAPI().callPostAPI();
+          // add a subscription
+        }
+        friendPaging = friendResponseList.getPaging();
+        friendResponseList = client.fetchNext(friendPaging);
+      }
+      while ( friendPaging != null
+              &&
+              friendResponseList != null );
+    } catch (FacebookException ex) {
+      ex.printStackTrace();
     }
 
-    public StreamsResultSet readNew(BigInteger sequence) {
-        LOGGER.debug("{} readNew", STREAMS_ID);
-        throw new NotImplementedException();
-    }
+  }
 
-    public StreamsResultSet readRange(DateTime start, DateTime end) {
-        LOGGER.debug("{} readRange", STREAMS_ID);
-        this.start = start;
-        this.end = end;
-        readCurrent();
-        StreamsResultSet result = (StreamsResultSet)providerQueue.iterator();
-        return result;
-    }
+  protected Facebook getFacebookClient() {
 
-    @Override
-    public boolean isRunning() {
-        return running.get();
-    }
+    ConfigurationBuilder cb = new ConfigurationBuilder();
+    cb.setDebugEnabled(true)
+        .setOAuthAppId(configuration.getOauth().getAppId())
+        .setOAuthAppSecret(configuration.getOauth().getAppSecret())
+        .setOAuthAccessToken(configuration.getOauth().getUserAccessToken())
+        .setOAuthPermissions(ALL_PERMISSIONS)
+        .setJSONStoreEnabled(true)
+        .setClientVersion("v1.0");
 
-    void shutdownAndAwaitTermination(ExecutorService pool) {
-        pool.shutdown(); // Disable new tasks from being submitted
-        try {
-            // Wait a while for existing tasks to terminate
-            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
-                pool.shutdownNow(); // Cancel currently executing tasks
-                // Wait a while for tasks to respond to being cancelled
-                if (!pool.awaitTermination(10, TimeUnit.SECONDS))
-                    System.err.println("Pool did not terminate");
-            }
-        } catch (InterruptedException ie) {
-            // (Re-)Cancel if current thread also interrupted
-            pool.shutdownNow();
-            // Preserve interrupt status
-            Thread.currentThread().interrupt();
-        }
-    }
-
-    @Override
-    public void prepare(Object o) {
+    FacebookFactory ff = new FacebookFactory(cb.build());
+    Facebook facebook = ff.getInstance();
 
-        executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20));
+    return facebook;
+  }
 
-        Preconditions.checkNotNull(providerQueue);
-        Preconditions.checkNotNull(this.klass);
-        Preconditions.checkNotNull(configuration.getOauth().getAppId());
-        Preconditions.checkNotNull(configuration.getOauth().getAppSecret());
-        Preconditions.checkNotNull(configuration.getOauth().getUserAccessToken());
+  @Override
+  public void cleanUp() {
+    shutdownAndAwaitTermination(executor);
+  }
 
-        Facebook client = getFacebookClient();
+  private class FacebookFeedPollingTask implements Runnable {
 
-        try {
-            ResponseList<Friend> friendResponseList = client.friends().getFriends();
-            Paging<Friend> friendPaging;
-            do {
-
-                for( Friend friend : friendResponseList ) {
-
-                    //client.rawAPI().callPostAPI();
-                    // add a subscription
-                }
-                friendPaging = friendResponseList.getPaging();
-                friendResponseList = client.fetchNext(friendPaging);
-            } while( friendPaging != null &&
-                    friendResponseList != null );
-        } catch (FacebookException e) {
-            e.printStackTrace();
-        }
+    FacebookUserstreamProvider provider;
+    Facebook client;
 
-    }
+    private Set<Post> priorPollResult = Sets.newHashSet();
 
-    protected Facebook getFacebookClient()
-    {
-        ConfigurationBuilder cb = new ConfigurationBuilder();
-        cb.setDebugEnabled(true)
-            .setOAuthAppId(configuration.getOauth().getAppId())
-            .setOAuthAppSecret(configuration.getOauth().getAppSecret())
-            .setOAuthAccessToken(configuration.getOauth().getUserAccessToken())
-            .setOAuthPermissions(ALL_PERMISSIONS)
-            .setJSONStoreEnabled(true)
-            .setClientVersion("v1.0");
-
-        FacebookFactory ff = new FacebookFactory(cb.build());
-        Facebook facebook = ff.getInstance();
-
-        return facebook;
+    public FacebookFeedPollingTask(FacebookUserstreamProvider facebookUserstreamProvider) {
+      provider = facebookUserstreamProvider;
     }
 
     @Override
-    public void cleanUp() {
-        shutdownAndAwaitTermination(executor);
-    }
-
-    private class FacebookFeedPollingTask implements Runnable {
-
-        FacebookUserstreamProvider provider;
-        Facebook client;
-
-        private Set<Post> priorPollResult = Sets.newHashSet();
-
-        public FacebookFeedPollingTask(FacebookUserstreamProvider facebookUserstreamProvider) {
-            provider = facebookUserstreamProvider;
-        }
-
-        @Override
-        public void run() {
-            client = provider.getFacebookClient();
-            while (provider.isRunning()) {
-                try {
-                    ResponseList<Post> postResponseList = client.getHome();
-                    Set<Post> update = Sets.newHashSet(postResponseList);
-                    Set<Post> repeats = Sets.intersection(priorPollResult, Sets.newHashSet(update));
-                    Set<Post> entrySet = Sets.difference(update, repeats);
-                    for (Post item : entrySet) {
-                        String json = DataObjectFactory.getRawJSON(item);
-                        org.apache.streams.facebook.Post post = mapper.readValue(json, org.apache.streams.facebook.Post.class);
-                        try {
-                            lock.readLock().lock();
-                            ComponentUtils.offerUntilSuccess(new StreamsDatum(post), providerQueue);
-                            countersCurrent.incrementAttempt();
-                        } finally {
-                            lock.readLock().unlock();
-                        }
-                    }
-                    priorPollResult = update;
-                    Thread.sleep(configuration.getPollIntervalMillis());
-                } catch (Exception e) {
-                    e.printStackTrace();
-                }
+    public void run() {
+      client = provider.getFacebookClient();
+      while (provider.isRunning()) {
+        try {
+          ResponseList<Post> postResponseList = client.getHome();
+          Set<Post> update = Sets.newHashSet(postResponseList);
+          Set<Post> repeats = Sets.intersection(priorPollResult, Sets.newHashSet(update));
+          Set<Post> entrySet = Sets.difference(update, repeats);
+          for (Post item : entrySet) {
+            String json = DataObjectFactory.getRawJSON(item);
+            org.apache.streams.facebook.Post post = mapper.readValue(json, org.apache.streams.facebook.Post.class);
+            try {
+              lock.readLock().lock();
+              ComponentUtils.offerUntilSuccess(new StreamsDatum(post), providerQueue);
+              countersCurrent.incrementAttempt();
+            } finally {
+              lock.readLock().unlock();
             }
+          }
+          priorPollResult = update;
+          Thread.sleep(configuration.getPollIntervalMillis());
+        } catch (Exception ex) {
+          ex.printStackTrace();
         }
+      }
     }
+  }
 }