You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/08/12 00:53:04 UTC

[04/15] git commit: added support for FB user stream and user profile collection

added support for FB user stream and user profile collection


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/49d00d7c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/49d00d7c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/49d00d7c

Branch: refs/heads/master
Commit: 49d00d7c3a65f8ef08ee9fa84f99f6290a8e796e
Parents: 7c5e290
Author: sblackmon <sb...@w2odigital.com>
Authored: Thu Jul 3 17:42:19 2014 -0700
Committer: sblackmon <sb...@w2odigital.com>
Committed: Mon Jul 21 10:30:44 2014 -0500

----------------------------------------------------------------------
 .../streams-provider-facebook/pom.xml           |  20 ++
 .../api/FacebookPostActivitySerializer.java     |  74 ++---
 .../processor/FacebookTypeConverter.java        | 194 +++++++++++
 .../FacebookUserInformationProvider.java        | 331 +++++++++++++++++++
 .../provider/FacebookUserstreamProvider.java    | 289 ++++++++++++++++
 .../com/facebook/FacebookConfiguration.json     |  49 +++
 .../FacebookUserInformationConfiguration.json   |  18 +
 .../FacebookUserstreamConfiguration.json        |  15 +
 .../jsonschema/com/facebook/graph/Post.json     |  21 +-
 .../test/FacebookActivitySerDeTest.java         |  80 +++++
 .../FacebookPostActivitySerializerTest.java     | 215 ------------
 .../facebook/test/FacebookPostSerDeTest.java    |   6 +-
 .../src/test/resources/Facebook.json            | 250 ++++++++++++++
 .../org/apache/streams/data/Facebook.json       | 251 --------------
 14 files changed, 1285 insertions(+), 528 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/49d00d7c/streams-contrib/streams-provider-facebook/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/pom.xml b/streams-contrib/streams-provider-facebook/pom.xml
index 0d54255..34d53a9 100644
--- a/streams-contrib/streams-provider-facebook/pom.xml
+++ b/streams-contrib/streams-provider-facebook/pom.xml
@@ -11,6 +11,17 @@
     <artifactId>streams-provider-facebook</artifactId>
 
     <dependencies>
+
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-core</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-config</artifactId>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.streams</groupId>
             <artifactId>streams-pojo</artifactId>
@@ -42,6 +53,12 @@
             <artifactId>guava</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>org.facebook4j</groupId>
+            <artifactId>facebook4j-core</artifactId>
+            <version>2.1.0</version>
+        </dependency>
+
     </dependencies>
 
     <build>
@@ -84,6 +101,9 @@
                     <addCompileSourceRoot>true</addCompileSourceRoot>
                     <generateBuilders>true</generateBuilders>
                     <sourcePaths>
+                        <sourcePath>src/main/jsonschema/com/facebook/FacebookConfiguration.json</sourcePath>
+                        <sourcePath>src/main/jsonschema/com/facebook/FacebookUserInformationConfiguration.json</sourcePath>
+                        <sourcePath>src/main/jsonschema/com/facebook/FacebookUserstreamConfiguration.json</sourcePath>
                         <sourcePath>src/main/jsonschema/com/facebook/graph/Post.json</sourcePath>
                     </sourcePaths>
                     <outputDirectory>target/generated-sources/jsonschema2pojo</outputDirectory>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/49d00d7c/streams-contrib/streams-provider-facebook/src/main/java/com/facebook/api/FacebookPostActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/main/java/com/facebook/api/FacebookPostActivitySerializer.java b/streams-contrib/streams-provider-facebook/src/main/java/com/facebook/api/FacebookPostActivitySerializer.java
index 2d53d48..71bc5c9 100644
--- a/streams-contrib/streams-provider-facebook/src/main/java/com/facebook/api/FacebookPostActivitySerializer.java
+++ b/streams-contrib/streams-provider-facebook/src/main/java/com/facebook/api/FacebookPostActivitySerializer.java
@@ -29,12 +29,15 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.SerializerProvider;
 import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
 import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.fasterxml.jackson.databind.ser.std.StdSerializer;
 import com.fasterxml.jackson.datatype.joda.JodaModule;
 import org.apache.commons.lang.NotImplementedException;
 import org.apache.streams.data.ActivitySerializer;
 import org.apache.streams.exceptions.ActivitySerializerException;
+import org.apache.streams.facebook.Post;
 import org.apache.streams.pojo.json.*;
+import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.joda.time.DateTime;
 import org.joda.time.format.DateTimeFormat;
 import org.joda.time.format.DateTimeFormatter;
@@ -54,7 +57,7 @@ import static org.apache.streams.data.util.JsonUtil.jsonToJsonNode;
  * Serializes activity posts
  *   sblackmon: This class needs a rewrite
  */
