You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/07/21 20:44:04 UTC

git commit: STREAMS-105 | Updated the InstagramTypeConverter to use the conversion utility functions provided in InstagramActivityUtil

Repository: incubator-streams
Updated Branches:
  refs/heads/STREAMS-46 a03120223 -> e8511ada0


STREAMS-105 | Updated the InstagramTypeConverter to use the conversion utility functions provided in InstagramActivityUtil


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/e8511ada
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/e8511ada
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/e8511ada

Branch: refs/heads/STREAMS-46
Commit: e8511ada025c1837db8ddd4fb62d24eb231a9f7f
Parents: a031202
Author: Robert Douglas <rd...@w2odigital.com>
Authored: Wed Jul 2 10:48:35 2014 -0500
Committer: sblackmon <sb...@w2odigital.com>
Committed: Mon Jul 21 12:25:43 2014 -0500

----------------------------------------------------------------------
 .../provider/FacebookUserstreamProvider.java    | 72 +++++++++++++-------
 .../FacebookUserstreamConfiguration.json        |  7 ++
 2 files changed, 53 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e8511ada/streams-contrib/streams-provider-facebook/src/main/java/com/facebook/provider/FacebookUserstreamProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/main/java/com/facebook/provider/FacebookUserstreamProvider.java b/streams-contrib/streams-provider-facebook/src/main/java/com/facebook/provider/FacebookUserstreamProvider.java
index eae8069..af7868b 100644
--- a/streams-contrib/streams-provider-facebook/src/main/java/com/facebook/provider/FacebookUserstreamProvider.java
+++ b/streams-contrib/streams-provider-facebook/src/main/java/com/facebook/provider/FacebookUserstreamProvider.java
@@ -62,7 +62,7 @@ public class FacebookUserstreamProvider implements StreamsProvider, Serializable
 
     private static final ObjectMapper mapper = new StreamsJacksonMapper();
 
-    private static final String ALL_PERMISSIONS = "ads_management,ads_read,create_event,create_note,email,export_stream,friends_about_me,friends_actions.books,friends_actions.music,friends_actions.news,friends_actions.video,friends_activities,friends_birthday,friends_education_history,friends_events,friends_games_activity,friends_groups,friends_hometown,friends_interests,friends_likes,friends_location,friends_notes,friends_online_presence,friends_photo_video_tags,friends_photos,friends_questions,friends_relationship_details,friends_relationships,friends_religion_politics,friends_status,friends_subscriptions,friends_videos,friends_website,friends_work_history,manage_friendlists,manage_notifications,manage_pages,photo_upload,publish_actions,publish_stream,read_friendlists,read_insights,read_mailbox,read_page_mailboxes,read_requests,read_stream,rsvp_event,share_item,sms,status_update,user_about_me,user_actions.books,user_actions.music,user_actions.news,user_actions.video,user_activities,user_birthday,user_education_history,user_events,user_friends,user_games_activity,user_groups,user_hometown,user_interests,user_likes,user_location,user_notes,user_online_presence,user_photo_video_tags,user_photos,user_questions,user_relationship_details,user_relationships,user_religion_politics,user_status,user_subscriptions,user_videos,user_website,user_work_history,video_upload,xmpp_login";
+    private static final String ALL_PERMISSIONS = "read_stream";
     private FacebookUserstreamConfiguration configuration;
 
     private Class klass;
@@ -88,6 +88,8 @@ public class FacebookUserstreamProvider implements StreamsProvider, Serializable
     private DatumStatusCounter countersCurrent = new DatumStatusCounter();
     private DatumStatusCounter countersTotal = new DatumStatusCounter();
 
