You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2016/11/25 20:25:12 UTC
[31/42] incubator-streams git commit: STREAMS-440: custom
checkstyle.xml, address compliance
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookProvider.java b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookProvider.java
index e907082..be59bd7 100644
--- a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookProvider.java
+++ b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookProvider.java
@@ -15,16 +15,9 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.streams.facebook.provider;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.Queues;
-import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.typesafe.config.ConfigRenderOptions;
import org.apache.streams.config.StreamsConfigurator;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsProvider;
@@ -34,6 +27,16 @@ import org.apache.streams.facebook.IdConfig;
import org.apache.streams.jackson.StreamsJacksonMapper;
import org.apache.streams.util.ComponentUtils;
import org.apache.streams.util.SerializationUtil;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Queues;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.typesafe.config.ConfigRenderOptions;
+
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,7 +47,8 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.*;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
/**
@@ -52,103 +56,109 @@ import java.util.concurrent.atomic.AtomicBoolean;
*/
public abstract class FacebookProvider implements StreamsProvider {
- private final static String STREAMS_ID = "FacebookProvider";
-
- private static final Logger LOGGER = LoggerFactory.getLogger(FacebookProvider.class);
- private static final ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
- private static final int MAX_BATCH_SIZE = 2000;
-
- protected FacebookConfiguration configuration;
- protected BlockingQueue<StreamsDatum> datums;
-
- private AtomicBoolean isComplete;
- private ListeningExecutorService executor;
- List<ListenableFuture<Object>> futures = new ArrayList<>();
-
- private FacebookDataCollector dataCollector;
-
- public FacebookProvider() {
- try {
- this.configuration = MAPPER.readValue(StreamsConfigurator.config.getConfig("facebook").root().render(ConfigRenderOptions.concise()), FacebookConfiguration.class);
- } catch (IOException ioe) {
- LOGGER.error("Exception trying to read default config : {}", ioe);
- }
- }
-
- public FacebookProvider(FacebookConfiguration configuration) {
- this.configuration = (FacebookConfiguration) SerializationUtil.cloneBySerialization(configuration);
- }
-
- @Override
- public String getId() {
- return STREAMS_ID;
- }
+ private static final String STREAMS_ID = "FacebookProvider";
- @Override
- public void startStream() {
- ListenableFuture future = executor.submit(getDataCollector());
- futures.add(future);
- executor.shutdown();
- }
+ private static final Logger LOGGER = LoggerFactory.getLogger(FacebookProvider.class);
+ private static final ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
+ private static final int MAX_BATCH_SIZE = 2000;
- protected abstract FacebookDataCollector getDataCollector();
-
- @Override
- public StreamsResultSet readCurrent() {
- int batchSize = 0;
- BlockingQueue<StreamsDatum> batch = Queues.newLinkedBlockingQueue();
- while(!this.datums.isEmpty() && batchSize < MAX_BATCH_SIZE) {
- ComponentUtils.offerUntilSuccess(ComponentUtils.pollWhileNotEmpty(this.datums), batch);
- ++batchSize;
- }
- return new StreamsResultSet(batch);
- }
+ protected FacebookConfiguration configuration;
+ protected BlockingQueue<StreamsDatum> datums;
- @Override
- public StreamsResultSet readNew(BigInteger sequence) {
- return null;
- }
+ private AtomicBoolean isComplete;
+ private ListeningExecutorService executor;
+ List<ListenableFuture<Object>> futures = new ArrayList<>();
- @Override
- public StreamsResultSet readRange(DateTime start, DateTime end) {
- return null;
- }
+ private FacebookDataCollector dataCollector;
- @Override
- public void prepare(Object configurationObject) {
- this.datums = Queues.newLinkedBlockingQueue();
- this.isComplete = new AtomicBoolean(false);
- this.executor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1));
+ /**
+ * FacebookProvider constructor - resolves FacebookConfiguration from JVM 'facebook'.
+ */
+ public FacebookProvider() {
+ try {
+ this.configuration = MAPPER.readValue(StreamsConfigurator.config.getConfig("facebook").root().render(ConfigRenderOptions.concise()), FacebookConfiguration.class);
+ } catch (IOException ioe) {
+ LOGGER.error("Exception trying to read default config : {}", ioe);
}
-
- @Override
- public void cleanUp() {
- ComponentUtils.shutdownExecutor(executor, 5, 5);
- executor = null;
+ }
+
+ /**
+ * FacebookProvider constructor - uses supplied FacebookConfiguration.
+ */
+ public FacebookProvider(FacebookConfiguration configuration) {
+ this.configuration = (FacebookConfiguration) SerializationUtil.cloneBySerialization(configuration);
+ }
+
+ @Override
+ public String getId() {
+ return STREAMS_ID;
+ }
+
+ @Override
+ public void startStream() {
+ ListenableFuture future = executor.submit(getDataCollector());
+ futures.add(future);
+ executor.shutdown();
+ }
+
+ protected abstract FacebookDataCollector getDataCollector();
+
+ @Override
+ public StreamsResultSet readCurrent() {
+ int batchSize = 0;
+ BlockingQueue<StreamsDatum> batch = Queues.newLinkedBlockingQueue();
+ while (!this.datums.isEmpty() && batchSize < MAX_BATCH_SIZE) {
+ ComponentUtils.offerUntilSuccess(ComponentUtils.pollWhileNotEmpty(this.datums), batch);
+ ++batchSize;
}
-
- /**
- * Overrides the ids and addedAfter time in the configuration
- * @param idsToAfterDate
- */
- public void overrideIds(Map<String, DateTime> idsToAfterDate) {
- Set<IdConfig> ids = Sets.newHashSet();
- for(String id : idsToAfterDate.keySet()) {
- IdConfig idConfig = new IdConfig();
- idConfig.setId(id);
- idConfig.setAfterDate(idsToAfterDate.get(id));
- ids.add(idConfig);
- }
- this.configuration.setIds(ids);
+ return new StreamsResultSet(batch);
+ }
+
+ @Override
+ public StreamsResultSet readNew(BigInteger sequence) {
+ return null;
+ }
+
+ @Override
+ public StreamsResultSet readRange(DateTime start, DateTime end) {
+ return null;
+ }
+
+ @Override
+ public void prepare(Object configurationObject) {
+ this.datums = Queues.newLinkedBlockingQueue();
+ this.isComplete = new AtomicBoolean(false);
+ this.executor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1));
+ }
+
+ @Override
+ public void cleanUp() {
+ ComponentUtils.shutdownExecutor(executor, 5, 5);
+ executor = null;
+ }
+
+ /**
+ * Overrides the ids and addedAfter time in the configuration.
+ * @param idsToAfterDate idsToAfterDate
+ */
+ public void overrideIds(Map<String, DateTime> idsToAfterDate) {
+ Set<IdConfig> ids = Sets.newHashSet();
+ for (String id : idsToAfterDate.keySet()) {
+ IdConfig idConfig = new IdConfig();
+ idConfig.setId(id);
+ idConfig.setAfterDate(idsToAfterDate.get(id));
+ ids.add(idConfig);
}
-
- @Override
- public boolean isRunning() {
- if (datums.isEmpty() && executor.isTerminated() && Futures.allAsList(futures).isDone()) {
- LOGGER.info("Completed");
- isComplete.set(true);
- LOGGER.info("Exiting");
- }
- return !isComplete.get();
+ this.configuration.setIds(ids);
+ }
+
+ @Override
+ public boolean isRunning() {
+ if (datums.isEmpty() && executor.isTerminated() && Futures.allAsList(futures).isDone()) {
+ LOGGER.info("Completed");
+ isComplete.set(true);
+ LOGGER.info("Exiting");
}
+ return !isComplete.get();
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookUserInformationProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookUserInformationProvider.java b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookUserInformationProvider.java
index 1262106..3939f23 100644
--- a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookUserInformationProvider.java
+++ b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookUserInformationProvider.java
@@ -18,24 +18,24 @@
package org.apache.streams.facebook.provider;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigRenderOptions;
-import facebook4j.*;
-import facebook4j.conf.ConfigurationBuilder;
import org.apache.streams.config.StreamsConfigurator;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsProvider;
import org.apache.streams.core.StreamsResultSet;
import org.apache.streams.facebook.FacebookUserInformationConfiguration;
import org.apache.streams.jackson.StreamsJacksonMapper;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigRenderOptions;
+
+import org.apache.commons.lang.NotImplementedException;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.commons.lang.NotImplementedException;
import java.io.IOException;
import java.io.Serializable;
@@ -44,262 +44,291 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
-import java.util.concurrent.*;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-public class FacebookUserInformationProvider implements StreamsProvider, Serializable
-{
+import facebook4j.Facebook;
+import facebook4j.FacebookException;
+import facebook4j.FacebookFactory;
+import facebook4j.Friend;
+import facebook4j.Paging;
+import facebook4j.ResponseList;
+import facebook4j.User;
+import facebook4j.conf.ConfigurationBuilder;
- public static final String STREAMS_ID = "FacebookUserInformationProvider";
- private static final Logger LOGGER = LoggerFactory.getLogger(FacebookUserInformationProvider.class);
+public class FacebookUserInformationProvider implements StreamsProvider, Serializable {
- private static final ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+ public static final String STREAMS_ID = "FacebookUserInformationProvider";
- private static final String ALL_PERMISSIONS = "ads_management,ads_read,create_event,create_note,email,export_stream,friends_about_me,friends_actions.books,friends_actions.music,friends_actions.news,friends_actions.video,friends_activities,friends_birthday,friends_education_history,friends_events,friends_games_activity,friends_groups,friends_hometown,friends_interests,friends_likes,friends_location,friends_notes,friends_online_presence,friends_photo_video_tags,friends_photos,friends_questions,friends_relationship_details,friends_relationships,friends_religion_politics,friends_status,friends_subscriptions,friends_videos,friends_website,friends_work_history,manage_friendlists,manage_notifications,manage_pages,photo_upload,publish_actions,publish_stream,read_friendlists,read_insights,read_mailbox,read_page_mailboxes,read_requests,read_stream,rsvp_event,share_item,sms,status_update,user_about_me,user_actions.books,user_actions.music,user_actions.news,user_actions.video,user_activitie
s,user_birthday,user_education_history,user_events,user_friends,user_games_activity,user_groups,user_hometown,user_interests,user_likes,user_location,user_notes,user_online_presence,user_photo_video_tags,user_photos,user_questions,user_relationship_details,user_relationships,user_religion_politics,user_status,user_subscriptions,user_videos,user_website,user_work_history,video_upload,xmpp_login";
- private FacebookUserInformationConfiguration facebookUserInformationConfiguration;
+ private static final Logger LOGGER = LoggerFactory.getLogger(FacebookUserInformationProvider.class);
- private Class klass;
- protected volatile Queue<StreamsDatum> providerQueue = new LinkedBlockingQueue<StreamsDatum>();
+ private static final ObjectMapper mapper = StreamsJacksonMapper.getInstance();
- public FacebookUserInformationConfiguration getConfig() { return facebookUserInformationConfiguration; }
+ private static final String ALL_PERMISSIONS = "ads_management,ads_read,create_event,create_note,email,export_stream,friends_about_me,friends_actions.books,friends_actions.music,friends_actions.news,friends_actions.video,friends_activities,friends_birthday,friends_education_history,friends_events,friends_games_activity,friends_groups,friends_hometown,friends_interests,friends_likes,friends_location,friends_notes,friends_online_presence,friends_photo_video_tags,friends_photos,friends_questions,friends_relationship_details,friends_relationships,friends_religion_politics,friends_status,friends_subscriptions,friends_videos,friends_website,friends_work_history,manage_friendlists,manage_notifications,manage_pages,photo_upload,publish_actions,publish_stream,read_friendlists,read_insights,read_mailbox,read_page_mailboxes,read_requests,read_stream,rsvp_event,share_item,sms,status_update,user_about_me,user_actions.books,user_actions.music,user_actions.news,user_actions.video,user_activities,
user_birthday,user_education_history,user_events,user_friends,user_games_activity,user_groups,user_hometown,user_interests,user_likes,user_location,user_notes,user_online_presence,user_photo_video_tags,user_photos,user_questions,user_relationship_details,user_relationships,user_religion_politics,user_status,user_subscriptions,user_videos,user_website,user_work_history,video_upload,xmpp_login";
+ private FacebookUserInformationConfiguration facebookUserInformationConfiguration;
- public void setConfig(FacebookUserInformationConfiguration config) { this.facebookUserInformationConfiguration = config; }
+ private Class klass;
+ protected volatile Queue<StreamsDatum> providerQueue = new LinkedBlockingQueue<StreamsDatum>();
- protected Iterator<String[]> idsBatches;
+ public FacebookUserInformationConfiguration getConfig() {
+ return facebookUserInformationConfiguration;
+ }
- protected ExecutorService executor;
+ public void setConfig(FacebookUserInformationConfiguration config) {
+ this.facebookUserInformationConfiguration = config;
+ }
- protected DateTime start;
- protected DateTime end;
+ protected Iterator<String[]> idsBatches;
- protected final AtomicBoolean running = new AtomicBoolean();
+ protected ExecutorService executor;
- private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) {
- return new ThreadPoolExecutor(nThreads, nThreads,
- 5000L, TimeUnit.MILLISECONDS,
- new ArrayBlockingQueue<Runnable>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy());
- }
+ protected DateTime start;
+ protected DateTime end;
- public FacebookUserInformationProvider() {
- Config config = StreamsConfigurator.config.getConfig("facebook");
- FacebookUserInformationConfiguration facebookUserInformationConfiguration;
- try {
- facebookUserInformationConfiguration = mapper.readValue(config.root().render(ConfigRenderOptions.concise()), FacebookUserInformationConfiguration.class);
- } catch (IOException e) {
- e.printStackTrace();
- return;
- }
- }
+ protected final AtomicBoolean running = new AtomicBoolean();
- public FacebookUserInformationProvider(FacebookUserInformationConfiguration config) {
- this.facebookUserInformationConfiguration = config;
- }
+ private static ExecutorService newFixedThreadPoolWithQueueSize(int numThreads, int queueSize) {
+ return new ThreadPoolExecutor(numThreads, numThreads,
+ 5000L, TimeUnit.MILLISECONDS,
+ new ArrayBlockingQueue<Runnable>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy());
+ }
- public FacebookUserInformationProvider(Class klass) {
- Config config = StreamsConfigurator.config.getConfig("facebook");
- FacebookUserInformationConfiguration facebookUserInformationConfiguration;
- try {
- facebookUserInformationConfiguration = mapper.readValue(config.root().render(ConfigRenderOptions.concise()), FacebookUserInformationConfiguration.class);
- } catch (IOException e) {
- e.printStackTrace();
- return;
- }
- this.klass = klass;
+ /**
+ * FacebookUserInformationProvider constructor - resolves FacebookUserInformationConfiguration from JVM 'facebook'.
+ */
+ public FacebookUserInformationProvider() {
+ Config config = StreamsConfigurator.config.getConfig("facebook");
+ FacebookUserInformationConfiguration facebookUserInformationConfiguration;
+ try {
+ facebookUserInformationConfiguration = mapper.readValue(config.root().render(ConfigRenderOptions.concise()), FacebookUserInformationConfiguration.class);
+ } catch (IOException ex) {
+ ex.printStackTrace();
+ return;
}
-
- public FacebookUserInformationProvider(FacebookUserInformationConfiguration config, Class klass) {
- this.facebookUserInformationConfiguration = config;
- this.klass = klass;
+ }
+
+ /**
+ * FacebookUserInformationProvider constructor - uses supplie FacebookUserInformationConfiguration.
+ * @param config
+ */
+ public FacebookUserInformationProvider(FacebookUserInformationConfiguration config) {
+ this.facebookUserInformationConfiguration = config;
+ }
+
+ public FacebookUserInformationProvider(Class klass) {
+ Config config = StreamsConfigurator.config.getConfig("facebook");
+ FacebookUserInformationConfiguration facebookUserInformationConfiguration;
+ try {
+ facebookUserInformationConfiguration = mapper.readValue(config.root().render(ConfigRenderOptions.concise()), FacebookUserInformationConfiguration.class);
+ } catch (IOException ex) {
+ ex.printStackTrace();
+ return;
}
-
- public Queue<StreamsDatum> getProviderQueue() {
- return this.providerQueue;
+ this.klass = klass;
+ }
+
+ public FacebookUserInformationProvider(FacebookUserInformationConfiguration config, Class klass) {
+ this.facebookUserInformationConfiguration = config;
+ this.klass = klass;
+ }
+
+ public Queue<StreamsDatum> getProviderQueue() {
+ return this.providerQueue;
+ }
+
+ @Override
+ public String getId() {
+ return STREAMS_ID;
+ }
+
+ @Override
+ public void startStream() {
+ running.set(true);
+ }
+
+ @Override
+ public StreamsResultSet readCurrent() {
+
+ Preconditions.checkArgument(idsBatches.hasNext());
+
+ LOGGER.info("readCurrent");
+
+ Facebook client = getFacebookClient();
+
+ try {
+ User me = client.users().getMe();
+ String json = mapper.writeValueAsString(me);
+ providerQueue.add(
+ new StreamsDatum(json, DateTime.now())
+ );
+ } catch (JsonProcessingException ex) {
+ ex.printStackTrace();
+ } catch (FacebookException ex) {
+ ex.printStackTrace();
}
- @Override
- public String getId() {
- return STREAMS_ID;
- }
-
- @Override
- public void startStream() {
- running.set(true);
- }
-
- public StreamsResultSet readCurrent() {
-
- Preconditions.checkArgument(idsBatches.hasNext());
-
- LOGGER.info("readCurrent");
-
- Facebook client = getFacebookClient();
-
+ if (idsBatches.hasNext()) {
+ while (idsBatches.hasNext()) {
try {
- User me = client.users().getMe();
- String json = mapper.writeValueAsString(me);
- providerQueue.add(
- new StreamsDatum(json, DateTime.now())
- );
- } catch (JsonProcessingException e) {
- e.printStackTrace();
- } catch (FacebookException e) {
- e.printStackTrace();
- }
+ List<User> userList = client.users().getUsers(idsBatches.next());
+ for (User user : userList) {
- if( idsBatches.hasNext()) {
- while (idsBatches.hasNext()) {
- try {
- List<User> userList = client.users().getUsers(idsBatches.next());
- for (User user : userList) {
-
- try {
- String json = mapper.writeValueAsString(user);
- providerQueue.add(
- new StreamsDatum(json, DateTime.now())
- );
- } catch (JsonProcessingException e) {
- // e.printStackTrace();
- }
- }
-
- } catch (FacebookException e) {
- e.printStackTrace();
- }
- }
- } else {
try {
- ResponseList<Friend> friendResponseList = client.friends().getFriends();
- Paging<Friend> friendPaging;
- do {
-
- for( Friend friend : friendResponseList ) {
-
- String json;
- try {
- json = mapper.writeValueAsString(friend);
- providerQueue.add(
- new StreamsDatum(json)
- );
- } catch (JsonProcessingException e) {
-// e.printStackTrace();
- }
- }
- friendPaging = friendResponseList.getPaging();
- friendResponseList = client.fetchNext(friendPaging);
- } while( friendPaging != null &&
- friendResponseList != null );
- } catch (FacebookException e) {
- e.printStackTrace();
+ String json = mapper.writeValueAsString(user);
+ providerQueue.add(
+ new StreamsDatum(json, DateTime.now())
+ );
+ } catch (JsonProcessingException ex) {
+ LOGGER.trace("JsonProcessingException", ex);
}
+ }
+ } catch (FacebookException ex) {
+ ex.printStackTrace();
}
+ }
+ } else {
+ try {
+ ResponseList<Friend> friendResponseList = client.friends().getFriends();
+ Paging<Friend> friendPaging;
+ do {
- LOGGER.info("Finished. Cleaning up...");
-
- LOGGER.info("Providing {} docs", providerQueue.size());
-
- StreamsResultSet result = new StreamsResultSet(providerQueue);
- running.set(false);
+ for ( Friend friend : friendResponseList ) {
- LOGGER.info("Exiting");
-
- return result;
-
- }
-
- public StreamsResultSet readNew(BigInteger sequence) {
- LOGGER.debug("{} readNew", STREAMS_ID);
- throw new NotImplementedException();
- }
-
- public StreamsResultSet readRange(DateTime start, DateTime end) {
- LOGGER.debug("{} readRange", STREAMS_ID);
- this.start = start;
- this.end = end;
- readCurrent();
- StreamsResultSet result = (StreamsResultSet)providerQueue.iterator();
- return result;
- }
+ String json;
+ try {
+ json = mapper.writeValueAsString(friend);
+ providerQueue.add(
+ new StreamsDatum(json)
+ );
+ } catch (JsonProcessingException ex) {
+ LOGGER.trace("JsonProcessingException", ex);
+ }
+ }
+ friendPaging = friendResponseList.getPaging();
+ friendResponseList = client.fetchNext(friendPaging);
+ }
+ while ( friendPaging != null
+ &&
+ friendResponseList != null );
+ } catch (FacebookException ex) {
+ ex.printStackTrace();
+ }
- @Override
- public boolean isRunning() {
- return running.get();
}
- void shutdownAndAwaitTermination(ExecutorService pool) {
- pool.shutdown(); // Disable new tasks from being submitted
- try {
- // Wait a while for existing tasks to terminate
- if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
- pool.shutdownNow(); // Cancel currently executing tasks
- // Wait a while for tasks to respond to being cancelled
- if (!pool.awaitTermination(10, TimeUnit.SECONDS))
- System.err.println("Pool did not terminate");
- }
- } catch (InterruptedException ie) {
- // (Re-)Cancel if current thread also interrupted
- pool.shutdownNow();
- // Preserve interrupt status
- Thread.currentThread().interrupt();
+ LOGGER.info("Finished. Cleaning up...");
+
+ LOGGER.info("Providing {} docs", providerQueue.size());
+
+ StreamsResultSet result = new StreamsResultSet(providerQueue);
+ running.set(false);
+
+ LOGGER.info("Exiting");
+
+ return result;
+
+ }
+
+ @Override
+ public StreamsResultSet readNew(BigInteger sequence) {
+ LOGGER.debug("{} readNew", STREAMS_ID);
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public StreamsResultSet readRange(DateTime start, DateTime end) {
+ LOGGER.debug("{} readRange", STREAMS_ID);
+ this.start = start;
+ this.end = end;
+ readCurrent();
+ StreamsResultSet result = (StreamsResultSet)providerQueue.iterator();
+ return result;
+ }
+
+ @Override
+ public boolean isRunning() {
+ return running.get();
+ }
+
+ void shutdownAndAwaitTermination(ExecutorService pool) {
+ pool.shutdown(); // Disable new tasks from being submitted
+ try {
+ // Wait a while for existing tasks to terminate
+ if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
+ pool.shutdownNow(); // Cancel currently executing tasks
+ // Wait a while for tasks to respond to being cancelled
+ if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
+ System.err.println("Pool did not terminate");
}
+ }
+ } catch (InterruptedException ie) {
+ // (Re-)Cancel if current thread also interrupted
+ pool.shutdownNow();
+ // Preserve interrupt status
+ Thread.currentThread().interrupt();
}
+ }
- @Override
- public void prepare(Object o) {
-
- executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20));
+ @Override
+ public void prepare(Object configurationObject) {
- Preconditions.checkNotNull(providerQueue);
- Preconditions.checkNotNull(this.klass);
- Preconditions.checkNotNull(facebookUserInformationConfiguration.getOauth().getAppId());
- Preconditions.checkNotNull(facebookUserInformationConfiguration.getOauth().getAppSecret());
- Preconditions.checkNotNull(facebookUserInformationConfiguration.getOauth().getUserAccessToken());
- Preconditions.checkNotNull(facebookUserInformationConfiguration.getInfo());
+ executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20));
- List<String> ids = new ArrayList<String>();
- List<String[]> idsBatches = new ArrayList<String[]>();
+ Preconditions.checkNotNull(providerQueue);
+ Preconditions.checkNotNull(this.klass);
+ Preconditions.checkNotNull(facebookUserInformationConfiguration.getOauth().getAppId());
+ Preconditions.checkNotNull(facebookUserInformationConfiguration.getOauth().getAppSecret());
+ Preconditions.checkNotNull(facebookUserInformationConfiguration.getOauth().getUserAccessToken());
+ Preconditions.checkNotNull(facebookUserInformationConfiguration.getInfo());
- for(String s : facebookUserInformationConfiguration.getInfo()) {
- if(s != null)
- {
- ids.add(s);
+ List<String> ids = new ArrayList<String>();
+ List<String[]> idsBatches = new ArrayList<String[]>();
- if(ids.size() >= 100) {
- // add the batch
- idsBatches.add(ids.toArray(new String[ids.size()]));
- // reset the Ids
- ids = new ArrayList<String>();
- }
+ for (String s : facebookUserInformationConfiguration.getInfo()) {
+ if (s != null) {
+ ids.add(s);
- }
+ if (ids.size() >= 100) {
+ // add the batch
+ idsBatches.add(ids.toArray(new String[ids.size()]));
+ // reset the Ids
+ ids = new ArrayList<String>();
}
- if(ids.size() > 0)
- idsBatches.add(ids.toArray(new String[ids.size()]));
-
- this.idsBatches = idsBatches.iterator();
+ }
}
- protected Facebook getFacebookClient()
- {
- ConfigurationBuilder cb = new ConfigurationBuilder();
- cb.setDebugEnabled(true)
- .setOAuthAppId(facebookUserInformationConfiguration.getOauth().getAppId())
- .setOAuthAppSecret(facebookUserInformationConfiguration.getOauth().getAppSecret())
- .setOAuthAccessToken(facebookUserInformationConfiguration.getOauth().getUserAccessToken())
- .setOAuthPermissions(ALL_PERMISSIONS)
- .setJSONStoreEnabled(true)
- .setClientVersion("v1.0");
-
- FacebookFactory ff = new FacebookFactory(cb.build());
- Facebook facebook = ff.getInstance();
-
- return facebook;
+ if (ids.size() > 0) {
+ idsBatches.add(ids.toArray(new String[ids.size()]));
}
- @Override
- public void cleanUp() {
- shutdownAndAwaitTermination(executor);
- }
+ this.idsBatches = idsBatches.iterator();
+ }
+
+ protected Facebook getFacebookClient() {
+ ConfigurationBuilder cb = new ConfigurationBuilder();
+ cb.setDebugEnabled(true)
+ .setOAuthAppId(facebookUserInformationConfiguration.getOauth().getAppId())
+ .setOAuthAppSecret(facebookUserInformationConfiguration.getOauth().getAppSecret())
+ .setOAuthAccessToken(facebookUserInformationConfiguration.getOauth().getUserAccessToken())
+ .setOAuthPermissions(ALL_PERMISSIONS)
+ .setJSONStoreEnabled(true)
+ .setClientVersion("v1.0");
+
+ FacebookFactory ff = new FacebookFactory(cb.build());
+ Facebook facebook = ff.getInstance();
+
+ return facebook;
+ }
+
+ @Override
+ public void cleanUp() {
+ shutdownAndAwaitTermination(executor);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookUserstreamProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookUserstreamProvider.java b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookUserstreamProvider.java
index 0f2121a..b292d30 100644
--- a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookUserstreamProvider.java
+++ b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookUserstreamProvider.java
@@ -18,18 +18,6 @@
package org.apache.streams.facebook.provider;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Queues;
-import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigRenderOptions;
-import facebook4j.*;
-import facebook4j.Post;
-import facebook4j.conf.ConfigurationBuilder;
-import facebook4j.json.DataObjectFactory;
import org.apache.streams.config.StreamsConfigurator;
import org.apache.streams.core.DatumStatusCounter;
import org.apache.streams.core.StreamsDatum;
@@ -39,10 +27,20 @@ import org.apache.streams.facebook.FacebookUserInformationConfiguration;
import org.apache.streams.facebook.FacebookUserstreamConfiguration;
import org.apache.streams.jackson.StreamsJacksonMapper;
import org.apache.streams.util.ComponentUtils;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Queues;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigRenderOptions;
+
+import org.apache.commons.lang.NotImplementedException;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.commons.lang.NotImplementedException;
import java.io.IOException;
import java.io.Serializable;
@@ -51,276 +49,306 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.Set;
-import java.util.concurrent.*;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import facebook4j.Facebook;
+import facebook4j.FacebookException;
+import facebook4j.FacebookFactory;
+import facebook4j.Post;
+import facebook4j.ResponseList;
+import facebook4j.conf.ConfigurationBuilder;
+import facebook4j.json.DataObjectFactory;
+
public class FacebookUserstreamProvider implements StreamsProvider, Serializable {
- public static final String STREAMS_ID = "FacebookUserstreamProvider";
+ public static final String STREAMS_ID = "FacebookUserstreamProvider";
- private static final Logger LOGGER = LoggerFactory.getLogger(FacebookUserstreamProvider.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(FacebookUserstreamProvider.class);
- private static final ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+ private static final ObjectMapper mapper = StreamsJacksonMapper.getInstance();
- private static final String ALL_PERMISSIONS = "read_stream";
- private FacebookUserstreamConfiguration configuration;
+ private static final String ALL_PERMISSIONS = "read_stream";
+ private FacebookUserstreamConfiguration configuration;
- private Class klass;
- protected final ReadWriteLock lock = new ReentrantReadWriteLock();
+ private Class klass;
+ protected final ReadWriteLock lock = new ReentrantReadWriteLock();
- protected volatile Queue<StreamsDatum> providerQueue = new LinkedBlockingQueue<StreamsDatum>();
+ protected volatile Queue<StreamsDatum> providerQueue = new LinkedBlockingQueue<StreamsDatum>();
- public FacebookUserstreamConfiguration getConfig() {
- return configuration;
- }
+ public FacebookUserstreamConfiguration getConfig() {
+ return configuration;
+ }
- public void setConfig(FacebookUserstreamConfiguration config) {
- this.configuration = config;
- }
+ public void setConfig(FacebookUserstreamConfiguration config) {
+ this.configuration = config;
+ }
- protected ListeningExecutorService executor;
+ protected ListeningExecutorService executor;
- protected DateTime start;
- protected DateTime end;
+ protected DateTime start;
+ protected DateTime end;
- protected final AtomicBoolean running = new AtomicBoolean();
+ protected final AtomicBoolean running = new AtomicBoolean();
- private DatumStatusCounter countersCurrent = new DatumStatusCounter();
- private DatumStatusCounter countersTotal = new DatumStatusCounter();
+ private DatumStatusCounter countersCurrent = new DatumStatusCounter();
+ private DatumStatusCounter countersTotal = new DatumStatusCounter();
- protected Facebook client;
+ protected Facebook client;
- private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) {
- return new ThreadPoolExecutor(nThreads, nThreads,
- 5000L, TimeUnit.MILLISECONDS,
- new ArrayBlockingQueue<Runnable>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy());
- }
+ private static ExecutorService newFixedThreadPoolWithQueueSize(int numThreads, int queueSize) {
+ return new ThreadPoolExecutor(numThreads, numThreads,
+ 5000L, TimeUnit.MILLISECONDS,
+ new ArrayBlockingQueue<Runnable>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy());
+ }
- public FacebookUserstreamProvider() {
- Config config = StreamsConfigurator.config.getConfig("facebook");
- FacebookUserInformationConfiguration facebookUserInformationConfiguration;
- try {
- facebookUserInformationConfiguration = mapper.readValue(config.root().render(ConfigRenderOptions.concise()), FacebookUserInformationConfiguration.class);
- } catch (IOException e) {
- e.printStackTrace();
- return;
- }
+ /**
+ * FacebookUserstreamProvider constructor.
+ */
+ public FacebookUserstreamProvider() {
+ Config config = StreamsConfigurator.config.getConfig("facebook");
+ FacebookUserInformationConfiguration facebookUserInformationConfiguration;
+ try {
+ facebookUserInformationConfiguration = mapper.readValue(config.root().render(ConfigRenderOptions.concise()), FacebookUserInformationConfiguration.class);
+ } catch (IOException ex) {
+ ex.printStackTrace();
+ return;
}
-
- public FacebookUserstreamProvider(FacebookUserstreamConfiguration config) {
- this.configuration = config;
+ }
+
+ /**
+ * FacebookUserstreamProvider constructor.
+ * @param config config
+ */
+ public FacebookUserstreamProvider(FacebookUserstreamConfiguration config) {
+ this.configuration = config;
+ }
+
+ /**
+ * FacebookUserstreamProvider constructor.
+ * @param klass output Class
+ */
+ public FacebookUserstreamProvider(Class klass) {
+ Config config = StreamsConfigurator.config.getConfig("facebook");
+ FacebookUserInformationConfiguration facebookUserInformationConfiguration;
+ try {
+ facebookUserInformationConfiguration = mapper.readValue(config.root().render(ConfigRenderOptions.concise()), FacebookUserInformationConfiguration.class);
+ } catch (IOException ex) {
+ ex.printStackTrace();
+ return;
}
-
- public FacebookUserstreamProvider(Class klass) {
- Config config = StreamsConfigurator.config.getConfig("facebook");
- FacebookUserInformationConfiguration facebookUserInformationConfiguration;
- try {
- facebookUserInformationConfiguration = mapper.readValue(config.root().render(ConfigRenderOptions.concise()), FacebookUserInformationConfiguration.class);
- } catch (IOException e) {
- e.printStackTrace();
- return;
- }
- this.klass = klass;
+ this.klass = klass;
+ }
+
+ public FacebookUserstreamProvider(FacebookUserstreamConfiguration config, Class klass) {
+ this.configuration = config;
+ this.klass = klass;
+ }
+
+ public Queue<StreamsDatum> getProviderQueue() {
+ return this.providerQueue;
+ }
+
+ @Override
+ public String getId() {
+ return STREAMS_ID;
+ }
+
+ @Override
+ public void startStream() {
+
+ client = getFacebookClient();
+
+ if ( configuration.getInfo() != null
+ &&
+ configuration.getInfo().size() > 0 ) {
+ for ( String id : configuration.getInfo()) {
+ executor.submit(new FacebookFeedPollingTask(this, id));
+ }
+ running.set(true);
+ } else {
+ try {
+ String id = client.getMe().getId();
+ executor.submit(new FacebookFeedPollingTask(this, id));
+ running.set(true);
+ } catch (FacebookException ex) {
+ LOGGER.error(ex.getMessage());
+ running.set(false);
+ }
}
+ }
- public FacebookUserstreamProvider(FacebookUserstreamConfiguration config, Class klass) {
- this.configuration = config;
- this.klass = klass;
- }
+ @Override
+ public StreamsResultSet readCurrent() {
- public Queue<StreamsDatum> getProviderQueue() {
- return this.providerQueue;
- }
+ StreamsResultSet current;
- @Override
- public String getId() {
- return STREAMS_ID;
+ synchronized (FacebookUserstreamProvider.class) {
+ current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(providerQueue));
+ current.setCounter(new DatumStatusCounter());
+ current.getCounter().add(countersCurrent);
+ countersTotal.add(countersCurrent);
+ countersCurrent = new DatumStatusCounter();
+ providerQueue.clear();
}
- @Override
- public void startStream() {
-
- client = getFacebookClient();
-
- if( configuration.getInfo() != null &&
- configuration.getInfo().size() > 0 ) {
- for( String id : configuration.getInfo()) {
- executor.submit(new FacebookFeedPollingTask(this, id));
- }
- running.set(true);
- } else {
- try {
- String id = client.getMe().getId();
- executor.submit(new FacebookFeedPollingTask(this, id));
- running.set(true);
- } catch (FacebookException e) {
- LOGGER.error(e.getMessage());
- running.set(false);
- }
+ return current;
+
+ }
+
+ @Override
+ public StreamsResultSet readNew(BigInteger sequence) {
+ LOGGER.debug("{} readNew", STREAMS_ID);
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public StreamsResultSet readRange(DateTime start, DateTime end) {
+ LOGGER.debug("{} readRange", STREAMS_ID);
+ this.start = start;
+ this.end = end;
+ readCurrent();
+ StreamsResultSet result = (StreamsResultSet) providerQueue.iterator();
+ return result;
+ }
+
+ @Override
+ public boolean isRunning() {
+ return running.get();
+ }
+
+ void shutdownAndAwaitTermination(ExecutorService pool) {
+ pool.shutdown(); // Disable new tasks from being submitted
+ try {
+ // Wait a while for existing tasks to terminate
+ if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
+ pool.shutdownNow(); // Cancel currently executing tasks
+ // Wait a while for tasks to respond to being cancelled
+ if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
+ System.err.println("Pool did not terminate");
}
+ }
+ } catch (InterruptedException ie) {
+ // (Re-)Cancel if current thread also interrupted
+ pool.shutdownNow();
+ // Preserve interrupt status
+ Thread.currentThread().interrupt();
}
+ }
- public StreamsResultSet readCurrent() {
+ @Override
+ public void prepare(Object configurationObject) {
- StreamsResultSet current;
+ executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20));
- synchronized (FacebookUserstreamProvider.class) {
- current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(providerQueue));
- current.setCounter(new DatumStatusCounter());
- current.getCounter().add(countersCurrent);
- countersTotal.add(countersCurrent);
- countersCurrent = new DatumStatusCounter();
- providerQueue.clear();
- }
+ Preconditions.checkNotNull(providerQueue);
+ Preconditions.checkNotNull(this.klass);
+ Preconditions.checkNotNull(configuration.getOauth().getAppId());
+ Preconditions.checkNotNull(configuration.getOauth().getAppSecret());
+ Preconditions.checkNotNull(configuration.getOauth().getUserAccessToken());
- return current;
+ client = getFacebookClient();
- }
+ if ( configuration.getInfo() != null
+ &&
+ configuration.getInfo().size() > 0 ) {
- public StreamsResultSet readNew(BigInteger sequence) {
- LOGGER.debug("{} readNew", STREAMS_ID);
- throw new NotImplementedException();
- }
+ List<String> ids = new ArrayList<String>();
+ List<String[]> idsBatches = new ArrayList<String[]>();
- public StreamsResultSet readRange(DateTime start, DateTime end) {
- LOGGER.debug("{} readRange", STREAMS_ID);
- this.start = start;
- this.end = end;
- readCurrent();
- StreamsResultSet result = (StreamsResultSet) providerQueue.iterator();
- return result;
- }
+ for (String s : configuration.getInfo()) {
+ if (s != null) {
+ ids.add(s);
- @Override
- public boolean isRunning() {
- return running.get();
- }
+ if (ids.size() >= 100) {
+ // add the batch
+ idsBatches.add(ids.toArray(new String[ids.size()]));
+ // reset the Ids
+ ids = new ArrayList<String>();
+ }
- void shutdownAndAwaitTermination(ExecutorService pool) {
- pool.shutdown(); // Disable new tasks from being submitted
- try {
- // Wait a while for existing tasks to terminate
- if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
- pool.shutdownNow(); // Cancel currently executing tasks
- // Wait a while for tasks to respond to being cancelled
- if (!pool.awaitTermination(10, TimeUnit.SECONDS))
- System.err.println("Pool did not terminate");
- }
- } catch (InterruptedException ie) {
- // (Re-)Cancel if current thread also interrupted
- pool.shutdownNow();
- // Preserve interrupt status
- Thread.currentThread().interrupt();
}
+ }
}
+ }
- @Override
- public void prepare(Object o) {
-
- executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20));
+ protected Facebook getFacebookClient() {
+ ConfigurationBuilder cb = new ConfigurationBuilder();
+ cb.setDebugEnabled(true)
+ .setOAuthAppId(configuration.getOauth().getAppId())
+ .setOAuthAppSecret(configuration.getOauth().getAppSecret())
+ .setOAuthAccessToken(configuration.getOauth().getUserAccessToken())
+ .setOAuthPermissions(ALL_PERMISSIONS)
+ .setJSONStoreEnabled(true);
- Preconditions.checkNotNull(providerQueue);
- Preconditions.checkNotNull(this.klass);
- Preconditions.checkNotNull(configuration.getOauth().getAppId());
- Preconditions.checkNotNull(configuration.getOauth().getAppSecret());
- Preconditions.checkNotNull(configuration.getOauth().getUserAccessToken());
+ FacebookFactory ff = new FacebookFactory(cb.build());
+ Facebook facebook = ff.getInstance();
- client = getFacebookClient();
+ return facebook;
+ }
- if( configuration.getInfo() != null &&
- configuration.getInfo().size() > 0 ) {
+ @Override
+ public void cleanUp() {
+ shutdownAndAwaitTermination(executor);
+ }
- List<String> ids = new ArrayList<String>();
- List<String[]> idsBatches = new ArrayList<String[]>();
+ private class FacebookFeedPollingTask implements Runnable {
- for (String s : configuration.getInfo()) {
- if (s != null) {
- ids.add(s);
+ FacebookUserstreamProvider provider;
+ Facebook client;
+ String id;
- if (ids.size() >= 100) {
- // add the batch
- idsBatches.add(ids.toArray(new String[ids.size()]));
- // reset the Ids
- ids = new ArrayList<String>();
- }
+ private Set<Post> priorPollResult = Sets.newHashSet();
- }
- }
- }
+ public FacebookFeedPollingTask(FacebookUserstreamProvider facebookUserstreamProvider) {
+ this.provider = facebookUserstreamProvider;
}
- protected Facebook getFacebookClient() {
- ConfigurationBuilder cb = new ConfigurationBuilder();
- cb.setDebugEnabled(true)
- .setOAuthAppId(configuration.getOauth().getAppId())
- .setOAuthAppSecret(configuration.getOauth().getAppSecret())
- .setOAuthAccessToken(configuration.getOauth().getUserAccessToken())
- .setOAuthPermissions(ALL_PERMISSIONS)
- .setJSONStoreEnabled(true);
-
- FacebookFactory ff = new FacebookFactory(cb.build());
- Facebook facebook = ff.getInstance();
-
- return facebook;
+ public FacebookFeedPollingTask(FacebookUserstreamProvider facebookUserstreamProvider, String id) {
+ this.provider = facebookUserstreamProvider;
+ this.client = provider.client;
+ this.id = id;
}
@Override
- public void cleanUp() {
- shutdownAndAwaitTermination(executor);
- }
-
- private class FacebookFeedPollingTask implements Runnable {
-
- FacebookUserstreamProvider provider;
- Facebook client;
- String id;
-
- private Set<Post> priorPollResult = Sets.newHashSet();
-
- public FacebookFeedPollingTask(FacebookUserstreamProvider facebookUserstreamProvider) {
- this.provider = facebookUserstreamProvider;
- }
-
- public FacebookFeedPollingTask(FacebookUserstreamProvider facebookUserstreamProvider, String id) {
- this.provider = facebookUserstreamProvider;
- this.client = provider.client;
- this.id = id;
- }
- @Override
- public void run() {
- while (provider.isRunning()) {
- ResponseList<Post> postResponseList;
- try {
- postResponseList = client.getFeed(id);
-
- Set<Post> update = Sets.newHashSet(postResponseList);
- Set<Post> repeats = Sets.intersection(priorPollResult, Sets.newHashSet(update));
- Set<Post> entrySet = Sets.difference(update, repeats);
- LOGGER.debug(this.id + " response: " + update.size() + " previous: " + repeats.size() + " new: " + entrySet.size());
- for (Post item : entrySet) {
- String json = DataObjectFactory.getRawJSON(item);
- org.apache.streams.facebook.Post post = mapper.readValue(json, org.apache.streams.facebook.Post.class);
- try {
- lock.readLock().lock();
- ComponentUtils.offerUntilSuccess(new StreamsDatum(post), providerQueue);
- countersCurrent.incrementAttempt();
- } finally {
- lock.readLock().unlock();
- }
- }
- priorPollResult = update;
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- try {
- Thread.sleep(configuration.getPollIntervalMillis());
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
+ public void run() {
+ while (provider.isRunning()) {
+ ResponseList<Post> postResponseList;
+ try {
+ postResponseList = client.getFeed(id);
+
+ Set<Post> update = Sets.newHashSet(postResponseList);
+ Set<Post> repeats = Sets.intersection(priorPollResult, Sets.newHashSet(update));
+ Set<Post> entrySet = Sets.difference(update, repeats);
+ LOGGER.debug(this.id + " response: " + update.size() + " previous: " + repeats.size() + " new: " + entrySet.size());
+ for (Post item : entrySet) {
+ String json = DataObjectFactory.getRawJSON(item);
+ org.apache.streams.facebook.Post post = mapper.readValue(json, org.apache.streams.facebook.Post.class);
+ try {
+ lock.readLock().lock();
+ ComponentUtils.offerUntilSuccess(new StreamsDatum(post), providerQueue);
+ countersCurrent.incrementAttempt();
+ } finally {
+ lock.readLock().unlock();
}
+ }
+ priorPollResult = update;
+ } catch (Exception ex) {
+ ex.printStackTrace();
+ } finally {
+ try {
+ Thread.sleep(configuration.getPollIntervalMillis());
+ } catch (InterruptedException interrupt) {
+ Thread.currentThread().interrupt();
+ }
}
+ }
}
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/page/FacebookPageDataCollector.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/page/FacebookPageDataCollector.java b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/page/FacebookPageDataCollector.java
index 0e88dd4..68d8f06 100644
--- a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/page/FacebookPageDataCollector.java
+++ b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/page/FacebookPageDataCollector.java
@@ -20,7 +20,6 @@ package org.apache.streams.facebook.provider.page;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.facebook.FacebookConfiguration;
-import org.apache.streams.facebook.FacebookPageProviderConfiguration;
import org.apache.streams.facebook.IdConfig;
import org.apache.streams.facebook.provider.FacebookDataCollector;
import org.apache.streams.jackson.StreamsJacksonMapper;
@@ -28,7 +27,6 @@ import org.apache.streams.jackson.StreamsJacksonMapper;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
-import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,11 +34,10 @@ import java.util.concurrent.BlockingQueue;
import facebook4j.FacebookException;
import facebook4j.Page;
-import facebook4j.Reading;
import facebook4j.json.DataObjectFactory;
/**
- * Collects the page data from public Facebook pages
+ * Collects the page data from public Facebook pages.
*/
public class FacebookPageDataCollector extends FacebookDataCollector {
@@ -48,11 +45,8 @@ public class FacebookPageDataCollector extends FacebookDataCollector {
private static final int MAX_ATTEMPTS = 5;
private static final ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
- private String fields;
-
- public FacebookPageDataCollector(BlockingQueue<StreamsDatum> queue, FacebookPageProviderConfiguration configuration) {
+ public FacebookPageDataCollector(BlockingQueue<StreamsDatum> queue, FacebookConfiguration configuration) {
super(configuration, queue);
- fields = StringUtils.join(configuration.getFields(), ',');
}
@Override
@@ -70,7 +64,7 @@ public class FacebookPageDataCollector extends FacebookDataCollector {
while (attempt < MAX_ATTEMPTS) {
++attempt;
try {
- Page page = getNextFacebookClient().getPage(pageId, new Reading().fields(fields));
+ Page page = getNextFacebookClient().getPage(pageId);
return page;
} catch (FacebookException fe) {
LOGGER.error("Facebook returned an exception : {}", fe);
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/page/FacebookPageProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/page/FacebookPageProvider.java b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/page/FacebookPageProvider.java
index d11a486..e7bbcfa 100644
--- a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/page/FacebookPageProvider.java
+++ b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/page/FacebookPageProvider.java
@@ -80,9 +80,8 @@ public class FacebookPageProvider extends FacebookProvider {
private FacebookPageProviderConfiguration configuration;
- public FacebookPageProvider(FacebookPageProviderConfiguration facebookConfiguration) {
+ public FacebookPageProvider(FacebookConfiguration facebookConfiguration) {
super(facebookConfiguration);
- configuration = facebookConfiguration;
}
@VisibleForTesting
@@ -92,7 +91,7 @@ public class FacebookPageProvider extends FacebookProvider {
@Override
protected FacebookDataCollector getDataCollector() {
- return new FacebookPageDataCollector(super.datums, configuration);
+ return new FacebookPageDataCollector(super.datums, super.configuration);
}
/**
@@ -115,7 +114,7 @@ public class FacebookPageProvider extends FacebookProvider {
Config typesafe = conf.withFallback(reference).resolve();
StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe);
- FacebookPageProviderConfiguration config = new ComponentConfigurator<>(FacebookPageProviderConfiguration.class).detectConfiguration(typesafe, "facebook");
+ FacebookConfiguration config = new ComponentConfigurator<>(FacebookConfiguration.class).detectConfiguration(typesafe, "facebook");
FacebookPageProvider provider = new FacebookPageProvider(config);
PrintStream outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile)));
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/pagefeed/FacebookPageFeedDataCollector.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/pagefeed/FacebookPageFeedDataCollector.java b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/pagefeed/FacebookPageFeedDataCollector.java
index c2ba700..f509170 100644
--- a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/pagefeed/FacebookPageFeedDataCollector.java
+++ b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/pagefeed/FacebookPageFeedDataCollector.java
@@ -15,116 +15,124 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.streams.facebook.provider.pagefeed;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import facebook4j.*;
-import facebook4j.json.DataObjectFactory;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.facebook.FacebookConfiguration;
import org.apache.streams.facebook.IdConfig;
import org.apache.streams.facebook.provider.FacebookDataCollector;
import org.apache.streams.jackson.StreamsJacksonMapper;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.BlockingQueue;
+import facebook4j.FacebookException;
+import facebook4j.Paging;
+import facebook4j.Post;
+import facebook4j.Reading;
+import facebook4j.ResponseList;
+import facebook4j.json.DataObjectFactory;
+
/**
- * Collects the page feed data from public Facebook pages
+ * Collects the page feed data from public Facebook pages.
*/
public class FacebookPageFeedDataCollector extends FacebookDataCollector {
- private static final Logger LOGGER = LoggerFactory.getLogger(FacebookPageFeedDataCollector.class);
- private static final int MAX_ATTEMPTS = 5;
- private static final ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
- private static final int LIMIT = 100;
+ private static final Logger LOGGER = LoggerFactory.getLogger(FacebookPageFeedDataCollector.class);
+ private static final int MAX_ATTEMPTS = 5;
+ private static final ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
+ private static final int LIMIT = 100;
- public FacebookPageFeedDataCollector(BlockingQueue<StreamsDatum> queue, FacebookConfiguration configuration) {
- super(configuration, queue);
- }
+ public FacebookPageFeedDataCollector(BlockingQueue<StreamsDatum> queue, FacebookConfiguration configuration) {
+ super(configuration, queue);
+ }
+
+ @Override
+ protected void getData(IdConfig id) throws Exception {
+ boolean exit = false;
- @Override
- protected void getData(IdConfig id) throws Exception {
- boolean exit = false;
+ ResponseList<Post> facebookPosts = getPosts(id.getId());
+ LOGGER.debug("Post received : {}", facebookPosts.size());
+ backOff.reset();
+ do {
+ for (Post post : facebookPosts) {
+ if (id.getBeforeDate() != null && id.getAfterDate() != null) {
+ if (id.getBeforeDate().isAfter(post.getCreatedTime().getTime())
+ && id.getAfterDate().isBefore(post.getCreatedTime().getTime())) {
+ super.outputData(MAPPER.readValue(DataObjectFactory.getRawJSON(post), org.apache.streams.facebook.Post.class), post.getId());
- ResponseList<Post> facebookPosts = getPosts(id.getId());
- LOGGER.debug("Post received : {}", facebookPosts.size());
+ }
+ } else if (id.getBeforeDate() != null && id.getBeforeDate().isAfter(post.getCreatedTime().getTime())) {
+ super.outputData(MAPPER.readValue(DataObjectFactory.getRawJSON(post), org.apache.streams.facebook.Post.class), post.getId());
+ } else if (id.getAfterDate() != null && id.getAfterDate().isBefore(post.getCreatedTime().getTime())) {
+ super.outputData(MAPPER.readValue(DataObjectFactory.getRawJSON(post), org.apache.streams.facebook.Post.class), post.getId());
+ } else if (id.getBeforeDate() == null && id.getAfterDate() == null) {
+ super.outputData(MAPPER.readValue(DataObjectFactory.getRawJSON(post), org.apache.streams.facebook.Post.class), post.getId());
+ } else {
+ exit = true;
+ LOGGER.debug("Breaking on post, {}, with createdAtDate {}", post.getId(), post.getCreatedTime());
+ break;
+ }
+ }
+ if (facebookPosts.getPaging() != null && !exit) {
+ LOGGER.debug("Paging. . .");
+ facebookPosts = getPosts(facebookPosts.getPaging());
backOff.reset();
- do {
- for(Post post : facebookPosts) {
- if(id.getBeforeDate() != null && id.getAfterDate() != null) {
- if(id.getBeforeDate().isAfter(post.getCreatedTime().getTime())
- && id.getAfterDate().isBefore(post.getCreatedTime().getTime())) {
- super.outputData(MAPPER.readValue(DataObjectFactory.getRawJSON(post), org.apache.streams.facebook.Post.class), post.getId());
-
- }
- } else if(id.getBeforeDate() != null && id.getBeforeDate().isAfter(post.getCreatedTime().getTime())) {
- super.outputData(MAPPER.readValue(DataObjectFactory.getRawJSON(post), org.apache.streams.facebook.Post.class), post.getId());
- } else if(id.getAfterDate() != null && id.getAfterDate().isBefore(post.getCreatedTime().getTime())) {
- super.outputData(MAPPER.readValue(DataObjectFactory.getRawJSON(post), org.apache.streams.facebook.Post.class), post.getId());
- } else if(id.getBeforeDate() == null && id.getAfterDate() == null) {
- super.outputData(MAPPER.readValue(DataObjectFactory.getRawJSON(post), org.apache.streams.facebook.Post.class), post.getId());
- } else {
- exit = true;
- LOGGER.debug("Breaking on post, {}, with createdAtDate {}", post.getId(), post.getCreatedTime());
- break;
- }
- }
- if(facebookPosts.getPaging() != null && !exit) {
- LOGGER.debug("Paging. . .");
- facebookPosts = getPosts(facebookPosts.getPaging());
- backOff.reset();
- LOGGER.debug("Paging received {} posts*", facebookPosts.size());
- } else {
- LOGGER.debug("No more paging.");
- facebookPosts = null;
- }
- } while(facebookPosts != null && facebookPosts.size() != 0);
+ LOGGER.debug("Paging received {} posts*", facebookPosts.size());
+ } else {
+ LOGGER.debug("No more paging.");
+ facebookPosts = null;
+ }
+ }
+ while (facebookPosts != null && facebookPosts.size() != 0);
+ }
- }
+ private ResponseList<Post> getPosts(Paging<Post> paging) throws Exception {
+ return getPosts(null, paging);
+ }
- private ResponseList<Post> getPosts(Paging<Post> paging) throws Exception{
- return getPosts(null, paging);
- }
+ private ResponseList<Post> getPosts(String pageId) throws Exception {
+ return getPosts(pageId, null);
+ }
- private ResponseList<Post> getPosts(String pageId) throws Exception {
- return getPosts(pageId, null);
- }
+ /**
+ * Queries facebook. Attempts requests up to 5 times and backs off on each facebook exception.
+ * @param pageId pageId
+ * @param paging paging
+ * @return ResponseList of $link{facebook4j.Post}
+ * @throws Exception Exception
+ */
+ private ResponseList<Post> getPosts(String pageId, Paging<Post> paging) throws Exception {
+ int attempt = 0;
+ while (attempt < MAX_ATTEMPTS) {
+ ++attempt;
+ try {
+ if (pageId != null) {
+ Reading reading = new Reading();
+ reading.limit(LIMIT);
+ return getNextFacebookClient().getPosts(pageId, reading);
+ } else {
+ return getNextFacebookClient().fetchNext(paging);
+ }
+ } catch (FacebookException fe) {
+ LOGGER.error("Facebook returned an exception : {}", fe);
+ LOGGER.error("Facebook returned an exception while trying to get feed for page, {} : {}", pageId, fe.getMessage());
+ //TODO Rate limit exceptions with facebook4j unclear http://facebook4j.org/oldjavadocs/1.1.12-2.0.0/2.0.0/index.html?facebook4j/internal/http/HttpResponseCode.html
+ // back off at all exceptions until figured out.
+ int errorCode = fe.getErrorCode();
- /**
- * Queries facebook. Attempts requests up to 5 times and backs off on each facebook exception.
- * @param pageId
- * @param paging
- * @return
- * @throws Exception
- */
- private ResponseList<Post> getPosts(String pageId, Paging<Post> paging) throws Exception {
- int attempt = 0;
- while(attempt < MAX_ATTEMPTS) {
- ++attempt;
- try {
- if (pageId != null) {
- Reading reading = new Reading();
- reading.limit(LIMIT);
- return getNextFacebookClient().getPosts(pageId, reading);
- }
- else
- return getNextFacebookClient().fetchNext(paging);
- } catch (FacebookException fe) {
- LOGGER.error("Facebook returned an exception : {}", fe);
- LOGGER.error("Facebook returned an exception while trying to get feed for page, {} : {}", pageId, fe.getMessage());
- //TODO Rate limit exceptions with facebook4j unclear http://facebook4j.org/oldjavadocs/1.1.12-2.0.0/2.0.0/index.html?facebook4j/internal/http/HttpResponseCode.html
- // back off at all exceptions until figured out.
- int errorCode = fe.getErrorCode();
-
- //Some sort of rate limiting
- if(errorCode == 17 || errorCode == 4 || errorCode == 341) {
- super.backOff.backOff();
- }
- }
+ //Some sort of rate limiting
+ if (errorCode == 17 || errorCode == 4 || errorCode == 341) {
+ super.backOff.backOff();
}
- throw new Exception("Failed to get data from facebook after "+MAX_ATTEMPTS);
+ }
}
+ throw new Exception("Failed to get data from facebook after " + MAX_ATTEMPTS);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/pagefeed/FacebookPageFeedProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/pagefeed/FacebookPageFeedProvider.java b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/pagefeed/FacebookPageFeedProvider.java
index 308b129..5d977e0 100644
--- a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/pagefeed/FacebookPageFeedProvider.java
+++ b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/pagefeed/FacebookPageFeedProvider.java
@@ -15,15 +15,9 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.streams.facebook.provider.pagefeed;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.Uninterruptibles;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import com.typesafe.config.ConfigParseOptions;
import org.apache.streams.config.ComponentConfigurator;
import org.apache.streams.config.StreamsConfiguration;
import org.apache.streams.config.StreamsConfigurator;
@@ -33,6 +27,15 @@ import org.apache.streams.facebook.provider.FacebookDataCollector;
import org.apache.streams.facebook.provider.FacebookProvider;
import org.apache.streams.facebook.provider.page.FacebookPageProvider;
import org.apache.streams.jackson.StreamsJacksonMapper;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.Uninterruptibles;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigParseOptions;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,65 +47,71 @@ import java.util.Iterator;
import java.util.concurrent.TimeUnit;
/**
- *
+ * FacebookPageFeedProvider provides content from facebook public page.
*/
public class FacebookPageFeedProvider extends FacebookProvider {
- public static final String STREAMS_ID = "FacebookPageFeedProvider";
-
- private static final Logger LOGGER = LoggerFactory.getLogger(FacebookPageProvider.class);
-
- private static ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
-
- public FacebookPageFeedProvider() {
- super();
- }
-
- public FacebookPageFeedProvider(FacebookConfiguration config) {
- super(config);
- }
-
- @Override
- protected FacebookDataCollector getDataCollector() {
- return new FacebookPageFeedDataCollector(super.datums, super.configuration);
- }
-
- public static void main(String[] args) throws Exception {
-
- Preconditions.checkArgument(args.length >= 2);
-
- String configfile = args[0];
- String outfile = args[1];
-
- Config reference = ConfigFactory.load();
- File conf_file = new File(configfile);
- assert(conf_file.exists());
- Config conf = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
-
- Config typesafe = conf.withFallback(reference).resolve();
-
- StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe);
- FacebookConfiguration config = new ComponentConfigurator<>(FacebookConfiguration.class).detectConfiguration(typesafe, "facebook");
- FacebookPageFeedProvider provider = new FacebookPageFeedProvider(config);
-
- PrintStream outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile)));
- provider.prepare(config);
- provider.startStream();
- do {
- Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS);
- Iterator<StreamsDatum> iterator = provider.readCurrent().iterator();
- while(iterator.hasNext()) {
- StreamsDatum datum = iterator.next();
- String json;
- try {
- json = MAPPER.writeValueAsString(datum.getDocument());
- outStream.println(json);
- } catch (JsonProcessingException e) {
- System.err.println(e.getMessage());
- }
- }
- } while( provider.isRunning());
- provider.cleanUp();
- outStream.flush();
+ public static final String STREAMS_ID = "FacebookPageFeedProvider";
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(FacebookPageProvider.class);
+
+ private static ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
+
+ public FacebookPageFeedProvider() {
+ super();
+ }
+
+ public FacebookPageFeedProvider(FacebookConfiguration config) {
+ super(config);
+ }
+
+ @Override
+ protected FacebookDataCollector getDataCollector() {
+ return new FacebookPageFeedDataCollector(super.datums, super.configuration);
+ }
+
+ /**
+ * Run FacebookPageFeedProvider from command line.
+ * @param args configfile outfile
+ * @throws Exception Exception
+ */
+ public static void main(String[] args) throws Exception {
+
+ Preconditions.checkArgument(args.length >= 2);
+
+ String configfile = args[0];
+ String outfile = args[1];
+
+ Config reference = ConfigFactory.load();
+ File confFile = new File(configfile);
+ assert (confFile.exists());
+ Config conf = ConfigFactory.parseFileAnySyntax(confFile, ConfigParseOptions.defaults().setAllowMissing(false));
+
+ Config typesafe = conf.withFallback(reference).resolve();
+
+ StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe);
+ FacebookConfiguration config = new ComponentConfigurator<>(FacebookConfiguration.class).detectConfiguration(typesafe, "facebook");
+ FacebookPageFeedProvider provider = new FacebookPageFeedProvider(config);
+
+ PrintStream outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile)));
+ provider.prepare(config);
+ provider.startStream();
+ do {
+ Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS);
+ Iterator<StreamsDatum> iterator = provider.readCurrent().iterator();
+ while (iterator.hasNext()) {
+ StreamsDatum datum = iterator.next();
+ String json;
+ try {
+ json = MAPPER.writeValueAsString(datum.getDocument());
+ outStream.println(json);
+ } catch (JsonProcessingException ex) {
+ System.err.println(ex.getMessage());
+ }
+ }
}
+ while ( provider.isRunning());
+ provider.cleanUp();
+ outStream.flush();
+ }
}