-public class FacebookPostActivitySerializer implements ActivitySerializer<String> {
+public class FacebookPostActivitySerializer implements ActivitySerializer<org.apache.streams.facebook.Post> {
 
     public static final DateTimeFormatter FACEBOOK_FORMAT = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ssZ");
     public static final DateTimeFormatter ACTIVITY_FORMAT = ISODateTimeFormat.basicDateTime();
@@ -63,25 +66,7 @@ public class FacebookPostActivitySerializer implements ActivitySerializer<String
 
     public static ObjectMapper mapper;
     static {
-        mapper = new ObjectMapper();
-        mapper.registerModule(new JodaModule());
-        mapper.disable(com.fasterxml.jackson.databind.SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
-        mapper.registerModule(new SimpleModule() {
-            {
-                addSerializer(DateTime.class, new StdSerializer<DateTime>(DateTime.class) {
-                    @Override
-                    public void serialize(DateTime value, JsonGenerator jgen, SerializerProvider provider) throws IOException, JsonGenerationException {
-                        jgen.writeString(ACTIVITY_FORMAT.print(value));
-                    }
-                });
-                addDeserializer(DateTime.class, new StdDeserializer<DateTime>(DateTime.class) {
-                    @Override
-                    public DateTime deserialize(JsonParser jpar, DeserializationContext context) throws IOException, JsonProcessingException {
-                        return FACEBOOK_FORMAT.parseDateTime(jpar.getValueAsString());
-                    }
-                });
-            }
-        });
+        mapper = StreamsJacksonMapper.getInstance();
     }
 
     @Override
@@ -90,26 +75,26 @@ public class FacebookPostActivitySerializer implements ActivitySerializer<String
     }
 
     @Override
-    public String serialize(Activity deserialized) {
-        throw new UnsupportedOperationException();
+    public Post serialize(Activity deserialized) throws ActivitySerializerException {
+        throw new NotImplementedException("Not currently supported by this deserializer");
     }
 
     @Override
-    public Activity deserialize(String serialized) throws ActivitySerializerException {
-        //Deserialize the JSON string into a Jackson JsonNode
-        JsonNode node = jsonToJsonNode(serialized);
-        Map.Entry<String, JsonNode> field = getObjectType(node);
+    public Activity deserialize(Post post) throws ActivitySerializerException {
         Activity activity = new Activity();
+        activity.setPublished(post.getCreatedTime());
+        activity.setUpdated(post.getUpdatedTime());
+        addActor(activity, mapper.convertValue(post.getFrom(), ObjectNode.class));
         setProvider(activity);
-        setObjectType(field, activity);
-        parseObject(activity, field.getValue());
+        setObjectType(post.getType(), activity);
+        parseObject(activity, mapper.convertValue(post, ObjectNode.class));
         fixObjectId(activity);
         fixContentFromSummary(activity);
         return activity;
     }
 
     @Override
-    public java.util.List<Activity> deserializeAll(List<String> serializedList) {
+    public List<Activity> deserializeAll(List<Post> serializedList) {
         throw new NotImplementedException("Not currently supported by this deserializer");
     }
 
@@ -129,10 +114,10 @@ public class FacebookPostActivitySerializer implements ActivitySerializer<String
         }
     }
 
-    private void setObjectType(Map.Entry<String, JsonNode> field, Activity activity) {
+    private void setObjectType(String type, Activity activity) {
         ActivityObject object = new ActivityObject();
         activity.setObject(object);
-        object.setObjectType(field.getKey());
+        object.setObjectType(type);
     }
 
     private void setProvider(Activity activity) {
@@ -142,12 +127,12 @@ public class FacebookPostActivitySerializer implements ActivitySerializer<String
         activity.setProvider(provider);
     }
 
-    private Map.Entry<String, JsonNode> getObjectType(JsonNode node) {
+    private String getObjectType(JsonNode node) {
         Iterator<Map.Entry<String, JsonNode>> fields = node.fields();
         ensureMoreFields(fields);
         Map.Entry<String, JsonNode> field = fields.next();
-        ensureNoMoreFields(fields);
-        return field;
+        //ensureNoMoreFields(fields);
+        return node.asText();
     }
 
     private void parseObject(Activity activity, JsonNode jsonNode) throws ActivitySerializerException {
@@ -160,9 +145,7 @@ public class FacebookPostActivitySerializer implements ActivitySerializer<String
     }
 
     private void mapField(Activity activity, String name, JsonNode value) throws ActivitySerializerException {
-        if ("created_time".equals(name)) {
-            activity.setPublished(parseDate(value));
-        } else if("application".equals(name)) {
+        if("application".equals(name)) {
             addGenerator(activity, value);
         } else if ("caption".equals(name)) {
             addSummary(activity, value);
@@ -198,16 +181,6 @@ public class FacebookPostActivitySerializer implements ActivitySerializer<String
             addObjectLink(activity, value);
         } else if ("story".equals(name)) {
             addTitle(activity, value);
-        }  else if ("updated_time".equals(name)) {
-            addObjectUpdated(activity, value);
-        }
-    }
-
-    private void addObjectUpdated(Activity activity, JsonNode value) {
-        try {
-            activity.getObject().setUpdated(parseDate(value));
-        } catch( ActivitySerializerException e ) {
-            // losing this field is ok
         }
     }
 
@@ -324,11 +297,4 @@ public class FacebookPostActivitySerializer implements ActivitySerializer<String
         }
     }
 
-    private static DateTime parseDate(JsonNode value) throws ActivitySerializerException {
-        try {
-            return FACEBOOK_FORMAT.parseDateTime(value.asText());
-        } catch (Exception e) {
-            throw new ActivitySerializerException("Unable to parse date " + value.asText());
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/49d00d7c/streams-contrib/streams-provider-facebook/src/main/java/com/facebook/processor/FacebookTypeConverter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/main/java/com/facebook/processor/FacebookTypeConverter.java b/streams-contrib/streams-provider-facebook/src/main/java/com/facebook/processor/FacebookTypeConverter.java
new file mode 100644
index 0000000..6ddb673
--- /dev/null
+++ b/streams-contrib/streams-provider-facebook/src/main/java/com/facebook/processor/FacebookTypeConverter.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.facebook.processor;
+
+import com.facebook.api.FacebookPostActivitySerializer;
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Lists;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.exceptions.ActivitySerializerException;
+import org.apache.streams.facebook.Post;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Queue;
+
+/**
+ * Created by sblackmon on 12/10/13.
+ */
+public class FacebookTypeConverter implements StreamsProcessor {
+
+    public final static String STREAMS_ID = "TwitterTypeConverter";
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(FacebookTypeConverter.class);
+
+    private ObjectMapper mapper;
+
+    private Queue<StreamsDatum> inQueue;
+    private Queue<StreamsDatum> outQueue;
+
+    private Class inClass;
+    private Class outClass;
+
+    private FacebookPostActivitySerializer facebookPostActivitySerializer;
+
+    private int count = 0;
+
+    public final static String TERMINATE = new String("TERMINATE");
+
+    public FacebookTypeConverter(Class inClass, Class outClass) {
+        this.inClass = inClass;
+        this.outClass = outClass;
+    }
+
+    public Queue<StreamsDatum> getProcessorOutputQueue() {
+        return outQueue;
+    }
+
+    public void setProcessorInputQueue(Queue<StreamsDatum> inputQueue) {
+        inQueue = inputQueue;
+    }
+
+    public Object convert(ObjectNode event, Class inClass, Class outClass) throws ActivitySerializerException, JsonProcessingException {
+
+        Object result = null;
+
+        if( outClass.equals( Activity.class )) {
+            LOGGER.debug("ACTIVITY");
+            result = facebookPostActivitySerializer.deserialize(mapper.convertValue(event, Post.class));
+        } else if( outClass.equals( Post.class )) {
+            LOGGER.debug("POST");
+            result = mapper.convertValue(event, Post.class);
+        } else if( outClass.equals( ObjectNode.class )) {
+            LOGGER.debug("OBJECTNODE");
+            result = mapper.convertValue(event, ObjectNode.class);
+        }
+
+        // no supported conversion were applied
+        if( result != null ) {
+            count ++;
+            return result;
+        }
+
+        LOGGER.debug("CONVERT FAILED");
+
+        return null;
+
+    }
+
+    public boolean validate(Object document, Class klass) {
+
+        // TODO
+        return true;
+    }
+
+    public boolean isValidJSON(final String json) {
+        boolean valid = false;
+        try {
+            final JsonParser parser = new ObjectMapper().getJsonFactory()
+                    .createJsonParser(json);
+            while (parser.nextToken() != null) {
+            }
+            valid = true;
+        } catch (JsonParseException jpe) {
+            LOGGER.warn("validate: {}", jpe);
+        } catch (IOException ioe) {
+            LOGGER.warn("validate: {}", ioe);
+        }
+
+        return valid;
+    }
+
+    @Override
+    public List<StreamsDatum> process(StreamsDatum entry) {
+
+        StreamsDatum result = null;
+
+        try {
+
+            Object item = entry.getDocument();
+            ObjectNode node;
+
+            LOGGER.debug("{} processing {}", STREAMS_ID, item.getClass());
+
+            if( item instanceof String ) {
+
+                // if the target is string, just pass-through
+                if( String.class.equals(outClass)) {
+                    result = entry;
+                }
+                else {
+                    // first check for valid json
+                    node = (ObjectNode)mapper.readTree((String)item);
+
+                    // since data is coming from outside provider, we don't know what type the events are
+                    // for now we'll assume post
+
+                    Object out = convert(node, Post.class, outClass);
+
+                    if( out != null && validate(out, outClass))
+                        result = new StreamsDatum(out);
+                }
+
+            } else if( item instanceof ObjectNode || item instanceof Post) {
+
+                // first check for valid json
+                node = (ObjectNode)mapper.valueToTree(item);
+
+                // since data is coming from outside provider, we don't know what type the events are
+                // for now we'll assume post
+
+                Object out = convert(node, Post.class, outClass);
+
+                if( out != null && validate(out, outClass))
+                    result = new StreamsDatum(out);
+
+            }
+
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+
+        if( result != null )
+            return Lists.newArrayList(result);
+        else
+            return Lists.newArrayList();
+    }
+
+    @Override
+    public void prepare(Object o) {
+        mapper = new StreamsJacksonMapper();
+        facebookPostActivitySerializer = new FacebookPostActivitySerializer();
+    }
+
+    @Override
+    public void cleanUp() {
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/49d00d7c/streams-contrib/streams-provider-facebook/src/main/java/com/facebook/provider/FacebookUserInformationProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/main/java/com/facebook/provider/FacebookUserInformationProvider.java b/streams-contrib/streams-provider-facebook/src/main/java/com/facebook/provider/FacebookUserInformationProvider.java
new file mode 100644
index 0000000..b89a925
--- /dev/null
+++ b/streams-contrib/streams-provider-facebook/src/main/java/com/facebook/provider/FacebookUserInformationProvider.java
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.facebook.provider;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigRenderOptions;
+import facebook4j.*;
+import facebook4j.conf.ConfigurationBuilder;
+import facebook4j.json.DataObjectFactory;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProvider;
+import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.facebook.FacebookUserInformationConfiguration;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import sun.reflect.generics.reflectiveObjects.NotImplementedException;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.math.BigInteger;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class FacebookUserInformationProvider implements StreamsProvider, Serializable
+{
+
+    public static final String STREAMS_ID = "FacebookUserInformationProvider";
+    private static final Logger LOGGER = LoggerFactory.getLogger(FacebookUserInformationProvider.class);
+
+    private static final ObjectMapper mapper = new StreamsJacksonMapper();
+
+    private static final String ALL_PERMISSIONS = "ads_management,ads_read,create_event,create_note,email,export_stream,friends_about_me,friends_actions.books,friends_actions.music,friends_actions.news,friends_actions.video,friends_activities,friends_birthday,friends_education_history,friends_events,friends_games_activity,friends_groups,friends_hometown,friends_interests,friends_likes,friends_location,friends_notes,friends_online_presence,friends_photo_video_tags,friends_photos,friends_questions,friends_relationship_details,friends_relationships,friends_religion_politics,friends_status,friends_subscriptions,friends_videos,friends_website,friends_work_history,manage_friendlists,manage_notifications,manage_pages,photo_upload,publish_actions,publish_stream,read_friendlists,read_insights,read_mailbox,read_page_mailboxes,read_requests,read_stream,rsvp_event,share_item,sms,status_update,user_about_me,user_actions.books,user_actions.music,user_actions.news,user_actions.video,user_activitie
 s,user_birthday,user_education_history,user_events,user_friends,user_games_activity,user_groups,user_hometown,user_interests,user_likes,user_location,user_notes,user_online_presence,user_photo_video_tags,user_photos,user_questions,user_relationship_details,user_relationships,user_religion_politics,user_status,user_subscriptions,user_videos,user_website,user_work_history,video_upload,xmpp_login";
+    private FacebookUserInformationConfiguration facebookUserInformationConfiguration;
+
+    private Class klass;
+    protected volatile Queue<StreamsDatum> providerQueue = new LinkedBlockingQueue<StreamsDatum>();
+
+    public FacebookUserInformationConfiguration getConfig()              { return facebookUserInformationConfiguration; }
+
+    public void setConfig(FacebookUserInformationConfiguration config)   { this.facebookUserInformationConfiguration = config; }
+
+    protected Iterator<Long[]> idsBatches;
+    protected Iterator<String[]> screenNameBatches;
+
+    protected ExecutorService executor;
+
+    protected DateTime start;
+    protected DateTime end;
+
+    protected final AtomicBoolean running = new AtomicBoolean();
+
+    private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) {
+        return new ThreadPoolExecutor(nThreads, nThreads,
+                5000L, TimeUnit.MILLISECONDS,
+                new ArrayBlockingQueue<Runnable>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy());
+    }
+
+    public FacebookUserInformationProvider() {
+        Config config = StreamsConfigurator.config.getConfig("facebook");
+        FacebookUserInformationConfiguration facebookUserInformationConfiguration;
+        try {
+            facebookUserInformationConfiguration = mapper.readValue(config.root().render(ConfigRenderOptions.concise()), FacebookUserInformationConfiguration.class);
+        } catch (IOException e) {
+            e.printStackTrace();
+            return;
+        }
+    }
+
+    public FacebookUserInformationProvider(FacebookUserInformationConfiguration config) {
+        this.facebookUserInformationConfiguration = config;
+    }
+
+    public FacebookUserInformationProvider(Class klass) {
+        Config config = StreamsConfigurator.config.getConfig("facebook");
+        FacebookUserInformationConfiguration facebookUserInformationConfiguration;
+        try {
+            facebookUserInformationConfiguration = mapper.readValue(config.root().render(ConfigRenderOptions.concise()), FacebookUserInformationConfiguration.class);
+        } catch (IOException e) {
+            e.printStackTrace();
+            return;
+        }
+        this.klass = klass;
+    }
+
+    public FacebookUserInformationProvider(FacebookUserInformationConfiguration config, Class klass) {
+        this.facebookUserInformationConfiguration = config;
+        this.klass = klass;
+    }
+
+    public Queue<StreamsDatum> getProviderQueue() {
+        return this.providerQueue;
+    }
+
+    @Override
+    public void startStream() {
+        running.set(true);
+    }
+
+    private void loadBatch(String[] ids) {
+        Facebook client = getFacebookClient();
+        int keepTrying = 0;
+
+        // keep trying to load, give it 5 attempts.
+        //while (keepTrying < 10)
+        while (keepTrying < 1)
+        {
+            try {
+                for( User user : client.getUsers(ids)) {
+                    String json = DataObjectFactory.getRawJSON(user);
+
+                    providerQueue.offer(new StreamsDatum(json));
+//
+                };
+            } catch (FacebookException e) {
+                e.printStackTrace();
+                return;
+            }
+
+        }
+    }
+
+    public StreamsResultSet readCurrent() {
+
+//        Preconditions.checkArgument(idsBatches.hasNext() || screenNameBatches.hasNext());
+
+        LOGGER.info("readCurrent");
+
+        Facebook client = getFacebookClient();
+
+        try {
+            User me = client.users().getMe();
+            ResponseList<Friend> friendResponseList = client.friends().getFriends(me.getId());
+            Paging<Friend> friendPaging;
+            do {
+
+                for( Friend friend : friendResponseList ) {
+
+                    String json;
+                    try {
+                        json = mapper.writeValueAsString(friend);
+                        providerQueue.add(
+                                new StreamsDatum(json)
+                        );
+                    } catch (JsonProcessingException e) {
+//                        e.printStackTrace();
+                    }
+                }
+                friendPaging = friendResponseList.getPaging();
+                friendResponseList = client.fetchNext(friendPaging);
+            } while( friendPaging != null &&
+                    friendResponseList != null );
+
+//                    // Getting Next page
+//                for( Post post : feed ) {
+//                    String json;
+//                    try {
+//                        json = mapper.writeValueAsString(post);
+//                        providerQueue.add(
+//                                new StreamsDatum(json)
+//                        );
+//                    } catch (JsonProcessingException e) {
+//                        e.printStackTrace();
+//                    }
+//                }
+
+//                paging = feed.getPaging();
+//                feed = client.fetchPrevious(paging);
+//            } while( paging != null &&
+//                    feed != null );
+
+
+        } catch (FacebookException e) {
+            e.printStackTrace();
+        }
+
+        LOGGER.info("Finished.  Cleaning up...");
+
+        LOGGER.info("Providing {} docs", providerQueue.size());
+
+        StreamsResultSet result =  new StreamsResultSet(providerQueue);
+        running.set(false);
+
+        LOGGER.info("Exiting");
+
+        return result;
+
+    }
+
+    public StreamsResultSet readNew(BigInteger sequence) {
+        LOGGER.debug("{} readNew", STREAMS_ID);
+        throw new NotImplementedException();
+    }
+
+    public StreamsResultSet readRange(DateTime start, DateTime end) {
+        LOGGER.debug("{} readRange", STREAMS_ID);
+        this.start = start;
+        this.end = end;
+        readCurrent();
+        StreamsResultSet result = (StreamsResultSet)providerQueue.iterator();
+        return result;
+    }
+
+    @Override
+    public boolean isRunning() {
+        return running.get();
+    }
+
+    void shutdownAndAwaitTermination(ExecutorService pool) {
+        pool.shutdown(); // Disable new tasks from being submitted
+        try {
+            // Wait a while for existing tasks to terminate
+            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
+                pool.shutdownNow(); // Cancel currently executing tasks
+                // Wait a while for tasks to respond to being cancelled
+                if (!pool.awaitTermination(10, TimeUnit.SECONDS))
+                    System.err.println("Pool did not terminate");
+            }
+        } catch (InterruptedException ie) {
+            // (Re-)Cancel if current thread also interrupted
+            pool.shutdownNow();
+            // Preserve interrupt status
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    @Override
+    public void prepare(Object o) {
+
+        executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20));
+
+        Preconditions.checkNotNull(providerQueue);
+        Preconditions.checkNotNull(this.klass);
+        Preconditions.checkNotNull(facebookUserInformationConfiguration.getOauth().getAppId());
+        Preconditions.checkNotNull(facebookUserInformationConfiguration.getOauth().getAppSecret());
+        Preconditions.checkNotNull(facebookUserInformationConfiguration.getOauth().getUserAccessToken());
+        Preconditions.checkNotNull(facebookUserInformationConfiguration.getInfo());
+
+        List<String> screenNames = new ArrayList<String>();
+        List<String[]> screenNameBatches = new ArrayList<String[]>();
+
+        List<Long> ids = new ArrayList<Long>();
+        List<Long[]> idsBatches = new ArrayList<Long[]>();
+
+        for(String s : facebookUserInformationConfiguration.getInfo()) {
+            if(s != null)
+            {
+                String potentialScreenName = s.replaceAll("@", "").trim().toLowerCase();
+
+                // See if it is a long, if it is, add it to the user iD list, if it is not, add it to the
+                // screen name list
+                try {
+                    ids.add(Long.parseLong(potentialScreenName));
+                } catch (Exception e) {
+                    screenNames.add(potentialScreenName);
+                }
+
+                // Twitter allows for batches up to 100 per request, but you cannot mix types
+
+                if(ids.size() >= 100) {
+                    // add the batch
+                    idsBatches.add(ids.toArray(new Long[ids.size()]));
+                    // reset the Ids
+                    ids = new ArrayList<Long>();
+                }
+
+                if(screenNames.size() >= 100) {
+                    // add the batch
+                    screenNameBatches.add(screenNames.toArray(new String[ids.size()]));
+                    // reset the Ids
+                    screenNames = new ArrayList<String>();
+                }
+            }
+        }
+
+
+        if(ids.size() > 0)
+            idsBatches.add(ids.toArray(new Long[ids.size()]));
+
+        if(screenNames.size() > 0)
+            screenNameBatches.add(screenNames.toArray(new String[ids.size()]));
+
+        this.idsBatches = idsBatches.iterator();
+        this.screenNameBatches = screenNameBatches.iterator();
+    }
+
+    protected Facebook getFacebookClient()
+    {
+        ConfigurationBuilder cb = new ConfigurationBuilder();
+        cb.setDebugEnabled(true)
+            .setOAuthAppId(facebookUserInformationConfiguration.getOauth().getAppId())
+            .setOAuthAppSecret(facebookUserInformationConfiguration.getOauth().getAppSecret())
+            .setOAuthAccessToken(facebookUserInformationConfiguration.getOauth().getUserAccessToken())
+            .setOAuthPermissions(ALL_PERMISSIONS)
+            .setJSONStoreEnabled(true);
+
+        FacebookFactory ff = new FacebookFactory(cb.build());
+        Facebook facebook = ff.getInstance();
+
+        return facebook;
+    }
+
+    @Override
+    public void cleanUp() {
+        shutdownAndAwaitTermination(executor);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/49d00d7c/streams-contrib/streams-provider-facebook/src/main/java/com/facebook/provider/FacebookUserstreamProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/main/java/com/facebook/provider/FacebookUserstreamProvider.java b/streams-contrib/streams-provider-facebook/src/main/java/com/facebook/provider/FacebookUserstreamProvider.java
new file mode 100644
index 0000000..eae8069
--- /dev/null
+++ b/streams-contrib/streams-provider-facebook/src/main/java/com/facebook/provider/FacebookUserstreamProvider.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.facebook.provider;
+
+import com.facebook.*;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Queues;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigRenderOptions;
+import facebook4j.*;
+import facebook4j.Post;
+import facebook4j.conf.ConfigurationBuilder;
+import facebook4j.json.DataObjectFactory;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.DatumStatusCounter;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProvider;
+import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.facebook.FacebookUserInformationConfiguration;
+import org.apache.streams.facebook.FacebookUserstreamConfiguration;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.util.ComponentUtils;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import sun.reflect.generics.reflectiveObjects.NotImplementedException;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.math.BigInteger;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class FacebookUserstreamProvider implements StreamsProvider, Serializable {
+
+    public static final String STREAMS_ID = "FacebookUserstreamProvider";
+    private static final Logger LOGGER = LoggerFactory.getLogger(FacebookUserstreamProvider.class);
+
+    private static final ObjectMapper mapper = new StreamsJacksonMapper();
+
+    private static final String ALL_PERMISSIONS = "ads_management,ads_read,create_event,create_note,email,export_stream,friends_about_me,friends_actions.books,friends_actions.music,friends_actions.news,friends_actions.video,friends_activities,friends_birthday,friends_education_history,friends_events,friends_games_activity,friends_groups,friends_hometown,friends_interests,friends_likes,friends_location,friends_notes,friends_online_presence,friends_photo_video_tags,friends_photos,friends_questions,friends_relationship_details,friends_relationships,friends_religion_politics,friends_status,friends_subscriptions,friends_videos,friends_website,friends_work_history,manage_friendlists,manage_notifications,manage_pages,photo_upload,publish_actions,publish_stream,read_friendlists,read_insights,read_mailbox,read_page_mailboxes,read_requests,read_stream,rsvp_event,share_item,sms,status_update,user_about_me,user_actions.books,user_actions.music,user_actions.news,user_actions.video,user_activitie
 s,user_birthday,user_education_history,user_events,user_friends,user_games_activity,user_groups,user_hometown,user_interests,user_likes,user_location,user_notes,user_online_presence,user_photo_video_tags,user_photos,user_questions,user_relationship_details,user_relationships,user_religion_politics,user_status,user_subscriptions,user_videos,user_website,user_work_history,video_upload,xmpp_login";
+    private FacebookUserstreamConfiguration configuration;
+
+    private Class klass;
+    protected final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+    protected volatile Queue<StreamsDatum> providerQueue = new LinkedBlockingQueue<StreamsDatum>();
+
+    public FacebookUserstreamConfiguration getConfig() {
+        return configuration;
+    }
+
+    public void setConfig(FacebookUserstreamConfiguration config) {
+        this.configuration = config;
+    }
+
+    protected ListeningExecutorService executor;
+
+    protected DateTime start;
+    protected DateTime end;
+
+    protected final AtomicBoolean running = new AtomicBoolean();
+
+    private DatumStatusCounter countersCurrent = new DatumStatusCounter();
+    private DatumStatusCounter countersTotal = new DatumStatusCounter();
+
+    private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) {
+        return new ThreadPoolExecutor(nThreads, nThreads,
+                5000L, TimeUnit.MILLISECONDS,
+                new ArrayBlockingQueue<Runnable>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy());
+    }
+
+    public FacebookUserstreamProvider() {
+        Config config = StreamsConfigurator.config.getConfig("facebook");
+        FacebookUserInformationConfiguration facebookUserInformationConfiguration;
+        try {
+            facebookUserInformationConfiguration = mapper.readValue(config.root().render(ConfigRenderOptions.concise()), FacebookUserInformationConfiguration.class);
+        } catch (IOException e) {
+            e.printStackTrace();
+            return;
+        }
+    }
+
+    public FacebookUserstreamProvider(FacebookUserstreamConfiguration config) {
+        this.configuration = config;
+    }
+
+    public FacebookUserstreamProvider(Class klass) {
+        Config config = StreamsConfigurator.config.getConfig("facebook");
+        FacebookUserInformationConfiguration facebookUserInformationConfiguration;
+        try {
+            facebookUserInformationConfiguration = mapper.readValue(config.root().render(ConfigRenderOptions.concise()), FacebookUserInformationConfiguration.class);
+        } catch (IOException e) {
+            e.printStackTrace();
+            return;
+        }
+        this.klass = klass;
+    }
+
+    public FacebookUserstreamProvider(FacebookUserstreamConfiguration config, Class klass) {
+        this.configuration = config;
+        this.klass = klass;
+    }
+
+    public Queue<StreamsDatum> getProviderQueue() {
+        return this.providerQueue;
+    }
+
+    @Override
+    public void startStream() {
+
+        executor.submit(new FacebookFeedPollingTask(this));
+        running.set(true);
+    }
+
+    private void loadBatch(String[] ids) {
+        Facebook client = getFacebookClient();
+        int keepTrying = 0;
+
+        // keep trying to load, give it 5 attempts.
+        //while (keepTrying < 10)
+        while (keepTrying < 1) {
+            try {
+                for (User user : client.getUsers(ids)) {
+                    String json = DataObjectFactory.getRawJSON(user);
+
+                    providerQueue.offer(new StreamsDatum(json));
+//
+                }
+                ;
+            } catch (FacebookException e) {
+                e.printStackTrace();
+                return;
+            }
+
+        }
+    }
+
+    public StreamsResultSet readCurrent() {
+
+        StreamsResultSet current;
+
+        synchronized (FacebookUserstreamProvider.class) {
+            current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(providerQueue));
+            current.setCounter(new DatumStatusCounter());
+            current.getCounter().add(countersCurrent);
+            countersTotal.add(countersCurrent);
+            countersCurrent = new DatumStatusCounter();
+            providerQueue.clear();
+        }
+
+        return current;
+
+    }
+
+    public StreamsResultSet readNew(BigInteger sequence) {
+        LOGGER.debug("{} readNew", STREAMS_ID);
+        throw new NotImplementedException();
+    }
+
+    public StreamsResultSet readRange(DateTime start, DateTime end) {
+        LOGGER.debug("{} readRange", STREAMS_ID);
+        this.start = start;
+        this.end = end;
+        readCurrent();
+        StreamsResultSet result = (StreamsResultSet) providerQueue.iterator();
+        return result;
+    }
+
+    @Override
+    public boolean isRunning() {
+        return running.get();
+    }
+
+    void shutdownAndAwaitTermination(ExecutorService pool) {
+        pool.shutdown(); // Disable new tasks from being submitted
+        try {
+            // Wait a while for existing tasks to terminate
+            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
+                pool.shutdownNow(); // Cancel currently executing tasks
+                // Wait a while for tasks to respond to being cancelled
+                if (!pool.awaitTermination(10, TimeUnit.SECONDS))
+                    System.err.println("Pool did not terminate");
+            }
+        } catch (InterruptedException ie) {
+            // (Re-)Cancel if current thread also interrupted
+            pool.shutdownNow();
+            // Preserve interrupt status
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    @Override
+    public void prepare(Object o) {
+
+        executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20));
+
+        Preconditions.checkNotNull(providerQueue);
+        Preconditions.checkNotNull(this.klass);
+        Preconditions.checkNotNull(configuration.getOauth().getAppId());
+        Preconditions.checkNotNull(configuration.getOauth().getAppSecret());
+        Preconditions.checkNotNull(configuration.getOauth().getUserAccessToken());
+
+    }
+
+    protected Facebook getFacebookClient() {
+        ConfigurationBuilder cb = new ConfigurationBuilder();
+        cb.setDebugEnabled(true)
+                .setOAuthAppId(configuration.getOauth().getAppId())
+                .setOAuthAppSecret(configuration.getOauth().getAppSecret())
+                .setOAuthAccessToken(configuration.getOauth().getUserAccessToken())
+                .setOAuthPermissions(ALL_PERMISSIONS)
+                .setJSONStoreEnabled(true);
+
+        FacebookFactory ff = new FacebookFactory(cb.build());
+        Facebook facebook = ff.getInstance();
+
+        return facebook;
+    }
+
+    @Override
+    public void cleanUp() {
+        shutdownAndAwaitTermination(executor);
+    }
+
+    private class FacebookFeedPollingTask implements Runnable {
+
+        FacebookUserstreamProvider provider;
+        Facebook client;
+
+        private Set<Post> priorPollResult = Sets.newHashSet();
+
+        public FacebookFeedPollingTask(FacebookUserstreamProvider facebookUserstreamProvider) {
+            provider = facebookUserstreamProvider;
+        }
+
+        @Override
+        public void run() {
+            client = provider.getFacebookClient();
+            while (provider.isRunning()) {
+                try {
+                    ResponseList<Post> postResponseList = client.getHome();
+                    Set<Post> update = Sets.newHashSet(postResponseList);
+                    Set<Post> repeats = Sets.intersection(priorPollResult, Sets.newHashSet(update));
+                    Set<Post> entrySet = Sets.difference(update, repeats);
+                    for (Post item : entrySet) {
+                        String json = DataObjectFactory.getRawJSON(item);
+                        org.apache.streams.facebook.Post post = mapper.readValue(json, org.apache.streams.facebook.Post.class);
+                        try {
+                            lock.readLock().lock();
+                            ComponentUtils.offerUntilSuccess(new StreamsDatum(post), providerQueue);
+                            countersCurrent.incrementAttempt();
+                        } finally {
+                            lock.readLock().unlock();
+                        }
+                    }
+                    priorPollResult = update;
+                    Thread.sleep(configuration.getPollIntervalMillis());
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/49d00d7c/streams-contrib/streams-provider-facebook/src/main/jsonschema/com/facebook/FacebookConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/main/jsonschema/com/facebook/FacebookConfiguration.json b/streams-contrib/streams-provider-facebook/src/main/jsonschema/com/facebook/FacebookConfiguration.json
new file mode 100644
index 0000000..b4e5afb
--- /dev/null
+++ b/streams-contrib/streams-provider-facebook/src/main/jsonschema/com/facebook/FacebookConfiguration.json
@@ -0,0 +1,49 @@
+{
+    "type": "object",
+    "$schema": "http://json-schema.org/draft-03/schema",
+    "id": "#",
+    "javaType" : "org.apache.streams.facebook.FacebookConfiguration",
+    "javaInterfaces": ["java.io.Serializable"],
+    "properties": {
+        "protocol": {
+            "type": "string",
+            "description": "The protocol"
+        },
+        "host": {
+            "type": "string",
+            "description": "The host"
+        },
+        "port": {
+            "type": "integer",
+            "description": "The port"
+        },
+        "version": {
+            "type": "string",
+            "description": "The version"
+        },
+        "endpoint": {
+            "type": "string",
+            "description": "The endpoint"
+        },
+        "oauth": {
+            "type": "object",
+            "dynamic": "true",
+            "javaType" : "org.apache.streams.facebook.FacebookOAuthConfiguration",
+            "javaInterfaces": ["java.io.Serializable"],
+            "properties": {
+                "appId": {
+                    "type": "string"
+                },
+                "appSecret": {
+                    "type": "string"
+                },
+                "appAccessToken": {
+                    "type": "string"
+                },
+                "userAccessToken": {
+                    "type": "string"
+                }
+            }
+        }
+   }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/49d00d7c/streams-contrib/streams-provider-facebook/src/main/jsonschema/com/facebook/FacebookUserInformationConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/main/jsonschema/com/facebook/FacebookUserInformationConfiguration.json b/streams-contrib/streams-provider-facebook/src/main/jsonschema/com/facebook/FacebookUserInformationConfiguration.json
new file mode 100644
index 0000000..0454178
--- /dev/null
+++ b/streams-contrib/streams-provider-facebook/src/main/jsonschema/com/facebook/FacebookUserInformationConfiguration.json
@@ -0,0 +1,18 @@
+{
+    "type": "object",
+    "$schema": "http://json-schema.org/draft-03/schema",
+    "id": "#",
+    "javaType" : "org.apache.streams.facebook.FacebookUserInformationConfiguration",
+    "javaInterfaces": ["java.io.Serializable"],
+    "extends": {"$ref":"FacebookConfiguration.json"},
+    "javaInterfaces": ["java.io.Serializable"],
+    "properties": {
+        "info": {
+            "type": "array",
+            "description": "A list of user IDs, indicating users of interest",
+            "items": {
+                "type": "string"
+            }
+        }
+     }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/49d00d7c/streams-contrib/streams-provider-facebook/src/main/jsonschema/com/facebook/FacebookUserstreamConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/main/jsonschema/com/facebook/FacebookUserstreamConfiguration.json b/streams-contrib/streams-provider-facebook/src/main/jsonschema/com/facebook/FacebookUserstreamConfiguration.json
new file mode 100644
index 0000000..c823a12
--- /dev/null
+++ b/streams-contrib/streams-provider-facebook/src/main/jsonschema/com/facebook/FacebookUserstreamConfiguration.json
@@ -0,0 +1,15 @@
+{
+    "type": "object",
+    "$schema": "http://json-schema.org/draft-03/schema",
+    "id": "#",
+    "javaType" : "org.apache.streams.facebook.FacebookUserstreamConfiguration",
+    "javaInterfaces": ["java.io.Serializable"],
+    "extends": {"$ref":"FacebookConfiguration.json"},
+    "properties": {
+        "pollIntervalMillis": {
+            "type": "integer",
+            "default" : "60000",
+            "description": "Polling interval in ms"
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/49d00d7c/streams-contrib/streams-provider-facebook/src/main/jsonschema/com/facebook/graph/Post.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/main/jsonschema/com/facebook/graph/Post.json b/streams-contrib/streams-provider-facebook/src/main/jsonschema/com/facebook/graph/Post.json
index c334f05..23bcb08 100644
--- a/streams-contrib/streams-provider-facebook/src/main/jsonschema/com/facebook/graph/Post.json
+++ b/streams-contrib/streams-provider-facebook/src/main/jsonschema/com/facebook/graph/Post.json
@@ -2,7 +2,7 @@
     "type": "object",
     "$schema": "http://json-schema.org/draft-03/schema",
     "id": "#",
-    "javaType": "com.facebook.Post",
+    "javaType": "org.apache.streams.facebook.Post",
     "properties": {
         "id": {
             "type": "string"
@@ -69,7 +69,7 @@
             "type": "string"
         },
         "caption": {
-            "type": "boolean"
+            "type": "string"
         },
         "description": {
             "type": "string"
@@ -109,7 +109,8 @@
                         "type": "string"
                     },
                     "created_time": {
-                        "type": "string"
+                        "type": "string",
+                        "format" : "date-time"
                     }
                 }
             }
@@ -163,10 +164,12 @@
             }
         },
         "created_time": {
-            "type": "string"
+            "type": "string",
+            "format" : "date-time"
         },
         "updated_time": {
-            "type": "string"
+            "type": "string",
+            "format" : "date-time"
         },
         "include_hidden": {
             "type": "boolean"
@@ -187,6 +190,14 @@
                     }
                 }
             }
+        },
+        "privacy": {
+            "type": "object",
+            "properties": {
+                "value": {
+                    "type": "string"
+                }
+            }
         }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/49d00d7c/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/FacebookActivitySerDeTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/FacebookActivitySerDeTest.java b/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/FacebookActivitySerDeTest.java
new file mode 100644
index 0000000..7f3ab27
--- /dev/null
+++ b/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/FacebookActivitySerDeTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.facebook.test;
+
+import com.facebook.api.FacebookPostActivitySerializer;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Joiner;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.input.BoundedInputStream;
+import org.apache.streams.data.ActivitySerializer;
+import org.apache.streams.facebook.Post;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.InputStream;
+
+/**
+* Created with IntelliJ IDEA.
+* User: sblackmon
+* Date: 8/20/13
+* Time: 5:57 PM
+* To change this template use File | Settings | File Templates.
+*/
+public class FacebookActivitySerDeTest {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(FacebookActivitySerDeTest.class);
+    private FacebookPostActivitySerializer serializer = new FacebookPostActivitySerializer();
+    private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
+    @Test
+    public void Tests()
+    {
+        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, Boolean.TRUE);
+        mapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, Boolean.TRUE);
+        mapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, Boolean.TRUE);
+
+        InputStream is = FacebookActivitySerDeTest.class.getResourceAsStream("/testpost.json");
+        Joiner joiner = Joiner.on(" ").skipNulls();
+        is = new BoundedInputStream(is, 10000);
+        String json;
+
+        try {
+            json = joiner.join(IOUtils.readLines(is));
+            LOGGER.debug(json);
+
+            Post post = mapper.readValue(json, Post.class);
+
+            Activity activity = serializer.deserialize(post);
+
+            LOGGER.debug(mapper.writeValueAsString(activity));
+
+        } catch( Exception e ) {
+            System.out.println(e);
+            e.printStackTrace();
+            Assert.fail();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/49d00d7c/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/FacebookPostActivitySerializerTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/FacebookPostActivitySerializerTest.java b/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/FacebookPostActivitySerializerTest.java
deleted file mode 100644
index a9b91e9..0000000
--- a/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/FacebookPostActivitySerializerTest.java
+++ /dev/null
@@ -1,215 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.facebook.test;
-
-import com.facebook.api.FacebookPostActivitySerializer;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.streams.data.ActivitySerializer;
-import org.apache.streams.data.util.JsonUtil;
-import org.apache.streams.pojo.json.Activity;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.*;
-import java.util.regex.Pattern;
-
-import static java.util.regex.Pattern.matches;
-import static org.hamcrest.CoreMatchers.*;
-import static org.junit.Assert.assertThat;
-
-public class FacebookPostActivitySerializerTest {
-    Node fields;
-    JsonNode json;
-    ActivitySerializer serializer = new FacebookPostActivitySerializer();
-    ObjectMapper mapper;
-
-    @Before
-    public void setup() throws IOException {
-        json = JsonUtil.getFromFile("classpath:org/apache/streams/data/Facebook.json");
-        fields = discover(json);
-
-        mapper = new ObjectMapper();
-        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, Boolean.FALSE);
-        mapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, Boolean.TRUE);
-        mapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, Boolean.TRUE);
-    }
-
-    @Test
-    public void loadData() throws Exception {
-        for (JsonNode item : json) {
-            Activity activity = serializer.deserialize(getString(item));
-            assertThat(activity, is(not(nullValue())));
-            assertThat(activity.getActor(), is(not(nullValue())));
-            assertThat(matches("id:facebook:people:[a-zA-Z0-9]*", activity.getActor().getId()), is(true));
-            assertThat(activity.getActor().getDisplayName(), is(not(nullValue())));
-            assertThat(activity.getObject(), is(not(nullValue())));
-            if(activity.getObject().getId() != null) {
-                assertThat(matches("id:facebook:[a-z]*s:[a-zA-Z0-9]*", activity.getObject().getId()), is(true));
-            }
-            assertThat(activity.getObject().getObjectType(), is(not(nullValue())));
-            assertThat(activity.getContent(), is(not(nullValue())));
-            assertThat(activity.getProvider().getId(), is(equalTo("id:providers:facebook")));
-            System.out.println(activity.getPublished());
-        }
-    }
-
-
-
-
-    public Node discover(JsonNode node) {
-        Node root = new Node(null, "root");
-        if (node == null || !node.isArray()) {
-            throw new RuntimeException("No data");
-        } else {
-            for (JsonNode item : node) {
-                mapNode(root, item);
-            }
-        }
-        //printTree(root, "");
-        //printUniqueFields(root);
-        return root;
-    }
-
-
-    private String getString(JsonNode jsonNode)  {
-        try {
-            return new ObjectMapper().writeValueAsString(jsonNode);
-        } catch (JsonProcessingException e) {
-            throw new RuntimeException(e);
-        }
-    }
-    private void printUniqueFields(Node root) {
-        Map<String, Set<String>> fieldsByType = new HashMap<String, Set<String>>();
-        fieldsByType.put("objectType", new HashSet<String>());
-        for(Node child : root.getChildren().values()) {
-           for(Node grandChild : child.getChildren().values()) {
-               fieldsByType.get("objectType").add(grandChild.getName());
-               addUniqueValues(grandChild, fieldsByType);
-           }
-        }
-        for(Map.Entry<String, Set<String>> entry : fieldsByType.entrySet()) {
-            System.out.println(entry.getKey());
-            List<String> value = new ArrayList<String>(entry.getValue());
-            Collections.sort(value);
-            for(String val : value) {
-                System.out.println("      " + val);
-            }
-            System.out.println();
-            System.out.println();
-        }
-    }
-
-    private void addUniqueValues(Node child, Map<String, Set<String>> fieldsByType) {
-        if(!fieldsByType.containsKey(child.getName()) && !isNumber(child.getName())) {
-            fieldsByType.put(child.getName(), new HashSet<String>());
-        }
-        for(Map.Entry<String, Node> gc : child.getChildren().entrySet()) {
-            if(!isNumber(gc.getKey()))
-                fieldsByType.get(child.getName()).add(gc.getKey());
-            addUniqueValues(gc.getValue(), fieldsByType);
-        }
-    }
-
-    private boolean isNumber(String key) {
-        Pattern p = Pattern.compile("[0-9]*");
-        return p.matcher(key.trim()).matches();
-    }
-
-    private void printTree(Node node, String spacer) {
-        System.out.println(String.format("%s %s (%s)", spacer, node.getName(), node.getType()));
-        List<Node> children = new ArrayList<Node>(node.getChildren().values());
-        Collections.sort(children);
-        for(Node child : children) {
-            printTree(child, spacer + "      ");
-        }
-    }
-
-    private void mapNode(Node parent, JsonNode jsonNode) {
-        for (Iterator<Map.Entry<String, JsonNode>> iter = jsonNode.fields(); iter.hasNext(); ) {
-            Map.Entry<String, JsonNode> property = iter.next();
-            Node current;
-            String key = property.getKey();
-            JsonNode value = property.getValue();
-            if (!parent.getChildren().containsKey(key)) {
-                current = new Node(null, key);
-                current.setType(value.getNodeType().toString());
-                parent.getChildren().put(key, current);
-            } else {
-                current = parent.getChildren().get(key);
-            }
-            if(!value.isArray() && value.isObject()){
-                mapNode(current, value);
-            }
-        }
-    }
-
-    private static class Node implements Comparable<Node>{
-        Node parent;
-        String name;
-        String type;
-        Map<String, Node> children = new HashMap<String, Node>();
-
-        private Node(Node parent, String name) {
-            this.parent = parent;
-            this.name = name;
-        }
-
-        private Node getParent() {
-            return parent;
-        }
-
-        private void setParent(Node parent) {
-            this.parent = parent;
-        }
-
-        private String getName() {
-            return name;
-        }
-
-        private void setName(String name) {
-            this.name = name;
-        }
-
-        private Map<String, Node> getChildren() {
-            return children;
-        }
-
-        private void setChildren(Map<String, Node> children) {
-            this.children = children;
-        }
-
-        private String getType() {
-            return type;
-        }
-
-        private void setType(String type) {
-            this.type = type;
-        }
-
-        @Override
-        public int compareTo(Node node) {
-            return this.name.compareTo(node.name);
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/49d00d7c/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/FacebookPostSerDeTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/FacebookPostSerDeTest.java b/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/FacebookPostSerDeTest.java
index 4457ed3..6a761cb 100644
--- a/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/FacebookPostSerDeTest.java
+++ b/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/FacebookPostSerDeTest.java
@@ -23,7 +23,8 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Joiner;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.io.input.BoundedInputStream;
-import com.facebook.Post;
+import org.apache.streams.facebook.Post;
+import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.Ignore;
@@ -42,8 +43,7 @@ import java.io.InputStream;
 public class FacebookPostSerDeTest {
 
     private final static Logger LOGGER = LoggerFactory.getLogger(FacebookPostSerDeTest.class);
-    //private ActivitySerializer serializer = new TwitterJsonActivitySerializer();
-    private ObjectMapper mapper = new ObjectMapper();
+    private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
 
     @Ignore
     @Test