+    protected Facebook client;
+
     private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) {
         return new ThreadPoolExecutor(nThreads, nThreads,
                 5000L, TimeUnit.MILLISECONDS,
@@ -133,31 +135,17 @@ public class FacebookUserstreamProvider implements StreamsProvider, Serializable
     @Override
     public void startStream() {
 
-        executor.submit(new FacebookFeedPollingTask(this));
-        running.set(true);
-    }
-
-    private void loadBatch(String[] ids) {
-        Facebook client = getFacebookClient();
-        int keepTrying = 0;
-
-        // keep trying to load, give it 5 attempts.
-        //while (keepTrying < 10)
-        while (keepTrying < 1) {
-            try {
-                for (User user : client.getUsers(ids)) {
-                    String json = DataObjectFactory.getRawJSON(user);
+        client = getFacebookClient();
 
-                    providerQueue.offer(new StreamsDatum(json));
-//
-                }
-                ;
-            } catch (FacebookException e) {
-                e.printStackTrace();
-                return;
+        if( configuration.getInfo() != null &&
+            configuration.getInfo().size() > 0 ) {
+            for( String id : configuration.getInfo()) {
+                executor.submit(new FacebookFeedPollingTask(this, id));
             }
-
+        } else {
+            executor.submit(new FacebookFeedPollingTask(this));
         }
+        running.set(true);
     }
 
     public StreamsResultSet readCurrent() {
@@ -225,6 +213,28 @@ public class FacebookUserstreamProvider implements StreamsProvider, Serializable
         Preconditions.checkNotNull(configuration.getOauth().getAppSecret());
         Preconditions.checkNotNull(configuration.getOauth().getUserAccessToken());
 
+        client = getFacebookClient();
+
+        if( configuration.getInfo() != null &&
+            configuration.getInfo().size() > 0 ) {
+
+            List<String> ids = new ArrayList<String>();
+            List<String[]> idsBatches = new ArrayList<String[]>();
+
+            for (String s : configuration.getInfo()) {
+                if (s != null) {
+                    ids.add(s);
+
+                    if (ids.size() >= 100) {
+                        // add the batch
+                        idsBatches.add(ids.toArray(new String[ids.size()]));
+                        // reset the Ids
+                        ids = new ArrayList<String>();
+                    }
+
+                }
+            }
+        }
     }
 
     protected Facebook getFacebookClient() {
@@ -251,19 +261,29 @@ public class FacebookUserstreamProvider implements StreamsProvider, Serializable
 
         FacebookUserstreamProvider provider;
         Facebook client;
+        String id;
 
         private Set<Post> priorPollResult = Sets.newHashSet();
 
         public FacebookFeedPollingTask(FacebookUserstreamProvider facebookUserstreamProvider) {
-            provider = facebookUserstreamProvider;
+            this.provider = facebookUserstreamProvider;
         }
 
+        public FacebookFeedPollingTask(FacebookUserstreamProvider facebookUserstreamProvider, String id) {
+            this.provider = facebookUserstreamProvider;
+            this.client = provider.client;
+            this.id = id;
+        }
         @Override
         public void run() {
-            client = provider.getFacebookClient();
             while (provider.isRunning()) {
+                ResponseList<Post> postResponseList;
                 try {
-                    ResponseList<Post> postResponseList = client.getHome();
+                    if( id != null )
+                        postResponseList = client.getFeed(id);
+                    else
+                        postResponseList = client.getHome();
+
                     Set<Post> update = Sets.newHashSet(postResponseList);
                     Set<Post> repeats = Sets.intersection(priorPollResult, Sets.newHashSet(update));
                     Set<Post> entrySet = Sets.difference(update, repeats);

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e8511ada/streams-contrib/streams-provider-facebook/src/main/jsonschema/com/facebook/FacebookUserstreamConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/main/jsonschema/com/facebook/FacebookUserstreamConfiguration.json b/streams-contrib/streams-provider-facebook/src/main/jsonschema/com/facebook/FacebookUserstreamConfiguration.json
index c823a12..bcb2258 100644
--- a/streams-contrib/streams-provider-facebook/src/main/jsonschema/com/facebook/FacebookUserstreamConfiguration.json
+++ b/streams-contrib/streams-provider-facebook/src/main/jsonschema/com/facebook/FacebookUserstreamConfiguration.json
@@ -6,6 +6,13 @@
     "javaInterfaces": ["java.io.Serializable"],
     "extends": {"$ref":"FacebookConfiguration.json"},
     "properties": {
+        "info": {
+            "type": "array",
+            "description": "A list of user IDs, indicating users of interest",
+            "items": {
+                "type": "string"
+            }
+        },
         "pollIntervalMillis": {
             "type": "integer",
             "default" : "60000",