You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/07/21 20:44:04 UTC
git commit: STREAMS-105 | Updated the InstagramTypeConverter to use
the conversion utility functions provided in InstagramActivityUtil
Repository: incubator-streams
Updated Branches:
refs/heads/STREAMS-46 a03120223 -> e8511ada0
STREAMS-105 | Updated the InstagramTypeConverter to use the conversion utility functions provided in InstagramActivityUtil
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/e8511ada
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/e8511ada
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/e8511ada
Branch: refs/heads/STREAMS-46
Commit: e8511ada025c1837db8ddd4fb62d24eb231a9f7f
Parents: a031202
Author: Robert Douglas <rd...@w2odigital.com>
Authored: Wed Jul 2 10:48:35 2014 -0500
Committer: sblackmon <sb...@w2odigital.com>
Committed: Mon Jul 21 12:25:43 2014 -0500
----------------------------------------------------------------------
.../provider/FacebookUserstreamProvider.java | 72 +++++++++++++-------
.../FacebookUserstreamConfiguration.json | 7 ++
2 files changed, 53 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e8511ada/streams-contrib/streams-provider-facebook/src/main/java/com/facebook/provider/FacebookUserstreamProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/main/java/com/facebook/provider/FacebookUserstreamProvider.java b/streams-contrib/streams-provider-facebook/src/main/java/com/facebook/provider/FacebookUserstreamProvider.java
index eae8069..af7868b 100644
--- a/streams-contrib/streams-provider-facebook/src/main/java/com/facebook/provider/FacebookUserstreamProvider.java
+++ b/streams-contrib/streams-provider-facebook/src/main/java/com/facebook/provider/FacebookUserstreamProvider.java
@@ -62,7 +62,7 @@ public class FacebookUserstreamProvider implements StreamsProvider, Serializable
private static final ObjectMapper mapper = new StreamsJacksonMapper();
- private static final String ALL_PERMISSIONS = "ads_management,ads_read,create_event,create_note,email,export_stream,friends_about_me,friends_actions.books,friends_actions.music,friends_actions.news,friends_actions.video,friends_activities,friends_birthday,friends_education_history,friends_events,friends_games_activity,friends_groups,friends_hometown,friends_interests,friends_likes,friends_location,friends_notes,friends_online_presence,friends_photo_video_tags,friends_photos,friends_questions,friends_relationship_details,friends_relationships,friends_religion_politics,friends_status,friends_subscriptions,friends_videos,friends_website,friends_work_history,manage_friendlists,manage_notifications,manage_pages,photo_upload,publish_actions,publish_stream,read_friendlists,read_insights,read_mailbox,read_page_mailboxes,read_requests,read_stream,rsvp_event,share_item,sms,status_update,user_about_me,user_actions.books,user_actions.music,user_actions.news,user_actions.video,user_activitie
s,user_birthday,user_education_history,user_events,user_friends,user_games_activity,user_groups,user_hometown,user_interests,user_likes,user_location,user_notes,user_online_presence,user_photo_video_tags,user_photos,user_questions,user_relationship_details,user_relationships,user_religion_politics,user_status,user_subscriptions,user_videos,user_website,user_work_history,video_upload,xmpp_login";
+ private static final String ALL_PERMISSIONS = "read_stream";
private FacebookUserstreamConfiguration configuration;
private Class klass;
@@ -88,6 +88,8 @@ public class FacebookUserstreamProvider implements StreamsProvider, Serializable
private DatumStatusCounter countersCurrent = new DatumStatusCounter();
private DatumStatusCounter countersTotal = new DatumStatusCounter();
+ protected Facebook client;
+
private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) {
return new ThreadPoolExecutor(nThreads, nThreads,
5000L, TimeUnit.MILLISECONDS,
@@ -133,31 +135,17 @@ public class FacebookUserstreamProvider implements StreamsProvider, Serializable
@Override
public void startStream() {
- executor.submit(new FacebookFeedPollingTask(this));
- running.set(true);
- }
-
- private void loadBatch(String[] ids) {
- Facebook client = getFacebookClient();
- int keepTrying = 0;
-
- // keep trying to load, give it 5 attempts.
- //while (keepTrying < 10)
- while (keepTrying < 1) {
- try {
- for (User user : client.getUsers(ids)) {
- String json = DataObjectFactory.getRawJSON(user);
+ client = getFacebookClient();
- providerQueue.offer(new StreamsDatum(json));
-//
- }
- ;
- } catch (FacebookException e) {
- e.printStackTrace();
- return;
+ if( configuration.getInfo() != null &&
+ configuration.getInfo().size() > 0 ) {
+ for( String id : configuration.getInfo()) {
+ executor.submit(new FacebookFeedPollingTask(this, id));
}
-
+ } else {
+ executor.submit(new FacebookFeedPollingTask(this));
}
+ running.set(true);
}
public StreamsResultSet readCurrent() {
@@ -225,6 +213,28 @@ public class FacebookUserstreamProvider implements StreamsProvider, Serializable
Preconditions.checkNotNull(configuration.getOauth().getAppSecret());
Preconditions.checkNotNull(configuration.getOauth().getUserAccessToken());
+ client = getFacebookClient();
+
+ if( configuration.getInfo() != null &&
+ configuration.getInfo().size() > 0 ) {
+
+ List<String> ids = new ArrayList<String>();
+ List<String[]> idsBatches = new ArrayList<String[]>();
+
+ for (String s : configuration.getInfo()) {
+ if (s != null) {
+ ids.add(s);
+
+ if (ids.size() >= 100) {
+ // add the batch
+ idsBatches.add(ids.toArray(new String[ids.size()]));
+ // reset the Ids
+ ids = new ArrayList<String>();
+ }
+
+ }
+ }
+ }
}
protected Facebook getFacebookClient() {
@@ -251,19 +261,29 @@ public class FacebookUserstreamProvider implements StreamsProvider, Serializable
FacebookUserstreamProvider provider;
Facebook client;
+ String id;
private Set<Post> priorPollResult = Sets.newHashSet();
public FacebookFeedPollingTask(FacebookUserstreamProvider facebookUserstreamProvider) {
- provider = facebookUserstreamProvider;
+ this.provider = facebookUserstreamProvider;
}
+ public FacebookFeedPollingTask(FacebookUserstreamProvider facebookUserstreamProvider, String id) {
+ this.provider = facebookUserstreamProvider;
+ this.client = provider.client;
+ this.id = id;
+ }
@Override
public void run() {
- client = provider.getFacebookClient();
while (provider.isRunning()) {
+ ResponseList<Post> postResponseList;
try {
- ResponseList<Post> postResponseList = client.getHome();
+ if( id != null )
+ postResponseList = client.getFeed(id);
+ else
+ postResponseList = client.getHome();
+
Set<Post> update = Sets.newHashSet(postResponseList);
Set<Post> repeats = Sets.intersection(priorPollResult, Sets.newHashSet(update));
Set<Post> entrySet = Sets.difference(update, repeats);
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e8511ada/streams-contrib/streams-provider-facebook/src/main/jsonschema/com/facebook/FacebookUserstreamConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/main/jsonschema/com/facebook/FacebookUserstreamConfiguration.json b/streams-contrib/streams-provider-facebook/src/main/jsonschema/com/facebook/FacebookUserstreamConfiguration.json
index c823a12..bcb2258 100644
--- a/streams-contrib/streams-provider-facebook/src/main/jsonschema/com/facebook/FacebookUserstreamConfiguration.json
+++ b/streams-contrib/streams-provider-facebook/src/main/jsonschema/com/facebook/FacebookUserstreamConfiguration.json
@@ -6,6 +6,13 @@
"javaInterfaces": ["java.io.Serializable"],
"extends": {"$ref":"FacebookConfiguration.json"},
"properties": {
+ "info": {
+ "type": "array",
+ "description": "A list of user IDs, indicating users of interest",
+ "items": {
+ "type": "string"
+ }
+ },
"pollIntervalMillis": {
"type": "integer",
"default" : "60000",