You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/08/12 00:53:13 UTC

[13/15] updated packages

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/dc432af2/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/api/FacebookPostActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/api/FacebookPostActivitySerializer.java b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/api/FacebookPostActivitySerializer.java
new file mode 100644
index 0000000..f1687ea
--- /dev/null
+++ b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/api/FacebookPostActivitySerializer.java
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.facebook.api;
+
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.streams.data.ActivitySerializer;
+import org.apache.streams.exceptions.ActivitySerializerException;
+import org.apache.streams.facebook.Post;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.*;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+import org.joda.time.format.ISODateTimeFormat;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.streams.data.util.ActivityUtil.*;
+
+/**
+ * Converts a Facebook {@link Post} into a streams {@link Activity}.
+ *
+ * <p>Only {@link #deserialize(Post)} is implemented; {@link #serialize(Activity)} and
+ * {@link #deserializeAll(List)} always throw {@link NotImplementedException}.</p>
+ *
+ *   sblackmon: This class needs a rewrite
+ */
+public class FacebookPostActivitySerializer implements ActivitySerializer<org.apache.streams.facebook.Post> {
+
+    /** Formatter for the pattern {@code yyyy-MM-dd'T'HH:mm:ssZ}; not referenced elsewhere in this class. */
+    public static final DateTimeFormatter FACEBOOK_FORMAT = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ssZ");
+    /** ISO basic date-time formatter; not referenced elsewhere in this class. */
+    public static final DateTimeFormatter ACTIVITY_FORMAT = ISODateTimeFormat.basicDateTime();
+
+    /** Provider name used for the provider object and when building activity, person and object ids. */
+    public static final String PROVIDER_NAME = "facebook";
+
+    /** Mapper used to turn posts (and their sub-objects) into JSON trees. */
+    public static ObjectMapper mapper;
+    static {
+        mapper = StreamsJacksonMapper.getInstance();
+    }
+
+    /**
+     * @return the identifier of the format handled by this serializer
+     */
+    @Override
+    public String serializationFormat() {
+        return "facebook_post_json_v1";
+    }
+
+    /**
+     * Not supported.
+     *
+     * @param deserialized ignored
+     * @return never returns normally
+     * @throws NotImplementedException always
+     */
+    @Override
+    public Post serialize(Activity deserialized) throws ActivitySerializerException {
+        throw new NotImplementedException("Not currently supported by this deserializer");
+    }
+
+    /**
+     * Builds an {@link Activity} from a Facebook post.
+     *
+     * <p>The published/updated times, actor, provider and object type are set directly from the post;
+     * the remaining fields are mapped by walking the JSON tree of the post (see
+     * {@link #mapField(Activity, String, JsonNode)}).</p>
+     *
+     * @param post the post to convert; must not be null
+     * @return the populated activity
+     * @throws ActivitySerializerException declared by the interface and by the field mapping helpers
+     */
+    @Override
+    public Activity deserialize(Post post) throws ActivitySerializerException {
+        Activity activity = new Activity();
+        activity.setPublished(post.getCreatedTime());
+        activity.setUpdated(post.getUpdatedTime());
+        addActor(activity, mapper.convertValue(post.getFrom(), ObjectNode.class));
+        setProvider(activity);
+        setObjectType(post.getType(), activity);
+        parseObject(activity, mapper.convertValue(post, ObjectNode.class));
+        fixObjectId(activity);
+        fixContentFromSummary(activity);
+        return activity;
+    }
+
+    /**
+     * Not supported.
+     *
+     * @param serializedList ignored
+     * @return never returns normally
+     * @throws NotImplementedException always
+     */
+    @Override
+    public List<Activity> deserializeAll(List<Post> serializedList) {
+        throw new NotImplementedException("Not currently supported by this deserializer");
+    }
+
+    /**
+     * Fills in the activity content when no "message" field supplied one: the "summary" additional
+     * property is preferred, otherwise the summary of the activity object is used.
+     */
+    private void fixContentFromSummary(Activity activity) {
+        //we MUST have a content field set, so choose the best option
+        if(activity.getContent() == null) {
+            activity.setContent(activity.getAdditionalProperties().containsKey("summary") ?
+                    (String) activity.getAdditionalProperties().get("summary") :
+                    activity.getObject().getSummary());
+        }
+    }
+
+    /**
+     * Clears the object id when it still holds the schema placeholder {@code {link}}.
+     */
+    private void fixObjectId(Activity activity) {
+        //An artifact of schema generation, the default value is {link}
+        ActivityObject object = activity.getObject();
+        // constant-first comparison: the id may be null when no "object_id" was mapped
+        if(object != null && "{link}".equals(object.getId())) {
+            object.setId(null);
+        }
+    }
+
+    /**
+     * Creates the activity object and records the post type as its object type.
+     */
+    private void setObjectType(String type, Activity activity) {
+        ActivityObject object = new ActivityObject();
+        activity.setObject(object);
+        object.setObjectType(type);
+    }
+
+    /**
+     * Attaches a provider object identifying {@link #PROVIDER_NAME} to the activity.
+     */
+    private void setProvider(Activity activity) {
+        Provider provider = new Provider();
+        provider.setId(getProviderId(PROVIDER_NAME));
+        provider.setDisplayName(PROVIDER_NAME);
+        activity.setProvider(provider);
+    }
+
+    /**
+     * Requires the node to have at least one field and returns the text value of the node itself.
+     *
+     * <p>NOTE(review): this method is not called from within this class, and it returns
+     * {@code node.asText()} rather than anything derived from the first field - confirm the
+     * intent before using it.</p>
+     *
+     * @throws IllegalStateException if the node has no fields
+     */
+    private String getObjectType(JsonNode node) {
+        Iterator<Map.Entry<String, JsonNode>> fields = node.fields();
+        ensureMoreFields(fields);
+        fields.next();
+        return node.asText();
+    }
+
+    /**
+     * Maps every non-null top-level field of the post JSON onto the activity.
+     */
+    private void parseObject(Activity activity, JsonNode jsonNode) throws ActivitySerializerException {
+        for(Iterator<Map.Entry<String, JsonNode>> fields = jsonNode.fields(); fields.hasNext();) {
+            Map.Entry<String, JsonNode> field = fields.next();
+            JsonNode value = field.getValue();
+            // An explicit JSON null carries no information; asText() would turn it into the string "null"
+            if(value == null || value.isNull()) {
+                continue;
+            }
+            mapField(activity, field.getKey(), value);
+        }
+    }
+
+    /**
+     * Dispatches a single post field to the helper that copies it onto the activity.
+     * Field names without a mapping are ignored.
+     *
+     * @param activity the activity being populated
+     * @param name     the post field name
+     * @param value    the post field value
+     */
+    private void mapField(Activity activity, String name, JsonNode value) throws ActivitySerializerException {
+        if("application".equals(name)) {
+            addGenerator(activity, value);
+        } else if ("caption".equals(name)) {
+            addSummary(activity, value);
+        } else if ("comments".equals(name)) {
+            addAttachments(activity, value);
+        } else if ("description".equals(name)) {
+            addObjectSummary(activity, value);
+        } else if ("from".equals(name)) {
+            addActor(activity, value);
+        } else if ("icon".equals(name)) {
+            addIcon(activity, value);
+        } else if ("id".equals(name)) {
+            addId(activity, value);
+        } else if ("is_hidden".equals(name)) {
+            addObjectHiddenExtension(activity, value);
+        } else if ("like_count".equals(name)) {
+            addLikeExtension(activity, value);
+        } else if ("link".equals(name)) {
+            addObjectLink(activity, value);
+        } else if ("message".equals(name)) {
+            activity.setContent(value.asText());
+        } else if ("name".equals(name)) {
+            addObjectName(activity, value);
+        } else if ("object_id".equals(name)) {
+            addObjectId(activity, value);
+        } else if ("picture".equals(name)) {
+            addObjectImage(activity, value);
+        } else if ("place".equals(name)) {
+            addLocationExtension(activity, value);
+        } else if ("shares".equals(name)) {
+            addRebroadcastExtension(activity, value);
+        } else if ("source".equals(name)) {
+            // shares the object url with "link"; whichever field is visited last wins
+            addObjectLink(activity, value);
+        } else if ("story".equals(name)) {
+            addTitle(activity, value);
+        }
+    }
+
+    /** Stores the value as the "summary" additional property of the activity. */
+    private void addSummary(Activity activity, JsonNode value) {
+        activity.setAdditionalProperty("summary", value.asText());
+    }
+
+    /** Uses the value as the activity title. */
+    private void addTitle(Activity activity, JsonNode value) {
+        activity.setTitle(value.asText());
+    }
+
+    /** Records the value as an integer under the likes extension. */
+    private void addLikeExtension(Activity activity, JsonNode value) {
+        Map<String, Object> extensions = ensureExtensions(activity);
+        extensions.put(LIKES_EXTENSION, value.asInt());
+    }
+
+    /**
+     * Copies country and coordinates ("longitude,latitude") of a place into the location extension.
+     * Nothing is stored when the place has no "location" member.
+     */
+    private void addLocationExtension(Activity activity, JsonNode value) {
+        Map<String, Object> extensions = ensureExtensions(activity);
+        if(value.has("location")) {
+            Map<String, Object> location = new HashMap<String, Object>();
+            JsonNode fbLocation = value.get("location");
+            if(fbLocation.has("country")) {
+                // store the text, not the JsonNode, so the extension holds plain values
+                location.put(LOCATION_EXTENSION_COUNTRY, fbLocation.get("country").asText());
+            }
+            if(fbLocation.has("latitude") && fbLocation.has("longitude")) {
+                location.put(LOCATION_EXTENSION_COORDINATES,
+                        String.format("%s,%s", fbLocation.get("longitude").asText(), fbLocation.get("latitude").asText()));
+            }
+            extensions.put(LOCATION_EXTENSION, location);
+        }
+    }
+
+    /** Sets the object image to one whose url is the value. */
+    private void addObjectImage(Activity activity, JsonNode value) {
+        Image image = new Image();
+        image.setUrl(value.asText());
+        activity.getObject().setImage(image);
+    }
+
+    /** Builds the object id from the provider name, the object type and the value. */
+    private void addObjectId(Activity activity, JsonNode value) {
+        activity.getObject().setId(getObjectId(PROVIDER_NAME, activity.getObject().getObjectType(), value.asText()));
+    }
+
+    /** Uses the value as the display name of the object. */
+    private void addObjectName(Activity activity, JsonNode value) {
+        activity.getObject().setDisplayName(value.asText());
+    }
+
+    /** Builds the activity id from the provider name and the value. */
+    private void addId(Activity activity, JsonNode value) {
+        activity.setId(getActivityId(PROVIDER_NAME, value.asText()));
+    }
+
+    /** Uses the value as the url of the object. */
+    private void addObjectLink(Activity activity, JsonNode value) {
+        activity.getObject().setUrl(value.asText());
+    }
+
+    /** Records the "count" member of the value, when present, under the rebroadcast extension. */
+    private void addRebroadcastExtension(Activity activity, JsonNode value) {
+        Map<String, Object> extensions = ensureExtensions(activity);
+        if(value.has("count")) {
+            extensions.put(REBROADCAST_EXTENSION, value.get("count").asInt());
+        }
+    }
+
+    /** Records the value as a boolean under the "hidden" extension. */
+    private void addObjectHiddenExtension(Activity activity, JsonNode value) {
+        Map<String, Object> extensions = ensureExtensions(activity);
+        extensions.put("hidden", value.asBoolean());
+    }
+
+    /** Sets the activity icon, storing the value as its "url" additional property. */
+    private void addIcon(Activity activity, JsonNode value) {
+        Icon icon = new Icon();
+        //Apparently the Icon didn't map from the schema very well
+        icon.setAdditionalProperty("url", value.asText());
+        activity.setIcon(icon);
+    }
+
+    /**
+     * Sets the activity actor from the "name" and "id" members of the value.
+     * A null value (post without a "from" member) yields an actor with neither field set.
+     */
+    private void addActor(Activity activity, JsonNode value) {
+        Actor actor = new Actor();
+        if(value != null) {
+            if(value.has("name")) {
+                actor.setDisplayName(value.get("name").asText());
+            }
+            if(value.has("id")) {
+                actor.setId(getPersonId(PROVIDER_NAME, value.get("id").asText()));
+            }
+        }
+        activity.setActor(actor);
+    }
+
+    /** Uses the value as the summary of the object. */
+    private void addObjectSummary(Activity activity, JsonNode value) {
+        activity.getObject().setSummary(value.asText());
+    }
+
+    /** Sets the activity generator from the "id", "name" and "namespace" members of the value. */
+    private void addGenerator(Activity activity, JsonNode value) {
+        Generator generator = new Generator();
+        if(value.has("id")) {
+            generator.setId(getObjectId(PROVIDER_NAME, "generator", value.get("id").asText()));
+        }
+        if(value.has("name")) {
+            generator.setDisplayName(value.get("name").asText());
+        }
+        if(value.has("namespace")) {
+            generator.setSummary(value.get("namespace").asText());
+        }
+        activity.setGenerator(generator);
+    }
+
+    /** Intentionally a no-op: comments are not mapped. */
+    private void addAttachments(Activity activity, JsonNode value) {
+        //No direct mapping at this time
+    }
+
+    /**
+     * @throws IllegalStateException if the iterator has no further element
+     */
+    private static void ensureMoreFields(Iterator<Map.Entry<String, JsonNode>> fields) {
+        if(!fields.hasNext()) {
+            throw new IllegalStateException("Facebook activity must have one and only one root element");
+        }
+    }
+
+    /**
+     * @throws IllegalStateException if the iterator still has an element
+     */
+    private static void ensureNoMoreFields(Iterator<Map.Entry<String, JsonNode>> fields) {
+        if(fields.hasNext()) {
+            throw new IllegalStateException("Facebook activity must have one and only one root element");
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/dc432af2/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/feed/FacebookPublicFeedXmlActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/feed/FacebookPublicFeedXmlActivitySerializer.java b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/feed/FacebookPublicFeedXmlActivitySerializer.java
new file mode 100644
index 0000000..a44f982
--- /dev/null
+++ b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/feed/FacebookPublicFeedXmlActivitySerializer.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.facebook.feed;
+
+/**
+ * Empty placeholder class: it declares no fields or methods.
+ *
+ * NOTE(review): judging by its name it is presumably meant to hold a serializer for the
+ * Facebook public feed XML format - confirm before relying on it.
+ *
+ * Created by sblackmon on 10/2/13.
+ */
+public class FacebookPublicFeedXmlActivitySerializer {
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/dc432af2/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/processor/FacebookTypeConverter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/processor/FacebookTypeConverter.java b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/processor/FacebookTypeConverter.java
new file mode 100644
index 0000000..2c6fd8e
--- /dev/null
+++ b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/processor/FacebookTypeConverter.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.facebook.processor;
+
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Lists;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.exceptions.ActivitySerializerException;
+import org.apache.streams.facebook.Post;
+import org.apache.streams.facebook.api.FacebookPostActivitySerializer;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Queue;
+
+/**
+ * Created by sblackmon on 12/10/13.
+ */
+public class FacebookTypeConverter implements StreamsProcessor {
+
+    public final static String STREAMS_ID = "TwitterTypeConverter";
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(FacebookTypeConverter.class);
+
+    private ObjectMapper mapper;
+
+    private Queue<StreamsDatum> inQueue;
+    private Queue<StreamsDatum> outQueue;
+
+    private Class inClass;
+    private Class outClass;
+
+    private FacebookPostActivitySerializer facebookPostActivitySerializer;
+
+    private int count = 0;
+
+    public final static String TERMINATE = new String("TERMINATE");
+
+    public FacebookTypeConverter(Class inClass, Class outClass) {
+        this.inClass = inClass;
+        this.outClass = outClass;
+    }
+
+    public Queue<StreamsDatum> getProcessorOutputQueue() {
+        return outQueue;
+    }
+
+    public void setProcessorInputQueue(Queue<StreamsDatum> inputQueue) {
+        inQueue = inputQueue;
+    }
+
+    public Object convert(ObjectNode event, Class inClass, Class outClass) throws ActivitySerializerException, JsonProcessingException {
+
+        Object result = null;
+
+        if( outClass.equals( Activity.class )) {
+            LOGGER.debug("ACTIVITY");
+            result = facebookPostActivitySerializer.deserialize(mapper.convertValue(event, Post.class));
+        } else if( outClass.equals( Post.class )) {
+            LOGGER.debug("POST");
+            result = mapper.convertValue(event, Post.class);
+        } else if( outClass.equals( ObjectNode.class )) {
+            LOGGER.debug("OBJECTNODE");
+            result = mapper.convertValue(event, ObjectNode.class);
+        }
+
+        // no supported conversion were applied
+        if( result != null ) {
+            count ++;
+            return result;
+        }
+
+        LOGGER.debug("CONVERT FAILED");
+
+        return null;
+
+    }
+
+    public boolean validate(Object document, Class klass) {
+
+        // TODO
+        return true;
+    }
+
+    public boolean isValidJSON(final String json) {
+        boolean valid = false;
+        try {
+            final JsonParser parser = new ObjectMapper().getJsonFactory()
+                    .createJsonParser(json);
+            while (parser.nextToken() != null) {
+            }
+            valid = true;
+        } catch (JsonParseException jpe) {
+            LOGGER.warn("validate: {}", jpe);
+        } catch (IOException ioe) {
+            LOGGER.warn("validate: {}", ioe);
+        }
+
+        return valid;
+    }
+
+    @Override
+    public List<StreamsDatum> process(StreamsDatum entry) {
+
+        StreamsDatum result = null;
+
+        try {
+
+            Object item = entry.getDocument();
+            ObjectNode node;
+
+            LOGGER.debug("{} processing {}", STREAMS_ID, item.getClass());
+
+            if( item instanceof String ) {
+
+                // if the target is string, just pass-through
+                if( String.class.equals(outClass)) {
+                    result = entry;
+                }
+                else {
+                    // first check for valid json
+                    node = (ObjectNode)mapper.readTree((String)item);
+
+                    // since data is coming from outside provider, we don't know what type the events are
+                    // for now we'll assume post
+
+                    Object out = convert(node, Post.class, outClass);
+
+                    if( out != null && validate(out, outClass))
+                        result = new StreamsDatum(out);
+                }
+
+            } else if( item instanceof ObjectNode || item instanceof Post) {
+
+                // first check for valid json
+                node = (ObjectNode)mapper.valueToTree(item);
+
+                // since data is coming from outside provider, we don't know what type the events are
+                // for now we'll assume post
+
+                Object out = convert(node, Post.class, outClass);
+
+                if( out != null && validate(out, outClass))
+                    result = new StreamsDatum(out);
+
+            }
+
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+
+        if( result != null )
+            return Lists.newArrayList(result);
+        else
+            return Lists.newArrayList();
+    }
+
+    @Override
+    public void prepare(Object o) {
+        mapper = new StreamsJacksonMapper();
+        facebookPostActivitySerializer = new FacebookPostActivitySerializer();
+    }
+
+    @Override
+    public void cleanUp() {
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/dc432af2/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookFriendFeedProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookFriendFeedProvider.java b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookFriendFeedProvider.java
new file mode 100644
index 0000000..381e6f3
--- /dev/null
+++ b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookFriendFeedProvider.java
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.facebook.provider;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Queues;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigRenderOptions;
+import facebook4j.*;
+import facebook4j.conf.ConfigurationBuilder;
+import facebook4j.json.DataObjectFactory;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.DatumStatusCounter;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProvider;
+import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.facebook.FacebookUserInformationConfiguration;
+import org.apache.streams.facebook.FacebookUserstreamConfiguration;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.util.ComponentUtils;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import sun.reflect.generics.reflectiveObjects.NotImplementedException;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.math.BigInteger;
+import java.util.Iterator;
+import java.util.Queue;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class FacebookFriendFeedProvider implements StreamsProvider, Serializable
+{
+
+    public static final String STREAMS_ID = "FacebookFriendFeedProvider";
+    private static final Logger LOGGER = LoggerFactory.getLogger(FacebookFriendFeedProvider.class);
+
+    private static final ObjectMapper mapper = new StreamsJacksonMapper();
+
+    private static final String ALL_PERMISSIONS = "ads_management,ads_read,create_event,create_note,email,export_stream,friends_about_me,friends_actions.books,friends_actions.music,friends_actions.news,friends_actions.video,friends_activities,friends_birthday,friends_education_history,friends_events,friends_games_activity,friends_groups,friends_hometown,friends_interests,friends_likes,friends_location,friends_notes,friends_online_presence,friends_photo_video_tags,friends_photos,friends_questions,friends_relationship_details,friends_relationships,friends_religion_politics,friends_status,friends_subscriptions,friends_videos,friends_website,friends_work_history,manage_friendlists,manage_notifications,manage_pages,photo_upload,publish_actions,publish_stream,read_friendlists,read_insights,read_mailbox,read_page_mailboxes,read_requests,read_stream,rsvp_event,share_item,sms,status_update,user_about_me,user_actions.books,user_actions.music,user_actions.news,user_actions.video,user_activitie
 s,user_birthday,user_education_history,user_events,user_friends,user_games_activity,user_groups,user_hometown,user_interests,user_likes,user_location,user_notes,user_online_presence,user_photo_video_tags,user_photos,user_questions,user_relationship_details,user_relationships,user_religion_politics,user_status,user_subscriptions,user_videos,user_website,user_work_history,video_upload,xmpp_login";
+    private FacebookUserstreamConfiguration configuration;
+
+    private Class klass;
+    protected final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+    protected volatile Queue<StreamsDatum> providerQueue = new LinkedBlockingQueue<StreamsDatum>();
+
+    public FacebookUserstreamConfiguration getConfig()              { return configuration; }
+
+    public void setConfig(FacebookUserstreamConfiguration config)   { this.configuration = config; }
+
+    protected Iterator<String[]> idsBatches;
+
+    protected ExecutorService executor;
+
+    protected DateTime start;
+    protected DateTime end;
+
+    protected final AtomicBoolean running = new AtomicBoolean();
+
+    private DatumStatusCounter countersCurrent = new DatumStatusCounter();
+    private DatumStatusCounter countersTotal = new DatumStatusCounter();
+
+    private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) {
+        return new ThreadPoolExecutor(nThreads, nThreads,
+                5000L, TimeUnit.MILLISECONDS,
+                new ArrayBlockingQueue<Runnable>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy());
+    }
+
+    public FacebookFriendFeedProvider() {
+        Config config = StreamsConfigurator.config.getConfig("facebook");
+        FacebookUserInformationConfiguration configuration;
+        try {
+            configuration = mapper.readValue(config.root().render(ConfigRenderOptions.concise()), FacebookUserInformationConfiguration.class);
+        } catch (IOException e) {
+            e.printStackTrace();
+            return;
+        }
+    }
+
+    public FacebookFriendFeedProvider(FacebookUserstreamConfiguration config) {
+        this.configuration = config;
+    }
+
+    public FacebookFriendFeedProvider(Class klass) {
+        Config config = StreamsConfigurator.config.getConfig("facebook");
+        FacebookUserInformationConfiguration configuration;
+        try {
+            configuration = mapper.readValue(config.root().render(ConfigRenderOptions.concise()), FacebookUserInformationConfiguration.class);
+        } catch (IOException e) {
+            e.printStackTrace();
+            return;
+        }
+        this.klass = klass;
+    }
+
+    public FacebookFriendFeedProvider(FacebookUserstreamConfiguration config, Class klass) {
+        this.configuration = config;
+        this.klass = klass;
+    }
+
+    public Queue<StreamsDatum> getProviderQueue() {
+        return this.providerQueue;
+    }
+
+    @Override
+    public void startStream() {
+        shutdownAndAwaitTermination(executor);
+        running.set(true);
+    }
+
+    public StreamsResultSet readCurrent() {
+
+        StreamsResultSet current;
+
+        synchronized (FacebookUserstreamProvider.class) {
+            current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(providerQueue));
+            current.setCounter(new DatumStatusCounter());
+            current.getCounter().add(countersCurrent);
+            countersTotal.add(countersCurrent);
+            countersCurrent = new DatumStatusCounter();
+            providerQueue.clear();
+        }
+
+        return current;
+
+    }
+
+    public StreamsResultSet readNew(BigInteger sequence) {
+        LOGGER.debug("{} readNew", STREAMS_ID);
+        throw new NotImplementedException();
+    }
+
+    public StreamsResultSet readRange(DateTime start, DateTime end) {
+        LOGGER.debug("{} readRange", STREAMS_ID);
+        this.start = start;
+        this.end = end;
+        readCurrent();
+        StreamsResultSet result = (StreamsResultSet)providerQueue.iterator();
+        return result;
+    }
+
+    @Override
+    public boolean isRunning() {
+        return running.get();
+    }
+
+    void shutdownAndAwaitTermination(ExecutorService pool) {
+        pool.shutdown(); // Disable new tasks from being submitted
+        try {
+            // Wait a while for existing tasks to terminate
+            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
+                pool.shutdownNow(); // Cancel currently executing tasks
+                // Wait a while for tasks to respond to being cancelled
+                if (!pool.awaitTermination(10, TimeUnit.SECONDS))
+                    System.err.println("Pool did not terminate");
+            }
+        } catch (InterruptedException ie) {
+            // (Re-)Cancel if current thread also interrupted
+            pool.shutdownNow();
+            // Preserve interrupt status
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    @Override
+    public void prepare(Object o) {
+
+        executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20));
+
+        Preconditions.checkNotNull(providerQueue);
+        Preconditions.checkNotNull(this.klass);
+        Preconditions.checkNotNull(configuration.getOauth().getAppId());
+        Preconditions.checkNotNull(configuration.getOauth().getAppSecret());
+        Preconditions.checkNotNull(configuration.getOauth().getUserAccessToken());
+
+        Facebook client = getFacebookClient();
+
+        try {
+            ResponseList<Friend> friendResponseList = client.friends().getFriends();
+            Paging<Friend> friendPaging;
+            do {
+
+                for( Friend friend : friendResponseList ) {
+
+                    executor.submit(new FacebookFriendFeedTask(this, friend.getId()));
+                }
+                friendPaging = friendResponseList.getPaging();
+                friendResponseList = client.fetchNext(friendPaging);
+            } while( friendPaging != null &&
+                    friendResponseList != null );
+        } catch (FacebookException e) {
+            e.printStackTrace();
+        }
+
+    }
+
+    protected Facebook getFacebookClient()
+    {
+        ConfigurationBuilder cb = new ConfigurationBuilder();
+        cb.setDebugEnabled(true)
+            .setOAuthAppId(configuration.getOauth().getAppId())
+            .setOAuthAppSecret(configuration.getOauth().getAppSecret())
+            .setOAuthAccessToken(configuration.getOauth().getUserAccessToken())
+            .setOAuthPermissions(ALL_PERMISSIONS)
+            .setJSONStoreEnabled(true)
+            .setClientVersion("v1.0");
+
+        FacebookFactory ff = new FacebookFactory(cb.build());
+        Facebook facebook = ff.getInstance();
+
+        return facebook;
+    }
+
+    @Override
+    public void cleanUp() {
+        shutdownAndAwaitTermination(executor);
+    }
+
+    private class FacebookFriendFeedTask implements Runnable {
+
+        FacebookFriendFeedProvider provider;
+        Facebook client;
+        String id;
+
+        public FacebookFriendFeedTask(FacebookFriendFeedProvider provider, String id) {
+            this.provider = provider;
+            this.id = id;
+        }
+
+        @Override
+        public void run() {
+            client = provider.getFacebookClient();
+                try {
+                    ResponseList<Post> postResponseList = client.getFeed(id);
+                    Paging<Post> postPaging;
+                    do {
+
+                        for (Post item : postResponseList) {
+                            String json = DataObjectFactory.getRawJSON(item);
+                            org.apache.streams.facebook.Post post = mapper.readValue(json, org.apache.streams.facebook.Post.class);
+                            try {
+                                lock.readLock().lock();
+                                ComponentUtils.offerUntilSuccess(new StreamsDatum(post), providerQueue);
+                                countersCurrent.incrementAttempt();
+                            } finally {
+                                lock.readLock().unlock();
+                            }
+                        }
+                        postPaging = postResponseList.getPaging();
+                        postResponseList = client.fetchNext(postPaging);
+                    } while( postPaging != null &&
+                            postResponseList != null );
+
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/dc432af2/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookFriendUpdatesProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookFriendUpdatesProvider.java b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookFriendUpdatesProvider.java
new file mode 100644
index 0000000..2a5ec65
--- /dev/null
+++ b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookFriendUpdatesProvider.java
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.facebook.provider;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigRenderOptions;
+import facebook4j.*;
+import facebook4j.conf.ConfigurationBuilder;
+import facebook4j.json.DataObjectFactory;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.DatumStatusCounter;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProvider;
+import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.facebook.FacebookUserInformationConfiguration;
+import org.apache.streams.facebook.FacebookUserstreamConfiguration;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.util.ComponentUtils;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import sun.reflect.generics.reflectiveObjects.NotImplementedException;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.math.BigInteger;
+import java.util.Iterator;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class FacebookFriendUpdatesProvider implements StreamsProvider, Serializable
+{
+
+    /**
+     * Identifier written in this provider's debug log lines.
+     * NOTE(review): the value says "FacebookFriendPostsProvider" while the class is
+     * FacebookFriendUpdatesProvider -- confirm which name is intended.
+     */
+    public static final String STREAMS_ID = "FacebookFriendPostsProvider";
+    /** SLF4J logger scoped to this class. */
+    private static final Logger LOGGER = LoggerFactory.getLogger(FacebookFriendUpdatesProvider.class);
+
+    /** Shared Jackson mapper used to read the configuration and to map Facebook JSON onto Streams POJOs. */
+    private static final ObjectMapper mapper = new StreamsJacksonMapper();
+
+    private static final String ALL_PERMISSIONS = "ads_management,ads_read,create_event,create_note,email,export_stream,friends_about_me,friends_actions.books,friends_actions.music,friends_actions.news,friends_actions.video,friends_activities,friends_birthday,friends_education_history,friends_events,friends_games_activity,friends_groups,friends_hometown,friends_interests,friends_likes,friends_location,friends_notes,friends_online_presence,friends_photo_video_tags,friends_photos,friends_questions,friends_relationship_details,friends_relationships,friends_religion_politics,friends_status,friends_subscriptions,friends_videos,friends_website,friends_work_history,manage_friendlists,manage_notifications,manage_pages,photo_upload,publish_actions,publish_stream,read_friendlists,read_insights,read_mailbox,read_page_mailboxes,read_requests,read_stream,rsvp_event,share_item,sms,status_update,user_about_me,user_actions.books,user_actions.music,user_actions.news,user_actions.video,user_activitie
 s,user_birthday,user_education_history,user_events,user_friends,user_games_activity,user_groups,user_hometown,user_interests,user_likes,user_location,user_notes,user_online_presence,user_photo_video_tags,user_photos,user_questions,user_relationship_details,user_relationships,user_religion_politics,user_status,user_subscriptions,user_videos,user_website,user_work_history,video_upload,xmpp_login";
+    /** OAuth credentials (read by prepare() and getFacebookClient()) and the poll interval (read by the polling task). */
+    private FacebookUserstreamConfiguration configuration;
+
+    /** Required to be non-null by prepare(); not otherwise read in this class. */
+    private Class klass;
+    /** The polling task holds the read lock while it offers a datum to the queue. */
+    protected final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+    /** Datums waiting to be handed out; readCurrent() wraps this queue in a StreamsResultSet. */
+    protected volatile Queue<StreamsDatum> providerQueue = new LinkedBlockingQueue<StreamsDatum>();
+
+    /** Returns the configuration currently in use (may be null if none was supplied). */
+    public FacebookUserstreamConfiguration getConfig()              { return configuration; }
+
+    /** Replaces the configuration used by prepare() and getFacebookClient(). */
+    public void setConfig(FacebookUserstreamConfiguration config)   { this.configuration = config; }
+
+    /** Checked by readCurrent(). NOTE(review): nothing in this class assigns it -- confirm who does. */
+    protected Iterator<String[]> idsBatches;
+
+    /** Created in prepare() and shut down in cleanUp(). */
+    protected ExecutorService executor;
+
+    /** Bounds recorded by readRange(); not read anywhere else in this class. */
+    protected DateTime start;
+    protected DateTime end;
+
+    /** Set to true by startStream() and back to false by readCurrent(). */
+    protected final AtomicBoolean running = new AtomicBoolean();
+
+    /** Incremented by the polling task once per datum offered to the queue. */
+    private DatumStatusCounter countersCurrent = new DatumStatusCounter();
+    /** Not read or written elsewhere in this class. */
+    private DatumStatusCounter countersTotal = new DatumStatusCounter();
+
+    private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) {
+        return new ThreadPoolExecutor(nThreads, nThreads,
+                5000L, TimeUnit.MILLISECONDS,
+                new ArrayBlockingQueue<Runnable>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy());
+    }
+
+    /**
+     * Creates a provider configured from the "facebook" section of the global
+     * Streams configuration.
+     *
+     * If that section cannot be mapped onto a {@link FacebookUserstreamConfiguration},
+     * the failure is logged and the provider is left without a configuration.
+     */
+    public FacebookFriendUpdatesProvider() {
+        Config config = StreamsConfigurator.config.getConfig("facebook");
+        try {
+            // Assign the field directly: parsing into a local of the same name would
+            // shadow the field and leave the provider unconfigured.
+            this.configuration = mapper.readValue(config.root().render(ConfigRenderOptions.concise()), FacebookUserstreamConfiguration.class);
+        } catch (IOException e) {
+            LOGGER.error("Unable to parse the \"facebook\" configuration section", e);
+        }
+    }
+
+    /**
+     * Creates a provider that uses the supplied configuration.
+     *
+     * @param config configuration to store; it is not validated here
+     */
+    public FacebookFriendUpdatesProvider(FacebookUserstreamConfiguration config) {
+        configuration = config;
+    }
+
+    /**
+     * Creates a provider configured from the "facebook" section of the global
+     * Streams configuration and remembers the given class.
+     *
+     * If that section cannot be mapped onto a {@link FacebookUserstreamConfiguration},
+     * the failure is logged and the provider is left without a configuration.
+     *
+     * @param klass class stored on the provider; prepare() requires it to be non-null
+     */
+    public FacebookFriendUpdatesProvider(Class klass) {
+        // Store klass first so a configuration failure does not also lose it.
+        this.klass = klass;
+        Config config = StreamsConfigurator.config.getConfig("facebook");
+        try {
+            // Assign the field directly: parsing into a local of the same name would
+            // shadow the field and leave the provider unconfigured.
+            this.configuration = mapper.readValue(config.root().render(ConfigRenderOptions.concise()), FacebookUserstreamConfiguration.class);
+        } catch (IOException e) {
+            LOGGER.error("Unable to parse the \"facebook\" configuration section", e);
+        }
+    }
+
+    /**
+     * Creates a provider from an explicit configuration and class.
+     *
+     * @param config configuration to store; it is not validated here
+     * @param klass  class stored on the provider; prepare() requires it to be non-null
+     */
+    public FacebookFriendUpdatesProvider(FacebookUserstreamConfiguration config, Class klass) {
+        this.klass = klass;
+        this.configuration = config;
+    }
+
+    /**
+     * Exposes the live queue of collected datums (not a copy).
+     *
+     * @return the provider queue
+     */
+    public Queue<StreamsDatum> getProviderQueue() {
+        return providerQueue;
+    }
+
+    /** Flags the provider as running; no work is scheduled here. */
+    @Override
+    public void startStream() {
+        final boolean nowRunning = true;
+        running.set(nowRunning);
+    }
+
+    /**
+     * Wraps whatever is currently in the provider queue in a result set and marks
+     * the provider as no longer running. No data is fetched by this method.
+     *
+     * NOTE(review): nothing in this class assigns idsBatches, so the precondition
+     * below looks like it fails with NullPointerException unless a subclass sets
+     * the field -- confirm.
+     *
+     * @return a result set wrapping the provider queue
+     */
+    public StreamsResultSet readCurrent() {
+        Preconditions.checkArgument(idsBatches.hasNext());
+        LOGGER.info("readCurrent");
+
+        LOGGER.info("Finished.  Cleaning up...");
+        LOGGER.info("Providing {} docs", providerQueue.size());
+
+        final StreamsResultSet current = new StreamsResultSet(providerQueue);
+        running.set(false);
+        LOGGER.info("Exiting");
+        return current;
+    }
+
+    /**
+     * Not supported by this provider.
+     *
+     * @param sequence ignored
+     * @return never returns normally
+     * @throws NotImplementedException always
+     */
+    public StreamsResultSet readNew(BigInteger sequence) {
+        LOGGER.debug("{} readNew", STREAMS_ID);
+        final NotImplementedException unsupported = new NotImplementedException();
+        throw unsupported;
+    }
+
+    /**
+     * Records the requested range and returns the result of {@link #readCurrent()}.
+     *
+     * @param start lower bound, stored on the provider
+     * @param end   upper bound, stored on the provider
+     * @return the result set produced by readCurrent()
+     */
+    public StreamsResultSet readRange(DateTime start, DateTime end) {
+        LOGGER.debug("{} readRange", STREAMS_ID);
+        this.start = start;
+        this.end = end;
+        // readCurrent() already wraps the queue in a StreamsResultSet. The queue's
+        // Iterator is not a StreamsResultSet, so casting it can only fail with
+        // ClassCastException.
+        return readCurrent();
+    }
+
+    /**
+     * Reports the running flag.
+     *
+     * @return true after startStream() until readCurrent() clears the flag
+     */
+    @Override
+    public boolean isRunning() {
+        final boolean active = running.get();
+        return active;
+    }
+
+    /**
+     * Shuts the pool down in two phases: an orderly shutdown with a 10 second
+     * grace period, then a forced shutdown with a further 10 second wait.
+     *
+     * If the calling thread is interrupted while waiting, the pool is cancelled
+     * immediately and the interrupt status is restored.
+     *
+     * @param pool the executor to stop
+     */
+    void shutdownAndAwaitTermination(ExecutorService pool) {
+        pool.shutdown(); // Disable new tasks from being submitted
+        try {
+            // Wait a while for existing tasks to terminate
+            if (pool.awaitTermination(10, TimeUnit.SECONDS)) {
+                return;
+            }
+            pool.shutdownNow(); // Cancel currently executing tasks
+            // Wait a while for tasks to respond to being cancelled
+            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
+                // Report through the class logger rather than System.err, like the rest of the class.
+                LOGGER.error("Pool did not terminate");
+            }
+        } catch (InterruptedException ie) {
+            // (Re-)Cancel if current thread also interrupted
+            pool.shutdownNow();
+            // Preserve interrupt status
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    @Override
+    public void prepare(Object o) {
+
+        executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20));
+
+        Preconditions.checkNotNull(providerQueue);
+        Preconditions.checkNotNull(this.klass);
+        Preconditions.checkNotNull(configuration.getOauth().getAppId());
+        Preconditions.checkNotNull(configuration.getOauth().getAppSecret());
+        Preconditions.checkNotNull(configuration.getOauth().getUserAccessToken());
+
+        Facebook client = getFacebookClient();
+
+        try {
+            ResponseList<Friend> friendResponseList = client.friends().getFriends();
+            Paging<Friend> friendPaging;
+            do {
+
+                for( Friend friend : friendResponseList ) {
+
+                    //client.rawAPI().callPostAPI();
+                    // add a subscription
+                }
+                friendPaging = friendResponseList.getPaging();
+                friendResponseList = client.fetchNext(friendPaging);
+            } while( friendPaging != null &&
+                    friendResponseList != null );
+        } catch (FacebookException e) {
+            e.printStackTrace();
+        }
+
+    }
+
+    protected Facebook getFacebookClient()
+    {
+        ConfigurationBuilder cb = new ConfigurationBuilder();
+        cb.setDebugEnabled(true)
+            .setOAuthAppId(configuration.getOauth().getAppId())
+            .setOAuthAppSecret(configuration.getOauth().getAppSecret())
+            .setOAuthAccessToken(configuration.getOauth().getUserAccessToken())
+            .setOAuthPermissions(ALL_PERMISSIONS)
+            .setJSONStoreEnabled(true)
+            .setClientVersion("v1.0");
+
+        FacebookFactory ff = new FacebookFactory(cb.build());
+        Facebook facebook = ff.getInstance();
+
+        return facebook;
+    }
+
+    /** Stops the executor created by prepare(), if there is one. */
+    @Override
+    public void cleanUp() {
+        // prepare() creates the executor; if it never ran there is nothing to stop.
+        if (executor != null) {
+            shutdownAndAwaitTermination(executor);
+        }
+    }
+
+    /**
+     * Polls the home feed of the given provider's client for as long as that
+     * provider reports it is running, and offers each post not seen in the
+     * previous poll to the enclosing provider's queue.
+     */
+    private class FacebookFeedPollingTask implements Runnable {
+
+        FacebookUserstreamProvider provider;
+        Facebook client;
+
+        /** Posts returned by the previous poll; used to drop repeats. */
+        private Set<Post> priorPollResult = Sets.newHashSet();
+
+        /**
+         * @param facebookUserstreamProvider provider that supplies the client and the running flag
+         */
+        public FacebookFeedPollingTask(FacebookUserstreamProvider facebookUserstreamProvider) {
+            provider = facebookUserstreamProvider;
+        }
+
+        /**
+         * Runs the poll loop. A failed poll is logged and retried after the
+         * configured interval; an interrupt ends the loop.
+         */
+        @Override
+        public void run() {
+            client = provider.getFacebookClient();
+            while (provider.isRunning()) {
+                try {
+                    ResponseList<Post> postResponseList = client.getHome();
+                    Set<Post> update = Sets.newHashSet(postResponseList);
+                    // Posts in this poll that were not in the previous one.
+                    Set<Post> entrySet = Sets.difference(update, priorPollResult);
+                    for (Post item : entrySet) {
+                        String json = DataObjectFactory.getRawJSON(item);
+                        org.apache.streams.facebook.Post post = mapper.readValue(json, org.apache.streams.facebook.Post.class);
+                        // Acquire outside the try so unlock() only runs when lock() succeeded.
+                        lock.readLock().lock();
+                        try {
+                            ComponentUtils.offerUntilSuccess(new StreamsDatum(post), providerQueue);
+                            countersCurrent.incrementAttempt();
+                        } finally {
+                            lock.readLock().unlock();
+                        }
+                    }
+                    priorPollResult = update;
+                } catch (Exception e) {
+                    LOGGER.warn("Polling the home feed failed; retrying after the poll interval", e);
+                }
+
+                // Sleep outside the poll's try block so a failed poll still waits
+                // instead of retrying in a tight loop.
+                try {
+                    Thread.sleep(configuration.getPollIntervalMillis());
+                } catch (InterruptedException e) {
+                    // Restore the flag and stop so an executor shutdown can cancel the task.
+                    Thread.currentThread().interrupt();
+                    return;
+                }
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/dc432af2/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookUserInformationProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookUserInformationProvider.java b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookUserInformationProvider.java
new file mode 100644
index 0000000..fce5f22
--- /dev/null
+++ b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookUserInformationProvider.java
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.facebook.provider;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigRenderOptions;
+import facebook4j.*;
+import facebook4j.conf.ConfigurationBuilder;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProvider;
+import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.facebook.FacebookUserInformationConfiguration;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import sun.reflect.generics.reflectiveObjects.NotImplementedException;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class FacebookUserInformationProvider implements StreamsProvider, Serializable
+{
+
+    /** Identifier written in this provider's debug log lines. */
+    public static final String STREAMS_ID = "FacebookUserInformationProvider";
+    /** SLF4J logger scoped to this class. */
+    private static final Logger LOGGER = LoggerFactory.getLogger(FacebookUserInformationProvider.class);
+
+    /** Shared Jackson mapper used to read the configuration and to serialize users and friends to JSON. */
+    private static final ObjectMapper mapper = new StreamsJacksonMapper();
+
+    private static final String ALL_PERMISSIONS = "ads_management,ads_read,create_event,create_note,email,export_stream,friends_about_me,friends_actions.books,friends_actions.music,friends_actions.news,friends_actions.video,friends_activities,friends_birthday,friends_education_history,friends_events,friends_games_activity,friends_groups,friends_hometown,friends_interests,friends_likes,friends_location,friends_notes,friends_online_presence,friends_photo_video_tags,friends_photos,friends_questions,friends_relationship_details,friends_relationships,friends_religion_politics,friends_status,friends_subscriptions,friends_videos,friends_website,friends_work_history,manage_friendlists,manage_notifications,manage_pages,photo_upload,publish_actions,publish_stream,read_friendlists,read_insights,read_mailbox,read_page_mailboxes,read_requests,read_stream,rsvp_event,share_item,sms,status_update,user_about_me,user_actions.books,user_actions.music,user_actions.news,user_actions.video,user_activitie
 s,user_birthday,user_education_history,user_events,user_friends,user_games_activity,user_groups,user_hometown,user_interests,user_likes,user_location,user_notes,user_online_presence,user_photo_video_tags,user_photos,user_questions,user_relationship_details,user_relationships,user_religion_politics,user_status,user_subscriptions,user_videos,user_website,user_work_history,video_upload,xmpp_login";
+    /** OAuth credentials (read by prepare() and getFacebookClient()) and the list of ids batched by prepare(). */
+    private FacebookUserInformationConfiguration facebookUserInformationConfiguration;
+
+    /** Required to be non-null by prepare(); not otherwise read in this class. */
+    private Class klass;
+    /** Datums collected by readCurrent(), which also wraps this queue in its result set. */
+    protected volatile Queue<StreamsDatum> providerQueue = new LinkedBlockingQueue<StreamsDatum>();
+
+    /** Returns the configuration currently in use (may be null if none was supplied). */
+    public FacebookUserInformationConfiguration getConfig()              { return facebookUserInformationConfiguration; }
+
+    /** Replaces the configuration used by prepare() and getFacebookClient(). */
+    public void setConfig(FacebookUserInformationConfiguration config)   { this.facebookUserInformationConfiguration = config; }
+
+    /** Batches of at most 100 ids built by prepare() and consumed by readCurrent(). */
+    protected Iterator<String[]> idsBatches;
+
+    /** Created in prepare() and shut down in cleanUp(). */
+    protected ExecutorService executor;
+
+    /** Bounds recorded by readRange(); not read anywhere else in this class. */
+    protected DateTime start;
+    protected DateTime end;
+
+    /** Set to true by startStream() and back to false by readCurrent(). */
+    protected final AtomicBoolean running = new AtomicBoolean();
+
+    private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) {
+        return new ThreadPoolExecutor(nThreads, nThreads,
+                5000L, TimeUnit.MILLISECONDS,
+                new ArrayBlockingQueue<Runnable>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy());
+    }
+
+    /**
+     * Creates a provider configured from the "facebook" section of the global
+     * Streams configuration.
+     *
+     * If that section cannot be mapped onto a
+     * {@link FacebookUserInformationConfiguration}, the failure is logged and the
+     * provider is left without a configuration.
+     */
+    public FacebookUserInformationProvider() {
+        Config config = StreamsConfigurator.config.getConfig("facebook");
+        try {
+            // Assign the field directly: parsing into a local of the same name would
+            // shadow the field and leave the provider unconfigured.
+            this.facebookUserInformationConfiguration = mapper.readValue(config.root().render(ConfigRenderOptions.concise()), FacebookUserInformationConfiguration.class);
+        } catch (IOException e) {
+            LOGGER.error("Unable to parse the \"facebook\" configuration section", e);
+        }
+    }
+
+    /**
+     * Creates a provider that uses the supplied configuration.
+     *
+     * @param config configuration to store; it is not validated here
+     */
+    public FacebookUserInformationProvider(FacebookUserInformationConfiguration config) {
+        facebookUserInformationConfiguration = config;
+    }
+
+    /**
+     * Creates a provider configured from the "facebook" section of the global
+     * Streams configuration and remembers the given class.
+     *
+     * If that section cannot be mapped onto a
+     * {@link FacebookUserInformationConfiguration}, the failure is logged and the
+     * provider is left without a configuration.
+     *
+     * @param klass class stored on the provider; prepare() requires it to be non-null
+     */
+    public FacebookUserInformationProvider(Class klass) {
+        // Store klass first so a configuration failure does not also lose it.
+        this.klass = klass;
+        Config config = StreamsConfigurator.config.getConfig("facebook");
+        try {
+            // Assign the field directly: parsing into a local of the same name would
+            // shadow the field and leave the provider unconfigured.
+            this.facebookUserInformationConfiguration = mapper.readValue(config.root().render(ConfigRenderOptions.concise()), FacebookUserInformationConfiguration.class);
+        } catch (IOException e) {
+            LOGGER.error("Unable to parse the \"facebook\" configuration section", e);
+        }
+    }
+
+    /**
+     * Creates a provider from an explicit configuration and class.
+     *
+     * @param config configuration to store; it is not validated here
+     * @param klass  class stored on the provider; prepare() requires it to be non-null
+     */
+    public FacebookUserInformationProvider(FacebookUserInformationConfiguration config, Class klass) {
+        this.klass = klass;
+        this.facebookUserInformationConfiguration = config;
+    }
+
+    /**
+     * Exposes the live queue of collected datums (not a copy).
+     *
+     * @return the provider queue
+     */
+    public Queue<StreamsDatum> getProviderQueue() {
+        return providerQueue;
+    }
+
+    /** Flags the provider as running; no work is scheduled here. */
+    @Override
+    public void startStream() {
+        final boolean nowRunning = true;
+        running.set(nowRunning);
+    }
+
+    /**
+     * Collects user profiles into the provider queue and returns them.
+     *
+     * The authenticated user's own profile is requested first. If prepare()
+     * produced any id batches, those users are fetched batch by batch; otherwise
+     * the authenticated user's friends are fetched page by page. Failures are
+     * logged and skipped so that one bad record or call does not abort the read.
+     * The running flag is cleared before returning.
+     *
+     * @return a result set wrapping the provider queue
+     * @throws NullPointerException if prepare() has not been called
+     */
+    public StreamsResultSet readCurrent() {
+
+        // An empty iterator is valid: it selects the friends fallback below.
+        // Requiring hasNext() here would make that fallback unreachable.
+        Preconditions.checkNotNull(idsBatches, "prepare() must be called before readCurrent()");
+
+        LOGGER.info("readCurrent");
+
+        Facebook client = getFacebookClient();
+
+        try {
+            User me = client.users().getMe();
+            String json = mapper.writeValueAsString(me);
+            providerQueue.add(
+                new StreamsDatum(json, DateTime.now())
+            );
+        } catch (JsonProcessingException e) {
+            LOGGER.warn("Unable to serialize the authenticated user", e);
+        } catch (FacebookException e) {
+            LOGGER.warn("Unable to fetch the authenticated user", e);
+        }
+
+        if (idsBatches.hasNext()) {
+            readUsersByIds(client);
+        } else {
+            readFriends(client);
+        }
+
+        LOGGER.info("Finished.  Cleaning up...");
+
+        LOGGER.info("Providing {} docs", providerQueue.size());
+
+        StreamsResultSet result =  new StreamsResultSet(providerQueue);
+        running.set(false);
+
+        LOGGER.info("Exiting");
+
+        return result;
+
+    }
+
+    /** Fetches every remaining id batch and queues each returned user as timestamped JSON. */
+    private void readUsersByIds(Facebook client) {
+        while (idsBatches.hasNext()) {
+            try {
+                List<User> userList = client.users().getUsers(idsBatches.next());
+                for (User user : userList) {
+                    try {
+                        String json = mapper.writeValueAsString(user);
+                        providerQueue.add(
+                                new StreamsDatum(json, DateTime.now())
+                        );
+                    } catch (JsonProcessingException e) {
+                        LOGGER.warn("Unable to serialize a user profile; skipping it", e);
+                    }
+                }
+            } catch (FacebookException e) {
+                LOGGER.warn("Unable to fetch a batch of users", e);
+            }
+        }
+    }
+
+    /** Walks the authenticated user's friend list page by page and queues each friend as JSON. */
+    private void readFriends(Facebook client) {
+        try {
+            ResponseList<Friend> friendResponseList = client.friends().getFriends();
+            while (friendResponseList != null) {
+                for (Friend friend : friendResponseList) {
+                    try {
+                        String json = mapper.writeValueAsString(friend);
+                        providerQueue.add(
+                                new StreamsDatum(json)
+                        );
+                    } catch (JsonProcessingException e) {
+                        LOGGER.warn("Unable to serialize a friend; skipping it", e);
+                    }
+                }
+                Paging<Friend> friendPaging = friendResponseList.getPaging();
+                // Only ask for another page when this one carries paging info;
+                // a null Paging must never be handed to fetchNext.
+                friendResponseList = (friendPaging == null) ? null : client.fetchNext(friendPaging);
+            }
+        } catch (FacebookException e) {
+            LOGGER.warn("Unable to read the friend list", e);
+        }
+    }
+
+    /**
+     * Not supported by this provider.
+     *
+     * @param sequence ignored
+     * @return never returns normally
+     * @throws NotImplementedException always
+     */
+    public StreamsResultSet readNew(BigInteger sequence) {
+        LOGGER.debug("{} readNew", STREAMS_ID);
+        final NotImplementedException unsupported = new NotImplementedException();
+        throw unsupported;
+    }
+
+    /**
+     * Records the requested range and returns the result of {@link #readCurrent()}.
+     *
+     * @param start lower bound, stored on the provider
+     * @param end   upper bound, stored on the provider
+     * @return the result set produced by readCurrent()
+     */
+    public StreamsResultSet readRange(DateTime start, DateTime end) {
+        LOGGER.debug("{} readRange", STREAMS_ID);
+        this.start = start;
+        this.end = end;
+        // readCurrent() already wraps the queue in a StreamsResultSet. The queue's
+        // Iterator is not a StreamsResultSet, so casting it can only fail with
+        // ClassCastException.
+        return readCurrent();
+    }
+
+    /**
+     * Reports the running flag.
+     *
+     * @return true after startStream() until readCurrent() clears the flag
+     */
+    @Override
+    public boolean isRunning() {
+        final boolean active = running.get();
+        return active;
+    }
+
+    /**
+     * Shuts the pool down in two phases: an orderly shutdown with a 10 second
+     * grace period, then a forced shutdown with a further 10 second wait.
+     *
+     * If the calling thread is interrupted while waiting, the pool is cancelled
+     * immediately and the interrupt status is restored.
+     *
+     * @param pool the executor to stop
+     */
+    void shutdownAndAwaitTermination(ExecutorService pool) {
+        pool.shutdown(); // Disable new tasks from being submitted
+        try {
+            // Wait a while for existing tasks to terminate
+            if (pool.awaitTermination(10, TimeUnit.SECONDS)) {
+                return;
+            }
+            pool.shutdownNow(); // Cancel currently executing tasks
+            // Wait a while for tasks to respond to being cancelled
+            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
+                // Report through the class logger rather than System.err, like the rest of the class.
+                LOGGER.error("Pool did not terminate");
+            }
+        } catch (InterruptedException ie) {
+            // (Re-)Cancel if current thread also interrupted
+            pool.shutdownNow();
+            // Preserve interrupt status
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /**
+     * Creates the executor, validates the configuration and splits the configured
+     * ids into batches of at most 100 for readCurrent() to consume.
+     *
+     * Null ids are skipped. The final batch may hold fewer than 100 ids; if there
+     * are no ids at all, the resulting iterator is empty.
+     *
+     * @param o not used
+     */
+    @Override
+    public void prepare(Object o) {
+
+        executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20));
+
+        Preconditions.checkNotNull(providerQueue);
+        Preconditions.checkNotNull(this.klass);
+        Preconditions.checkNotNull(facebookUserInformationConfiguration.getOauth().getAppId());
+        Preconditions.checkNotNull(facebookUserInformationConfiguration.getOauth().getAppSecret());
+        Preconditions.checkNotNull(facebookUserInformationConfiguration.getOauth().getUserAccessToken());
+        Preconditions.checkNotNull(facebookUserInformationConfiguration.getInfo());
+
+        final List<String[]> batches = new ArrayList<String[]>();
+        List<String> pending = new ArrayList<String>();
+
+        for (String id : facebookUserInformationConfiguration.getInfo()) {
+            if (id == null) {
+                continue;
+            }
+            pending.add(id);
+            if (pending.size() < 100) {
+                continue;
+            }
+            // batch is full: store it and start collecting the next one
+            batches.add(pending.toArray(new String[pending.size()]));
+            pending = new ArrayList<String>();
+        }
+
+        // keep the trailing, partially filled batch
+        if (!pending.isEmpty()) {
+            batches.add(pending.toArray(new String[pending.size()]));
+        }
+
+        this.idsBatches = batches.iterator();
+    }
+
+    protected Facebook getFacebookClient()
+    {
+        ConfigurationBuilder cb = new ConfigurationBuilder();
+        cb.setDebugEnabled(true)
+            .setOAuthAppId(facebookUserInformationConfiguration.getOauth().getAppId())
+            .setOAuthAppSecret(facebookUserInformationConfiguration.getOauth().getAppSecret())
+            .setOAuthAccessToken(facebookUserInformationConfiguration.getOauth().getUserAccessToken())
+            .setOAuthPermissions(ALL_PERMISSIONS)
+            .setJSONStoreEnabled(true)
+            .setClientVersion("v1.0");
+
+        FacebookFactory ff = new FacebookFactory(cb.build());
+        Facebook facebook = ff.getInstance();
+
+        return facebook;
+    }
+
+    /** Stops the executor created by prepare(), if there is one. */
+    @Override
+    public void cleanUp() {
+        // prepare() creates the executor; if it never ran there is nothing to stop.
+        if (executor != null) {
+            shutdownAndAwaitTermination(executor);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/dc432af2/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookUserstreamProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookUserstreamProvider.java b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookUserstreamProvider.java
new file mode 100644
index 0000000..d4f30e2
--- /dev/null
+++ b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookUserstreamProvider.java
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.facebook.provider;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Queues;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigRenderOptions;
+import facebook4j.*;
+import facebook4j.Post;
+import facebook4j.conf.ConfigurationBuilder;
+import facebook4j.json.DataObjectFactory;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.DatumStatusCounter;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProvider;
+import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.facebook.FacebookUserInformationConfiguration;
+import org.apache.streams.facebook.FacebookUserstreamConfiguration;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.util.ComponentUtils;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import sun.reflect.generics.reflectiveObjects.NotImplementedException;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class FacebookUserstreamProvider implements StreamsProvider, Serializable {
+
+    public static final String STREAMS_ID = "FacebookUserstreamProvider";
+    private static final Logger LOGGER = LoggerFactory.getLogger(FacebookUserstreamProvider.class);
+
+    private static final ObjectMapper mapper = new StreamsJacksonMapper();
+
+    private static final String ALL_PERMISSIONS = "read_stream";
+    private FacebookUserstreamConfiguration configuration;
+
+    private Class klass;
+    protected final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+    protected volatile Queue<StreamsDatum> providerQueue = new LinkedBlockingQueue<StreamsDatum>();
+
+    public FacebookUserstreamConfiguration getConfig() {
+        return configuration;
+    }
+
+    public void setConfig(FacebookUserstreamConfiguration config) {
+        this.configuration = config;
+    }
+
+    protected ListeningExecutorService executor;
+
+    protected DateTime start;
+    protected DateTime end;
+
+    protected final AtomicBoolean running = new AtomicBoolean();
+
+    private DatumStatusCounter countersCurrent = new DatumStatusCounter();
+    private DatumStatusCounter countersTotal = new DatumStatusCounter();
+
+    protected Facebook client;
+
+    private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) {
+        return new ThreadPoolExecutor(nThreads, nThreads,
+                5000L, TimeUnit.MILLISECONDS,
+                new ArrayBlockingQueue<Runnable>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy());
+    }
+
+    public FacebookUserstreamProvider() {
+        Config config = StreamsConfigurator.config.getConfig("facebook");
+        FacebookUserInformationConfiguration facebookUserInformationConfiguration;
+        try {
+            facebookUserInformationConfiguration = mapper.readValue(config.root().render(ConfigRenderOptions.concise()), FacebookUserInformationConfiguration.class);
+        } catch (IOException e) {
+            e.printStackTrace();
+            return;
+        }
+    }
+
+    public FacebookUserstreamProvider(FacebookUserstreamConfiguration config) {
+        this.configuration = config;
+    }
+
+    public FacebookUserstreamProvider(Class klass) {
+        Config config = StreamsConfigurator.config.getConfig("facebook");
+        FacebookUserInformationConfiguration facebookUserInformationConfiguration;
+        try {
+            facebookUserInformationConfiguration = mapper.readValue(config.root().render(ConfigRenderOptions.concise()), FacebookUserInformationConfiguration.class);
+        } catch (IOException e) {
+            e.printStackTrace();
+            return;
+        }
+        this.klass = klass;
+    }
+
+    public FacebookUserstreamProvider(FacebookUserstreamConfiguration config, Class klass) {
+        this.configuration = config;
+        this.klass = klass;
+    }
+
+    public Queue<StreamsDatum> getProviderQueue() {
+        return this.providerQueue;
+    }
+
+    @Override
+    public void startStream() {
+
+        client = getFacebookClient();
+
+        if( configuration.getInfo() != null &&
+            configuration.getInfo().size() > 0 ) {
+            for( String id : configuration.getInfo()) {
+                executor.submit(new FacebookFeedPollingTask(this, id));
+            }
+            running.set(true);
+        } else {
+            try {
+                String id = client.getMe().getId();
+                executor.submit(new FacebookFeedPollingTask(this, id));
+                running.set(true);
+            } catch (FacebookException e) {
+                LOGGER.error(e.getMessage());
+                running.set(false);
+            }
+        }
+    }
+
+    public StreamsResultSet readCurrent() {
+
+        StreamsResultSet current;
+
+        synchronized (FacebookUserstreamProvider.class) {
+            current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(providerQueue));
+            current.setCounter(new DatumStatusCounter());
+            current.getCounter().add(countersCurrent);
+            countersTotal.add(countersCurrent);
+            countersCurrent = new DatumStatusCounter();
+            providerQueue.clear();
+        }
+
+        return current;
+
+    }
+
+    public StreamsResultSet readNew(BigInteger sequence) {
+        LOGGER.debug("{} readNew", STREAMS_ID);
+        throw new NotImplementedException();
+    }
+
+    public StreamsResultSet readRange(DateTime start, DateTime end) {
+        LOGGER.debug("{} readRange", STREAMS_ID);
+        this.start = start;
+        this.end = end;
+        readCurrent();
+        StreamsResultSet result = (StreamsResultSet) providerQueue.iterator();
+        return result;
+    }
+
+    @Override
+    public boolean isRunning() {
+        return running.get();
+    }
+
+    void shutdownAndAwaitTermination(ExecutorService pool) {
+        pool.shutdown(); // Disable new tasks from being submitted
+        try {
+            // Wait a while for existing tasks to terminate
+            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
+                pool.shutdownNow(); // Cancel currently executing tasks
+                // Wait a while for tasks to respond to being cancelled
+                if (!pool.awaitTermination(10, TimeUnit.SECONDS))
+                    System.err.println("Pool did not terminate");
+            }
+        } catch (InterruptedException ie) {
+            // (Re-)Cancel if current thread also interrupted
+            pool.shutdownNow();
+            // Preserve interrupt status
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    @Override
+    public void prepare(Object o) {
+
+        executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20));
+
+        Preconditions.checkNotNull(providerQueue);
+        Preconditions.checkNotNull(this.klass);
+        Preconditions.checkNotNull(configuration.getOauth().getAppId());
+        Preconditions.checkNotNull(configuration.getOauth().getAppSecret());
+        Preconditions.checkNotNull(configuration.getOauth().getUserAccessToken());
+
+        client = getFacebookClient();
+
+        if( configuration.getInfo() != null &&
+            configuration.getInfo().size() > 0 ) {
+
+            List<String> ids = new ArrayList<String>();
+            List<String[]> idsBatches = new ArrayList<String[]>();
+
+            for (String s : configuration.getInfo()) {
+                if (s != null) {
+                    ids.add(s);
+
+                    if (ids.size() >= 100) {
+                        // add the batch
+                        idsBatches.add(ids.toArray(new String[ids.size()]));
+                        // reset the Ids
+                        ids = new ArrayList<String>();
+                    }
+
+                }
+            }
+        }
+    }
+
+    protected Facebook getFacebookClient() {
+        ConfigurationBuilder cb = new ConfigurationBuilder();
+        cb.setDebugEnabled(true)
+                .setOAuthAppId(configuration.getOauth().getAppId())
+                .setOAuthAppSecret(configuration.getOauth().getAppSecret())
+                .setOAuthAccessToken(configuration.getOauth().getUserAccessToken())
+                .setOAuthPermissions(ALL_PERMISSIONS)
+                .setJSONStoreEnabled(true);
+
+        FacebookFactory ff = new FacebookFactory(cb.build());
+        Facebook facebook = ff.getInstance();
+
+        return facebook;
+    }
+
+    @Override
+    public void cleanUp() {
+        shutdownAndAwaitTermination(executor);
+    }
+
+    private class FacebookFeedPollingTask implements Runnable {
+
+        FacebookUserstreamProvider provider;
+        Facebook client;
+        String id;
+
+        private Set<Post> priorPollResult = Sets.newHashSet();
+
+        public FacebookFeedPollingTask(FacebookUserstreamProvider facebookUserstreamProvider) {
+            this.provider = facebookUserstreamProvider;
+        }
+
+        public FacebookFeedPollingTask(FacebookUserstreamProvider facebookUserstreamProvider, String id) {
+            this.provider = facebookUserstreamProvider;
+            this.client = provider.client;
+            this.id = id;
+        }
+        @Override
+        public void run() {
+            while (provider.isRunning()) {
+                ResponseList<Post> postResponseList;
+                try {
+                    postResponseList = client.getFeed(id);
+
+                    Set<Post> update = Sets.newHashSet(postResponseList);
+                    Set<Post> repeats = Sets.intersection(priorPollResult, Sets.newHashSet(update));
+                    Set<Post> entrySet = Sets.difference(update, repeats);
+                    LOGGER.debug(this.id + " response: " + update.size() + " previous: " + repeats.size() + " new: " + entrySet.size());
+                    for (Post item : entrySet) {
+                        String json = DataObjectFactory.getRawJSON(item);
+                        org.apache.streams.facebook.Post post = mapper.readValue(json, org.apache.streams.facebook.Post.class);
+                        try {
+                            lock.readLock().lock();
+                            ComponentUtils.offerUntilSuccess(new StreamsDatum(post), providerQueue);
+                            countersCurrent.incrementAttempt();
+                        } finally {
+                            lock.readLock().unlock();
+                        }
+                    }
+                    priorPollResult = update;
+                } catch (Exception e) {
+                    e.printStackTrace();
+                } finally {
+                    try {
+                        Thread.sleep(configuration.getPollIntervalMillis());
+                    } catch (InterruptedException e) {
+                        Thread.currentThread().interrupt();
+                    }
+                }
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/dc432af2/streams-contrib/streams-provider-facebook/src/main/jsonschema/com/facebook/FacebookConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/main/jsonschema/com/facebook/FacebookConfiguration.json b/streams-contrib/streams-provider-facebook/src/main/jsonschema/com/facebook/FacebookConfiguration.json
deleted file mode 100644
index b4e5afb..0000000
--- a/streams-contrib/streams-provider-facebook/src/main/jsonschema/com/facebook/FacebookConfiguration.json
+++ /dev/null
@@ -1,49 +0,0 @@
-{
-    "type": "object",
-    "$schema": "http://json-schema.org/draft-03/schema",
-    "id": "#",
-    "javaType" : "org.apache.streams.facebook.FacebookConfiguration",
-    "javaInterfaces": ["java.io.Serializable"],
-    "properties": {
-        "protocol": {
-            "type": "string",
-            "description": "The protocol"
-        },
-        "host": {
-            "type": "string",
-            "description": "The host"
-        },
-        "port": {
-            "type": "integer",
-            "description": "The port"
-        },
-        "version": {
-            "type": "string",
-            "description": "The version"
-        },
-        "endpoint": {
-            "type": "string",
-            "description": "The endpoint"
-        },
-        "oauth": {
-            "type": "object",
-            "dynamic": "true",
-            "javaType" : "org.apache.streams.facebook.FacebookOAuthConfiguration",
-            "javaInterfaces": ["java.io.Serializable"],
-            "properties": {
-                "appId": {
-                    "type": "string"
-                },
-                "appSecret": {
-                    "type": "string"
-                },
-                "appAccessToken": {
-                    "type": "string"
-                },
-                "userAccessToken": {
-                    "type": "string"
-                }
-            }
-        }
-   }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/dc432af2/streams-contrib/streams-provider-facebook/src/main/jsonschema/com/facebook/FacebookUserInformationConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/main/jsonschema/com/facebook/FacebookUserInformationConfiguration.json b/streams-contrib/streams-provider-facebook/src/main/jsonschema/com/facebook/FacebookUserInformationConfiguration.json
deleted file mode 100644
index b351be9..0000000
--- a/streams-contrib/streams-provider-facebook/src/main/jsonschema/com/facebook/FacebookUserInformationConfiguration.json
+++ /dev/null
@@ -1,23 +0,0 @@
-{
-    "type": "object",
-    "$schema": "http://json-schema.org/draft-03/schema",
-    "id": "#",
-    "javaType" : "org.apache.streams.facebook.FacebookUserInformationConfiguration",
-    "javaInterfaces": ["java.io.Serializable"],
-    "extends": {"$ref":"FacebookConfiguration.json"},
-    "javaInterfaces": ["java.io.Serializable"],
-    "properties": {
-        "info": {
-            "type": "array",
-            "description": "A list of user IDs, indicating users of interest",
-            "items": {
-                "type": "string"
-            }
-        },
-        "pollIntervalMillis": {
-            "type": "integer",
-            "default" : "60000",
-            "description": "Polling interval in ms"
-        }
-     }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/dc432af2/streams-contrib/streams-provider-facebook/src/main/jsonschema/com/facebook/FacebookUserstreamConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/main/jsonschema/com/facebook/FacebookUserstreamConfiguration.json b/streams-contrib/streams-provider-facebook/src/main/jsonschema/com/facebook/FacebookUserstreamConfiguration.json
deleted file mode 100644
index bcb2258..0000000
--- a/streams-contrib/streams-provider-facebook/src/main/jsonschema/com/facebook/FacebookUserstreamConfiguration.json
+++ /dev/null
@@ -1,22 +0,0 @@
-{
-    "type": "object",
-    "$schema": "http://json-schema.org/draft-03/schema",
-    "id": "#",
-    "javaType" : "org.apache.streams.facebook.FacebookUserstreamConfiguration",
-    "javaInterfaces": ["java.io.Serializable"],
-    "extends": {"$ref":"FacebookConfiguration.json"},
-    "properties": {
-        "info": {
-            "type": "array",
-            "description": "A list of user IDs, indicating users of interest",
-            "items": {
-                "type": "string"
-            }
-        },
-        "pollIntervalMillis": {
-            "type": "integer",
-            "default" : "60000",
-            "description": "Polling interval in ms"
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/dc432af2/streams-contrib/streams-provider-facebook/src/main/jsonschema/com/facebook/graph/Post.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/main/jsonschema/com/facebook/graph/Post.json b/streams-contrib/streams-provider-facebook/src/main/jsonschema/com/facebook/graph/Post.json
deleted file mode 100644
index 23bcb08..0000000
--- a/streams-contrib/streams-provider-facebook/src/main/jsonschema/com/facebook/graph/Post.json
+++ /dev/null
@@ -1,203 +0,0 @@
-{
-    "type": "object",
-    "$schema": "http://json-schema.org/draft-03/schema",
-    "id": "#",
-    "javaType": "org.apache.streams.facebook.Post",
-    "properties": {
-        "id": {
-            "type": "string"
-        },
-        "from": {
-            "type": "object",
-            "properties": {
-                "id": {
-                    "type": "string"
-                },
-                "name": {
-                    "type": "string"
-                }
-            }
-        },
-        "to": {
-            "type": "object",
-            "properties": {
-                "data": {
-                    "type": "array",
-                    "items": {
-                        "type": "object",
-                        "properties": {
-                            "id": {
-                                "type": "string"
-                            },
-                            "name": {
-                                "type": "string"
-                            }
-                        }
-                    }
-                }
-            }
-        },
-        "message": {
-            "type": "string"
-        },
-        "message_tags": {
-            "type": "object",
-            "properties": {
-                "data": {
-                    "type": "array",
-                    "items": {
-                        "type": "object",
-                        "properties": {
-                            "id": {
-                                "type": "string"
-                            },
-                            "name": {
-                                "type": "string"
-                            }
-                        }
-                    }
-                }
-            }
-        },
-        "picture": {
-            "type": "string"
-        },
-        "link": {
-            "type": "string"
-        },
-        "name": {
-            "type": "string"
-        },
-        "caption": {
-            "type": "string"
-        },
-        "description": {
-            "type": "string"
-        },
-        "source": {
-            "type": "string"
-        },
-        "icon": {
-            "type": "string"
-        },
-        "actions": {
-            "type": "array",
-            "items": {
-                "type": "object",
-                "properties": {
-                    "name": {
-                        "type": "string"
-                    },
-                    "link": {
-                        "type": "string"
-                    }
-                }
-            }
-        },
-        "comments": {
-            "type": "array",
-            "items": {
-                "type": "object",
-                "properties": {
-                    "id": {
-                        "type": "string"
-                    },
-                    "from": {
-                        "type": "string"
-                    },
-                    "message": {
-                        "type": "string"
-                    },
-                    "created_time": {
-                        "type": "string",
-                        "format" : "date-time"
-                    }
-                }
-            }
-        },
-        "likes": {
-            "type": "array",
-            "items": {
-                "type": "object",
-                "properties": {
-                    "name": {
-                        "type": "string"
-                    },
-                    "link": {
-                        "type": "string"
-                    }
-                }
-            }
-        },
-        "type": {
-            "type": "string"
-        },
-        "place": {
-            "type": "object",
-            "properties": {
-                "name": {
-                    "type": "string"
-                },
-                "id": {
-                    "type": "string"
-                }
-            }
-        },
-        "story": {
-            "type": "string"
-        },
-        "shares": {
-            "type": "int"
-        },
-        "object_id": {
-            "type": "int"
-        },
-        "application": {
-            "type": "object",
-            "properties": {
-                "name": {
-                    "type": "string"
-                },
-                "id": {
-                    "type": "string"
-                }
-            }
-        },
-        "created_time": {
-            "type": "string",
-            "format" : "date-time"
-        },
-        "updated_time": {
-            "type": "string",
-            "format" : "date-time"
-        },
-        "include_hidden": {
-            "type": "boolean"
-        },
-        "status_type": {
-            "type": "string"
-        },
-        "properties": {
-            "type": "array",
-            "items": {
-                "type": "object",
-                "properties": {
-                    "name": {
-                        "type": "string"
-                    },
-                    "text": {
-                        "type": "string"
-                    }
-                }
-            }
-        },
-        "privacy": {
-            "type": "object",
-            "properties": {
-                "value": {
-                    "type": "string"
-                }
-            }
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/dc432af2/streams-contrib/streams-provider-facebook/src/main/jsonschema/org/apache/streams/facebook/FacebookConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/main/jsonschema/org/apache/streams/facebook/FacebookConfiguration.json b/streams-contrib/streams-provider-facebook/src/main/jsonschema/org/apache/streams/facebook/FacebookConfiguration.json
new file mode 100644
index 0000000..b4e5afb
--- /dev/null
+++ b/streams-contrib/streams-provider-facebook/src/main/jsonschema/org/apache/streams/facebook/FacebookConfiguration.json
@@ -0,0 +1,49 @@
+{
+    "type": "object",
+    "$schema": "http://json-schema.org/draft-03/schema",
+    "id": "#",
+    "javaType" : "org.apache.streams.facebook.FacebookConfiguration",
+    "javaInterfaces": ["java.io.Serializable"],
+    "properties": {
+        "protocol": {
+            "type": "string",
+            "description": "The protocol"
+        },
+        "host": {
+            "type": "string",
+            "description": "The host"
+        },
+        "port": {
+            "type": "integer",
+            "description": "The port"
+        },
+        "version": {
+            "type": "string",
+            "description": "The version"
+        },
+        "endpoint": {
+            "type": "string",
+            "description": "The endpoint"
+        },
+        "oauth": {
+            "type": "object",
+            "dynamic": "true",
+            "javaType" : "org.apache.streams.facebook.FacebookOAuthConfiguration",
+            "javaInterfaces": ["java.io.Serializable"],
+            "properties": {
+                "appId": {
+                    "type": "string"
+                },
+                "appSecret": {
+                    "type": "string"
+                },
+                "appAccessToken": {
+                    "type": "string"
+                },
+                "userAccessToken": {
+                    "type": "string"
+                }
+            }
+        }
+   }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/dc432af2/streams-contrib/streams-provider-facebook/src/main/jsonschema/org/apache/streams/facebook/FacebookUserInformationConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/main/jsonschema/org/apache/streams/facebook/FacebookUserInformationConfiguration.json b/streams-contrib/streams-provider-facebook/src/main/jsonschema/org/apache/streams/facebook/FacebookUserInformationConfiguration.json
new file mode 100644
index 0000000..b351be9
--- /dev/null
+++ b/streams-contrib/streams-provider-facebook/src/main/jsonschema/org/apache/streams/facebook/FacebookUserInformationConfiguration.json
@@ -0,0 +1,23 @@
+{
+    "type": "object",
+    "$schema": "http://json-schema.org/draft-03/schema",
+    "id": "#",
+    "javaType" : "org.apache.streams.facebook.FacebookUserInformationConfiguration",
+    "javaInterfaces": ["java.io.Serializable"],
+    "extends": {"$ref":"FacebookConfiguration.json"},
+    "javaInterfaces": ["java.io.Serializable"],
+    "properties": {
+        "info": {
+            "type": "array",
+            "description": "A list of user IDs, indicating users of interest",
+            "items": {
+                "type": "string"
+            }
+        },
+        "pollIntervalMillis": {
+            "type": "integer",
+            "default" : "60000",
+            "description": "Polling interval in ms"
+        }
+     }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/dc432af2/streams-contrib/streams-provider-facebook/src/main/jsonschema/org/apache/streams/facebook/FacebookUserstreamConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/main/jsonschema/org/apache/streams/facebook/FacebookUserstreamConfiguration.json b/streams-contrib/streams-provider-facebook/src/main/jsonschema/org/apache/streams/facebook/FacebookUserstreamConfiguration.json
new file mode 100644
index 0000000..bcb2258
--- /dev/null
+++ b/streams-contrib/streams-provider-facebook/src/main/jsonschema/org/apache/streams/facebook/FacebookUserstreamConfiguration.json
@@ -0,0 +1,22 @@
+{
+    "type": "object",
+    "$schema": "http://json-schema.org/draft-03/schema",
+    "id": "#",
+    "javaType" : "org.apache.streams.facebook.FacebookUserstreamConfiguration",
+    "javaInterfaces": ["java.io.Serializable"],
+    "extends": {"$ref":"FacebookConfiguration.json"},
+    "properties": {
+        "info": {
+            "type": "array",
+            "description": "A list of user IDs, indicating users of interest",
+            "items": {
+                "type": "string"
+            }
+        },
+        "pollIntervalMillis": {
+            "type": "integer",
+            "default" : "60000",
+            "description": "Polling interval in ms"
+        }
+    }
+}
\ No newline at end of file