You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2016/11/25 20:25:01 UTC
[20/42] incubator-streams git commit: STREAMS-440: custom
checkstyle.xml, address compliance
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
index 66c1104..2527d29 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
@@ -18,28 +18,26 @@
package org.apache.streams.twitter.provider;
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfiguration;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.DatumStatusCounter;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.twitter.TwitterFollowingConfiguration;
+import org.apache.streams.twitter.converter.TwitterDateTimeFormat;
+
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
-import com.google.common.collect.Queues;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Uninterruptibles;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigParseOptions;
-import org.apache.streams.config.ComponentConfigurator;
-import org.apache.streams.config.StreamsConfiguration;
-import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.core.DatumStatusCounter;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsResultSet;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.twitter.TwitterFollowingConfiguration;
-import org.apache.streams.twitter.TwitterStreamConfiguration;
-import org.apache.streams.twitter.converter.TwitterDateTimeFormat;
-import org.apache.streams.util.ComponentUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import twitter4j.Twitter;
@@ -51,162 +49,184 @@ import java.io.PrintStream;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
- * Created by sblackmon on 11/25/14.
+ * Retrieve all follow adjacencies from a list of user ids or names.
*/
public class TwitterFollowingProvider extends TwitterUserInformationProvider {
- public static final String STREAMS_ID = "TwitterFollowingProvider";
- private static final Logger LOGGER = LoggerFactory.getLogger(TwitterFollowingProvider.class);
-
- protected final ReadWriteLock lock = new ReentrantReadWriteLock();
-
- private TwitterFollowingConfiguration config;
-
- List<ListenableFuture<Object>> futures = new ArrayList<>();
-
- public static void main(String[] args) throws Exception {
+ public static final String STREAMS_ID = "TwitterFollowingProvider";
+ private static final Logger LOGGER = LoggerFactory.getLogger(TwitterFollowingProvider.class);
+
+ protected final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+ private TwitterFollowingConfiguration config;
+
+ List<ListenableFuture<Object>> futures = new ArrayList<>();
+
+ /**
+ * To use from command line:
+ *
+ * <p/>
+ * Supply (at least) the following required configuration in application.conf:
+ *
+ * <p/>
+ * twitter.oauth.consumerKey
+ * twitter.oauth.consumerSecret
+ * twitter.oauth.accessToken
+ * twitter.oauth.accessTokenSecret
+ * twitter.info
+ *
+ * <p/>
+ * Launch using:
+ *
+ * <p/>
+ * mvn exec:java -Dexec.mainClass=org.apache.streams.twitter.provider.TwitterFollowingProvider -Dexec.args="application.conf tweets.json"
+ *
+ * @param args args
+ * @throws Exception Exception
+ */
+ public static void main(String[] args) throws Exception {
+
+ Preconditions.checkArgument(args.length >= 2);
+
+ String configfile = args[0];
+ String outfile = args[1];
+
+ Config reference = ConfigFactory.load();
+ File file = new File(configfile);
+ assert (file.exists());
+ Config testResourceConfig = ConfigFactory.parseFileAnySyntax(file, ConfigParseOptions.defaults().setAllowMissing(false));
+
+ Config typesafe = testResourceConfig.withFallback(reference).resolve();
+
+ StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe);
+ TwitterFollowingConfiguration config = new ComponentConfigurator<>(TwitterFollowingConfiguration.class).detectConfiguration(typesafe, "twitter");
+ TwitterFollowingProvider provider = new TwitterFollowingProvider(config);
+
+ ObjectMapper mapper = new StreamsJacksonMapper(Lists.newArrayList(TwitterDateTimeFormat.TWITTER_FORMAT));
+
+ PrintStream outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile)));
+ provider.prepare(config);
+ provider.startStream();
+ do {
+ Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS);
+ Iterator<StreamsDatum> iterator = provider.readCurrent().iterator();
+ while (iterator.hasNext()) {
+ StreamsDatum datum = iterator.next();
+ String json;
+ try {
+ json = mapper.writeValueAsString(datum.getDocument());
+ outStream.println(json);
+ } catch (JsonProcessingException ex) {
+ System.err.println(ex.getMessage());
+ }
+ }
+ }
+ while ( provider.isRunning());
+ provider.cleanUp();
+ outStream.flush();
+ }
- Preconditions.checkArgument(args.length >= 2);
+ public TwitterFollowingConfiguration getConfig() {
+ return config;
+ }
- String configfile = args[0];
- String outfile = args[1];
+ public static final int MAX_NUMBER_WAITING = 10000;
- Config reference = ConfigFactory.load();
- File conf_file = new File(configfile);
- assert(conf_file.exists());
- Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
+ public TwitterFollowingProvider() {
+ this.config = new ComponentConfigurator<>(TwitterFollowingConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig().getConfig("twitter"));
+ }
- Config typesafe = testResourceConfig.withFallback(reference).resolve();
+ public TwitterFollowingProvider(TwitterFollowingConfiguration config) {
+ super(config);
+ this.config = config;
+ }
- StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe);
- TwitterFollowingConfiguration config = new ComponentConfigurator<>(TwitterFollowingConfiguration.class).detectConfiguration(typesafe, "twitter");
- TwitterFollowingProvider provider = new TwitterFollowingProvider(config);
+ @Override
+ public void prepare(Object configurationObject) {
+ super.prepare(config);
+ Preconditions.checkNotNull(getConfig().getEndpoint());
+ Preconditions.checkArgument(getConfig().getEndpoint().equals("friends") || getConfig().getEndpoint().equals("followers"));
+ return;
+ }
- ObjectMapper mapper = new StreamsJacksonMapper(Lists.newArrayList(TwitterDateTimeFormat.TWITTER_FORMAT));
+ @Override
+ public void startStream() {
- PrintStream outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile)));
- provider.prepare(config);
- provider.startStream();
- do {
- Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS);
- Iterator<StreamsDatum> iterator = provider.readCurrent().iterator();
- while(iterator.hasNext()) {
- StreamsDatum datum = iterator.next();
- String json;
- try {
- json = mapper.writeValueAsString(datum.getDocument());
- outStream.println(json);
- } catch (JsonProcessingException e) {
- System.err.println(e.getMessage());
- }
- }
- } while( provider.isRunning());
- provider.cleanUp();
- outStream.flush();
- }
+ Preconditions.checkNotNull(executor);
- public TwitterFollowingConfiguration getConfig() { return config; }
+ Preconditions.checkArgument(idsBatches.hasNext() || screenNameBatches.hasNext());
- public static final int MAX_NUMBER_WAITING = 10000;
+ LOGGER.info("startStream");
- public TwitterFollowingProvider() {
- this.config = new ComponentConfigurator<>(TwitterFollowingConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig().getConfig("twitter"));
- }
+ running.set(true);
- public TwitterFollowingProvider(TwitterFollowingConfiguration config) {
- super(config);
- this.config = config;
+ while (idsBatches.hasNext()) {
+ submitFollowingThreads(idsBatches.next());
}
-
- @Override
- public void prepare(Object o) {
- super.prepare(config);
- Preconditions.checkNotNull(getConfig().getEndpoint());
- Preconditions.checkArgument(getConfig().getEndpoint().equals("friends") || getConfig().getEndpoint().equals("followers"));
- return;
+ while (screenNameBatches.hasNext()) {
+ submitFollowingThreads(screenNameBatches.next());
}
- @Override
- public void startStream() {
+ executor.shutdown();
- Preconditions.checkNotNull(executor);
-
- Preconditions.checkArgument(idsBatches.hasNext() || screenNameBatches.hasNext());
-
- LOGGER.info("startStream");
-
- running.set(true);
-
- while (idsBatches.hasNext()) {
- submitFollowingThreads(idsBatches.next());
- }
- while (screenNameBatches.hasNext()) {
- submitFollowingThreads(screenNameBatches.next());
- }
+ }
- executor.shutdown();
+ protected void submitFollowingThreads(Long[] ids) {
+ Twitter client = getTwitterClient();
+ for (int i = 0; i < ids.length; i++) {
+ TwitterFollowingProviderTask providerTask = new TwitterFollowingProviderTask(this, client, ids[i]);
+ ListenableFuture future = executor.submit(providerTask);
+ futures.add(future);
+ LOGGER.info("submitted {}", ids[i]);
}
+ }
- protected void submitFollowingThreads(Long[] ids) {
- Twitter client = getTwitterClient();
+ protected void submitFollowingThreads(String[] screenNames) {
+ Twitter client = getTwitterClient();
- for (int i = 0; i < ids.length; i++) {
- TwitterFollowingProviderTask providerTask = new TwitterFollowingProviderTask(this, client, ids[i]);
- ListenableFuture future = executor.submit(providerTask);
- futures.add(future);
- LOGGER.info("submitted {}", ids[i]);
- }
+ for (int i = 0; i < screenNames.length; i++) {
+ TwitterFollowingProviderTask providerTask = new TwitterFollowingProviderTask(this, client, screenNames[i]);
+ ListenableFuture future = executor.submit(providerTask);
+ futures.add(future);
+ LOGGER.info("submitted {}", screenNames[i]);
}
- protected void submitFollowingThreads(String[] screenNames) {
- Twitter client = getTwitterClient();
-
- for (int i = 0; i < screenNames.length; i++) {
- TwitterFollowingProviderTask providerTask = new TwitterFollowingProviderTask(this, client, screenNames[i]);
- ListenableFuture future = executor.submit(providerTask);
- futures.add(future);
- LOGGER.info("submitted {}", screenNames[i]);
- }
-
- }
+ }
- @Override
- public StreamsResultSet readCurrent() {
+ @Override
+ public StreamsResultSet readCurrent() {
- LOGGER.info("{}{} - readCurrent", idsBatches, screenNameBatches);
+ LOGGER.info("{}{} - readCurrent", idsBatches, screenNameBatches);
- StreamsResultSet result;
+ StreamsResultSet result;
- try {
- lock.writeLock().lock();
- result = new StreamsResultSet(providerQueue);
- result.setCounter(new DatumStatusCounter());
- providerQueue = constructQueue();
- LOGGER.debug("{}{} - providing {} docs", idsBatches, screenNameBatches, result.size());
- } finally {
- lock.writeLock().unlock();
- }
+ try {
+ lock.writeLock().lock();
+ result = new StreamsResultSet(providerQueue);
+ result.setCounter(new DatumStatusCounter());
+ providerQueue = constructQueue();
+ LOGGER.debug("{}{} - providing {} docs", idsBatches, screenNameBatches, result.size());
+ } finally {
+ lock.writeLock().unlock();
+ }
- return result;
+ return result;
- }
+ }
- @Override
- public boolean isRunning() {
- if (providerQueue.isEmpty() && executor.isTerminated() && Futures.allAsList(futures).isDone()) {
- LOGGER.info("Completed");
- running.set(false);
- LOGGER.info("Exiting");
- }
- return running.get();
+ @Override
+ public boolean isRunning() {
+ if (providerQueue.isEmpty() && executor.isTerminated() && Futures.allAsList(futures).isDone()) {
+ LOGGER.info("Completed");
+ running.set(false);
+ LOGGER.info("Exiting");
}
+ return running.get();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProviderTask.java
index f2346fb..ee800fa 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProviderTask.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProviderTask.java
@@ -18,13 +18,14 @@
package org.apache.streams.twitter.provider;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Preconditions;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.jackson.StreamsJacksonMapper;
import org.apache.streams.twitter.pojo.Follow;
import org.apache.streams.twitter.pojo.User;
import org.apache.streams.util.ComponentUtils;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import twitter4j.PagableResponseList;
@@ -37,188 +38,208 @@ import twitter4j.TwitterObjectFactory;
*/
public class TwitterFollowingProviderTask implements Runnable {
- private final static Logger LOGGER = LoggerFactory.getLogger(TwitterFollowingProviderTask.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(TwitterFollowingProviderTask.class);
+
+ private static final ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
+ protected TwitterFollowingProvider provider;
+ protected Twitter client;
+ protected Long id;
+ protected String screenName;
+
+ int count = 0;
+
+ /**
+ * TwitterFollowingProviderTask constructor.
+ * @param provider TwitterFollowingProvider
+ * @param twitter Twitter
+ * @param id numeric id
+ */
+ public TwitterFollowingProviderTask(TwitterFollowingProvider provider, Twitter twitter, Long id) {
+ this.provider = provider;
+ this.client = twitter;
+ this.id = id;
+ }
+
+ /**
+ * TwitterFollowingProviderTask constructor.
+ * @param provider TwitterFollowingProvider
+ * @param twitter Twitter
+ * @param screenName screenName
+ */
+ public TwitterFollowingProviderTask(TwitterFollowingProvider provider, Twitter twitter, String screenName) {
+ this.provider = provider;
+ this.client = twitter;
+ this.screenName = screenName;
+ }
+
+
+ @Override
+ public void run() {
+
+ Preconditions.checkArgument(id != null || screenName != null);
+
+ if ( id != null ) {
+ getFollowing(id);
+ } else if ( screenName != null) {
+ getFollowing(screenName);
+ }
- private final static ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+ LOGGER.info((id != null ? id.toString() : screenName) + " Thread Finished"); // parenthesized: '+' binds tighter than '?:', which dropped the suffix for ids
- protected TwitterFollowingProvider provider;
- protected Twitter client;
- protected Long id;
- protected String screenName;
+ }
- int count = 0;
+ protected void getFollowing(Long id) {
- public TwitterFollowingProviderTask(TwitterFollowingProvider provider, Twitter twitter, Long id) {
- this.provider = provider;
- this.client = twitter;
- this.id = id;
- }
+ Preconditions.checkArgument(
+ provider.getConfig().getEndpoint().equals("friends")
+ || provider.getConfig().getEndpoint().equals("followers")
+ );
- public TwitterFollowingProviderTask(TwitterFollowingProvider provider, Twitter twitter, String screenName) {
- this.provider = provider;
- this.client = twitter;
- this.screenName = screenName;
+ if ( provider.getConfig().getIdsOnly() ) {
+ collectIds(id);
+ } else {
+ collectUsers(id);
}
+ }
+ protected void getFollowing(String screenName) {
- @Override
- public void run() {
+ twitter4j.User user = null;
+ try {
+ user = client.users().showUser(screenName);
+ } catch (TwitterException ex) {
+ LOGGER.error("Failure looking up " + screenName, ex); // id is null on the screen-name path; log the name and keep the cause
+ }
+ Preconditions.checkNotNull(user);
+ getFollowing(user.getId());
+ }
- Preconditions.checkArgument(id != null || screenName != null);
+ private void collectUsers(Long id) {
+ int keepTrying = 0;
- if( id != null )
- getFollowing(id);
- else if( screenName != null)
- getFollowing(screenName);
+ long curser = -1;
- LOGGER.info(id != null ? id.toString() : screenName + " Thread Finished");
+ do {
+ try {
+ twitter4j.User user;
+ String userJson;
+ try {
+ user = client.users().showUser(id);
+ userJson = TwitterObjectFactory.getRawJSON(user);
+ } catch (TwitterException ex) {
+ LOGGER.error("Failure looking up " + id);
+ break;
+ }
- }
+ PagableResponseList<twitter4j.User> list = null;
+ if ( provider.getConfig().getEndpoint().equals("followers") ) {
+ list = client.friendsFollowers().getFollowersList(id.longValue(), curser, provider.getConfig().getMaxItems().intValue());
+ } else if ( provider.getConfig().getEndpoint().equals("friends") ) {
+ list = client.friendsFollowers().getFriendsList(id.longValue(), curser, provider.getConfig().getMaxItems().intValue());
+ }
- protected void getFollowing(Long id) {
+ Preconditions.checkNotNull(list);
+ Preconditions.checkArgument(list.size() > 0);
- Preconditions.checkArgument(provider.getConfig().getEndpoint().equals("friends") || provider.getConfig().getEndpoint().equals("followers"));
+ for (twitter4j.User other : list) {
- if( provider.getConfig().getIdsOnly() )
- collectIds(id);
- else
- collectUsers(id);
- }
+ String otherJson = TwitterObjectFactory.getRawJSON(other);
- private void collectUsers(Long id) {
- int keepTrying = 0;
-
- long curser = -1;
-
- do
- {
- try
- {
- twitter4j.User user;
- String userJson;
- try {
- user = client.users().showUser(id);
- userJson = TwitterObjectFactory.getRawJSON(user);
- } catch (TwitterException e) {
- LOGGER.error("Failure looking up " + id);
- break;
- }
-
- PagableResponseList<twitter4j.User> list = null;
- if( provider.getConfig().getEndpoint().equals("followers") )
- list = client.friendsFollowers().getFollowersList(id.longValue(), curser, provider.getConfig().getMaxItems().intValue());
- else if( provider.getConfig().getEndpoint().equals("friends") )
- list = client.friendsFollowers().getFriendsList(id.longValue(), curser, provider.getConfig().getMaxItems().intValue());
-
- Preconditions.checkNotNull(list);
- Preconditions.checkArgument(list.size() > 0);
-
- for (twitter4j.User other : list) {
-
- String otherJson = TwitterObjectFactory.getRawJSON(other);
-
- try {
- Follow follow = null;
- if( provider.getConfig().getEndpoint().equals("followers") ) {
- follow = new Follow()
- .withFollowee(mapper.readValue(userJson, User.class))
- .withFollower(mapper.readValue(otherJson, User.class));
- } else if( provider.getConfig().getEndpoint().equals("friends") ) {
- follow = new Follow()
- .withFollowee(mapper.readValue(otherJson, User.class))
- .withFollower(mapper.readValue(userJson, User.class));
- }
-
- Preconditions.checkNotNull(follow);
-
- if( count < provider.getConfig().getMaxItems()) {
- ComponentUtils.offerUntilSuccess(new StreamsDatum(follow), provider.providerQueue);
- count++;
- }
-
- } catch (Exception e) {
- LOGGER.warn("Exception: {}", e);
- }
- }
- if( !list.hasNext() ) break;
- if( list.getNextCursor() == 0 ) break;
- curser = list.getNextCursor();
- }
- catch(TwitterException twitterException) {
- keepTrying += TwitterErrorHandler.handleTwitterError(client, twitterException);
+ try {
+ Follow follow = null;
+ if ( provider.getConfig().getEndpoint().equals("followers") ) {
+ follow = new Follow()
+ .withFollowee(mapper.readValue(userJson, User.class))
+ .withFollower(mapper.readValue(otherJson, User.class));
+ } else if ( provider.getConfig().getEndpoint().equals("friends") ) {
+ follow = new Follow()
+ .withFollowee(mapper.readValue(otherJson, User.class))
+ .withFollower(mapper.readValue(userJson, User.class));
}
- catch(Exception e) {
- keepTrying += TwitterErrorHandler.handleTwitterError(client, e);
+
+ Preconditions.checkNotNull(follow);
+
+ if ( count < provider.getConfig().getMaxItems()) {
+ ComponentUtils.offerUntilSuccess(new StreamsDatum(follow), provider.providerQueue);
+ count++;
}
- } while (curser != 0 && keepTrying < provider.getConfig().getRetryMax() && count < provider.getConfig().getMaxItems());
+
+ } catch (Exception ex) {
+ LOGGER.warn("Exception: {}", ex);
+ }
+ }
+ if ( !list.hasNext() ) {
+ break;
+ }
+ if ( list.getNextCursor() == 0 ) {
+ break;
+ }
+ curser = list.getNextCursor();
+ } catch (TwitterException twitterException) {
+ keepTrying += TwitterErrorHandler.handleTwitterError(client, twitterException);
+ } catch (Exception ex) {
+ keepTrying += TwitterErrorHandler.handleTwitterError(client, ex);
+ }
}
+ while (curser != 0 && keepTrying < provider.getConfig().getRetryMax() && count < provider.getConfig().getMaxItems());
+ }
- private void collectIds(Long id) {
- int keepTrying = 0;
-
- long curser = -1;
-
- do
- {
- try
- {
- twitter4j.IDs ids = null;
- if( provider.getConfig().getEndpoint().equals("followers") )
- ids = client.friendsFollowers().getFollowersIDs(id.longValue(), curser, provider.getConfig().getMaxItems().intValue());
- else if( provider.getConfig().getEndpoint().equals("friends") )
- ids = client.friendsFollowers().getFriendsIDs(id.longValue(), curser, provider.getConfig().getMaxItems().intValue());
-
- Preconditions.checkNotNull(ids);
- Preconditions.checkArgument(ids.getIDs().length > 0);
-
- for (long otherId : ids.getIDs()) {
-
- try {
- Follow follow = null;
- if( provider.getConfig().getEndpoint().equals("followers") ) {
- follow = new Follow()
- .withFollowee(new User().withId(id))
- .withFollower(new User().withId(otherId));
- } else if( provider.getConfig().getEndpoint().equals("friends") ) {
- follow = new Follow()
- .withFollowee(new User().withId(otherId))
- .withFollower(new User().withId(id));
- }
-
- Preconditions.checkNotNull(follow);
-
- if( count < provider.getConfig().getMaxItems()) {
- ComponentUtils.offerUntilSuccess(new StreamsDatum(follow), provider.providerQueue);
- count++;
- }
- } catch (Exception e) {
- LOGGER.warn("Exception: {}", e);
- }
- }
- if( !ids.hasNext() ) break;
- if( ids.getNextCursor() == 0 ) break;
- curser = ids.getNextCursor();
- }
- catch(TwitterException twitterException) {
- keepTrying += TwitterErrorHandler.handleTwitterError(client, id, twitterException);
- }
- catch(Exception e) {
- keepTrying += TwitterErrorHandler.handleTwitterError(client, e);
+ private void collectIds(Long id) {
+ int keepTrying = 0;
+
+ long curser = -1;
+
+ do {
+ try {
+ twitter4j.IDs ids = null;
+ if ( provider.getConfig().getEndpoint().equals("followers") ) {
+ ids = client.friendsFollowers().getFollowersIDs(id.longValue(), curser, provider.getConfig().getMaxItems().intValue());
+ } else if ( provider.getConfig().getEndpoint().equals("friends") ) {
+ ids = client.friendsFollowers().getFriendsIDs(id.longValue(), curser, provider.getConfig().getMaxItems().intValue());
+ }
+
+ Preconditions.checkNotNull(ids);
+ Preconditions.checkArgument(ids.getIDs().length > 0);
+
+ for (long otherId : ids.getIDs()) {
+
+ try {
+ Follow follow = null;
+ if ( provider.getConfig().getEndpoint().equals("followers") ) {
+ follow = new Follow()
+ .withFollowee(new User().withId(id))
+ .withFollower(new User().withId(otherId));
+ } else if ( provider.getConfig().getEndpoint().equals("friends") ) {
+ follow = new Follow()
+ .withFollowee(new User().withId(otherId))
+ .withFollower(new User().withId(id));
}
- } while (curser != 0 && keepTrying < provider.getConfig().getRetryMax() && count < provider.getConfig().getMaxItems());
- }
- protected void getFollowing(String screenName) {
+ Preconditions.checkNotNull(follow);
- twitter4j.User user = null;
- try {
- user = client.users().showUser(screenName);
- } catch (TwitterException e) {
- LOGGER.error("Failure looking up " + id);
+ if ( count < provider.getConfig().getMaxItems()) {
+ ComponentUtils.offerUntilSuccess(new StreamsDatum(follow), provider.providerQueue);
+ count++;
+ }
+ } catch (Exception ex) {
+ LOGGER.warn("Exception: {}", ex);
+ }
+ }
+ if ( !ids.hasNext() ) {
+ break;
}
- Preconditions.checkNotNull(user);
- getFollowing(user.getId());
+ if ( ids.getNextCursor() == 0 ) {
+ break;
+ }
+ curser = ids.getNextCursor();
+ } catch (TwitterException twitterException) {
+ keepTrying += TwitterErrorHandler.handleTwitterError(client, id, twitterException);
+ } catch (Exception ex) {
+ keepTrying += TwitterErrorHandler.handleTwitterError(client, ex);
+ }
}
-
+ while (curser != 0 && keepTrying < provider.getConfig().getRetryMax() && count < provider.getConfig().getMaxItems());
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterProviderUtil.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterProviderUtil.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterProviderUtil.java
index d9f4ec2..48666cb 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterProviderUtil.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterProviderUtil.java
@@ -16,28 +16,34 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.streams.twitter.provider;
import org.apache.streams.twitter.TwitterConfiguration;
/**
- * Created by sblackmon on 7/26/15.
+ * TwitterProviderUtil contains utilities for Twitter Providers.
*/
public class TwitterProviderUtil {
- public static String baseUrl(TwitterConfiguration config) {
+ /**
+ * baseUrl from TwitterConfiguration.
+ * @param config TwitterConfiguration
+ * @return baseUrl
+ */
+ public static String baseUrl(TwitterConfiguration config) {
- String baseUrl = new StringBuilder()
- .append(config.getProtocol())
- .append("://")
- .append(config.getHost())
- .append(":")
- .append(config.getPort())
- .append("/")
- .append(config.getVersion())
- .append("/")
- .toString();
+ String baseUrl = new StringBuilder()
+ .append(config.getProtocol())
+ .append("://")
+ .append(config.getHost())
+ .append(":")
+ .append(config.getPort())
+ .append("/")
+ .append(config.getVersion())
+ .append("/")
+ .toString();
- return baseUrl;
- }
+ return baseUrl;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamHelper.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamHelper.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamHelper.java
new file mode 100644
index 0000000..a4562ef
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamHelper.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.twitter.provider;
+
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.twitter.converter.TwitterDocumentClassifier;
+import org.apache.streams.util.ComponentUtils;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
+import com.twitter.hbc.core.processor.StringDelimitedProcessor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * TwitterStreamHelper helps with hosebird twitter stream.
+ */
+public class TwitterStreamHelper extends StringDelimitedProcessor {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(TwitterStreamHelper.class);
+ private static final int DEFAULT_POOL_SIZE = 5;
+
+ private static final TwitterDocumentClassifier TWITTER_DOCUMENT_CLASSIFIER = new TwitterDocumentClassifier();
+
+ private final TwitterStreamProvider provider;
+ private final ExecutorService service;
+
+ public TwitterStreamHelper(TwitterStreamProvider provider) {
+ this(provider, DEFAULT_POOL_SIZE);
+ }
+
+ /**
+ * TwitterStreamHelper constructor.
+ * @param provider TwitterStreamProvider
+ * @param poolSize poolSize
+ */
+ public TwitterStreamHelper(TwitterStreamProvider provider, int poolSize) {
+ //We are only going to use the Hosebird processor to manage the extraction of the tweets from the Stream
+ super(null);
+ service = Executors.newFixedThreadPool(poolSize);
+ this.provider = provider;
+ }
+
+ @Override
+ public boolean process() throws IOException, InterruptedException {
+ String msg;
+ do {
+ msg = this.processNextMessage();
+ if (msg == null) {
+ Thread.sleep(10);
+ }
+ }
+ while (msg == null);
+
+ //Deserializing to an ObjectNode can take time. Parallelize the task to improve throughput
+ return provider.addDatum(service.submit(new StreamDeserializer(msg)));
+ }
+
+ public void cleanUp() {
+ ComponentUtils.shutdownExecutor(service, 1, 30);
+ }
+
+ protected static class StreamDeserializer implements Callable<List<StreamsDatum>> {
+
+ protected static final ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
+ protected String item;
+
+ public StreamDeserializer(String item) {
+ this.item = item;
+ }
+
+ @Override
+ public List<StreamsDatum> call() throws Exception {
+ if (item != null) {
+ Class itemClass = TWITTER_DOCUMENT_CLASSIFIER.detectClasses(item).get(0);
+ Object document = mapper.readValue(item, itemClass);
+ StreamsDatum rawDatum = new StreamsDatum(document);
+ return Lists.newArrayList(rawDatum);
+ }
+ return new ArrayList<>();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProcessor.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProcessor.java
deleted file mode 100644
index 96df67b..0000000
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProcessor.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.twitter.provider;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.Lists;
-import com.twitter.hbc.core.processor.StringDelimitedProcessor;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.util.ComponentUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-/**
- *
- */
-public class TwitterStreamProcessor extends StringDelimitedProcessor {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(TwitterStreamProcessor.class);
- private static final int DEFAULT_POOL_SIZE = 5;
-
- private final TwitterStreamProvider provider;
- private final ExecutorService service;
-
- public TwitterStreamProcessor(TwitterStreamProvider provider) {
- this(provider, DEFAULT_POOL_SIZE);
- }
-
- public TwitterStreamProcessor(TwitterStreamProvider provider, int poolSize) {
- //We are only going to use the Hosebird processor to manage the extraction of the tweets from the Stream
- super(null);
- service = Executors.newFixedThreadPool(poolSize);
- this.provider = provider;
- }
-
-
- @Override
- public boolean process() throws IOException, InterruptedException {
- String msg;
- do {
- msg = this.processNextMessage();
- if(msg == null) {
- Thread.sleep(10);
- }
- } while(msg == null);
-
- //Deserializing to an ObjectNode can take time. Parallelize the task to improve throughput
- return provider.addDatum(service.submit(new StreamDeserializer(msg)));
- }
-
- public void cleanUp() {
- ComponentUtils.shutdownExecutor(service, 1, 30);
- }
-
- protected static class StreamDeserializer implements Callable<List<StreamsDatum>> {
-
- protected static final ObjectMapper mapper = StreamsJacksonMapper.getInstance();
-
- protected String item;
-
- public StreamDeserializer(String item) {
- this.item = item;
- }
-
- @Override
- public List<StreamsDatum> call() throws Exception {
- if(item != null) {
- Class itemClass = TwitterEventClassifier.detectClass(item);
- Object document = mapper.readValue(item, itemClass);
- StreamsDatum rawDatum = new StreamsDatum(document);
- return Lists.newArrayList(rawDatum);
- }
- return new ArrayList<>();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
index 3856935..1895ee2 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
@@ -18,6 +18,20 @@
package org.apache.streams.twitter.provider;
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfiguration;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.DatumStatus;
+import org.apache.streams.core.DatumStatusCountable;
+import org.apache.streams.core.DatumStatusCounter;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProvider;
+import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.twitter.TwitterStreamConfiguration;
+import org.apache.streams.twitter.converter.TwitterDateTimeFormat;
+import org.apache.streams.util.ComponentUtils;
+
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
@@ -41,19 +55,6 @@ import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigParseOptions;
import org.apache.commons.lang.NotImplementedException;
-import org.apache.streams.config.ComponentConfigurator;
-import org.apache.streams.config.StreamsConfiguration;
-import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.core.DatumStatus;
-import org.apache.streams.core.DatumStatusCountable;
-import org.apache.streams.core.DatumStatusCounter;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsProvider;
-import org.apache.streams.core.StreamsResultSet;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.twitter.TwitterStreamConfiguration;
-import org.apache.streams.twitter.converter.TwitterDateTimeFormat;
-import org.apache.streams.util.ComponentUtils;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -84,269 +85,282 @@ import java.util.concurrent.atomic.AtomicBoolean;
*/
public class TwitterStreamProvider implements StreamsProvider, Serializable, DatumStatusCountable {
- public final static String STREAMS_ID = "TwitterStreamProvider";
-
- private final static Logger LOGGER = LoggerFactory.getLogger(TwitterStreamProvider.class);
-
- public static void main(String[] args) {
-
- Preconditions.checkArgument(args.length >= 2);
-
- String configfile = args[0];
- String outfile = args[1];
-
- Config reference = ConfigFactory.load();
- File conf_file = new File(configfile);
- assert(conf_file.exists());
- Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
-
- Config typesafe = testResourceConfig.withFallback(reference).resolve();
-
- StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe);
- TwitterStreamConfiguration config = new ComponentConfigurator<>(TwitterStreamConfiguration.class).detectConfiguration(typesafe, "twitter");
- TwitterStreamProvider provider = new TwitterStreamProvider(config);
-
- ObjectMapper mapper = StreamsJacksonMapper.getInstance(Lists.newArrayList(TwitterDateTimeFormat.TWITTER_FORMAT));
-
- PrintStream outStream = null;
+ public static final String STREAMS_ID = "TwitterStreamProvider";
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(TwitterStreamProvider.class);
+
+ /**
+ * To use from command line:
+ *
+ * <p/>
+ * Supply (at least) the following required configuration in application.conf:
+ *
+ * <p/>
+ * twitter.oauth.consumerKey
+ * twitter.oauth.consumerSecret
+ * twitter.oauth.accessToken
+ * twitter.oauth.accessTokenSecret
+ *
+ * <p/>
+ * Launch using:
+ *
+ * <p/>
+ * mvn exec:java -Dexec.mainClass=org.apache.streams.twitter.provider.TwitterStreamProvider -Dexec.args="application.conf tweets.json"
+ *
+ * @param args
+ */
+ public static void main(String[] args) {
+
+ Preconditions.checkArgument(args.length >= 2);
+
+ String configfile = args[0];
+ String outfile = args[1];
+
+ Config reference = ConfigFactory.load();
+ File file = new File(configfile);
+ assert (file.exists());
+ Config testResourceConfig = ConfigFactory.parseFileAnySyntax(file, ConfigParseOptions.defaults().setAllowMissing(false));
+
+ Config typesafe = testResourceConfig.withFallback(reference).resolve();
+
+ StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe);
+ TwitterStreamConfiguration config = new ComponentConfigurator<>(TwitterStreamConfiguration.class).detectConfiguration(typesafe, "twitter");
+ TwitterStreamProvider provider = new TwitterStreamProvider(config);
+
+ ObjectMapper mapper = StreamsJacksonMapper.getInstance(Lists.newArrayList(TwitterDateTimeFormat.TWITTER_FORMAT));
+
+ PrintStream outStream = null;
+ try {
+ outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile)));
+ } catch (FileNotFoundException ex) {
+ LOGGER.error("FileNotFoundException", ex);
+ return;
+ }
+ provider.prepare(config);
+ provider.startStream();
+ do {
+ Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS);
+ Iterator<StreamsDatum> iterator = provider.readCurrent().iterator();
+ while (iterator.hasNext()) {
+ StreamsDatum datum = iterator.next();
+ String json;
try {
- outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile)));
- } catch (FileNotFoundException e) {
- LOGGER.error("FileNotFoundException", e);
- return;
+ json = mapper.writeValueAsString(datum.getDocument());
+ outStream.println(json);
+ } catch (JsonProcessingException ex) {
+ System.err.println(ex.getMessage());
}
- provider.prepare(config);
- provider.startStream();
- do {
- Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS);
- Iterator<StreamsDatum> iterator = provider.readCurrent().iterator();
- while(iterator.hasNext()) {
- StreamsDatum datum = iterator.next();
- String json;
- try {
- json = mapper.writeValueAsString(datum.getDocument());
- outStream.println(json);
- } catch (JsonProcessingException e) {
- System.err.println(e.getMessage());
- }
- }
- } while( provider.isRunning());
- provider.cleanUp();
- outStream.flush();
+ }
}
-
- public static final int MAX_BATCH = 1000;
-
- private TwitterStreamConfiguration config;
-
- public TwitterStreamConfiguration getConfig() {
- return config;
+ while ( provider.isRunning());
+ provider.cleanUp();
+ outStream.flush();
+ }
+
+ public static final int MAX_BATCH = 1000;
+
+ private TwitterStreamConfiguration config;
+
+ public TwitterStreamConfiguration getConfig() {
+ return config;
+ }
+
+ public void setConfig(TwitterStreamConfiguration config) {
+ this.config = config;
+ }
+
+ protected volatile Queue<Future<List<StreamsDatum>>> providerQueue;
+
+ protected Hosts hosebirdHosts;
+ protected Authentication auth;
+ protected StreamingEndpoint endpoint;
+ protected BasicClient client;
+ protected AtomicBoolean running = new AtomicBoolean(false);
+ protected TwitterStreamHelper processor = new TwitterStreamHelper(this);
+ private DatumStatusCounter countersCurrent = new DatumStatusCounter();
+ private DatumStatusCounter countersTotal = new DatumStatusCounter();
+
+ public TwitterStreamProvider() {
+ this.config = new ComponentConfigurator<>(TwitterStreamConfiguration.class).detectConfiguration(StreamsConfigurator.config, "twitter");
+ }
+
+ public TwitterStreamProvider(TwitterStreamConfiguration config) {
+ this.config = config;
+ }
+
+ @Override
+ public String getId() {
+ return STREAMS_ID;
+ }
+
+ @Override
+ public void startStream() {
+ client.connect();
+ running.set(true);
+ }
+
+ @Override
+ public synchronized StreamsResultSet readCurrent() {
+
+ StreamsResultSet current;
+ synchronized (this) {
+ Queue<StreamsDatum> drain = Queues.newLinkedBlockingDeque();
+ drainTo(drain);
+ current = new StreamsResultSet(drain);
+ current.setCounter(new DatumStatusCounter());
+ current.getCounter().add(countersCurrent);
+ countersTotal.add(countersCurrent);
+ countersCurrent = new DatumStatusCounter();
}
- public void setConfig(TwitterStreamConfiguration config) {
- this.config = config;
- }
+ return current;
+ }
- protected volatile Queue<Future<List<StreamsDatum>>> providerQueue;
-
- protected Hosts hosebirdHosts;
- protected Authentication auth;
- protected StreamingEndpoint endpoint;
- protected BasicClient client;
- protected AtomicBoolean running = new AtomicBoolean(false);
- protected TwitterStreamProcessor processor = new TwitterStreamProcessor(this);
- private DatumStatusCounter countersCurrent = new DatumStatusCounter();
- private DatumStatusCounter countersTotal = new DatumStatusCounter();
-
- private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) {
- return new ThreadPoolExecutor(nThreads, nThreads,
- 5000L, TimeUnit.MILLISECONDS,
- new ArrayBlockingQueue<Runnable>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy());
- }
+ @Override
+ public StreamsResultSet readNew(BigInteger sequence) {
+ throw new NotImplementedException();
+ }
- public TwitterStreamProvider() {
- this.config = new ComponentConfigurator<>(TwitterStreamConfiguration.class).detectConfiguration(StreamsConfigurator.config, "twitter");
- }
+ @Override
+ public StreamsResultSet readRange(DateTime start, DateTime end) {
+ throw new NotImplementedException();
+ }
- public TwitterStreamProvider(TwitterStreamConfiguration config) {
- this.config = config;
- }
+ @Override
+ public boolean isRunning() {
+ return this.running.get() && !client.isDone();
+ }
- @Override
- public String getId() {
- return STREAMS_ID;
- }
+ @Override
+ public void prepare(Object configurationObject) {
- @Override
- public void startStream() {
- client.connect();
- running.set(true);
- }
+ Preconditions.checkNotNull(config.getEndpoint());
- @Override
- public synchronized StreamsResultSet readCurrent() {
-
- StreamsResultSet current;
- synchronized(this) {
- Queue<StreamsDatum> drain = Queues.newLinkedBlockingDeque();
- drainTo(drain);
- current = new StreamsResultSet(drain);
- current.setCounter(new DatumStatusCounter());
- current.getCounter().add(countersCurrent);
- countersTotal.add(countersCurrent);
- countersCurrent = new DatumStatusCounter();
- }
+ if (config.getEndpoint().equals("userstream") ) {
- return current;
- }
+ hosebirdHosts = new HttpHosts(Constants.USERSTREAM_HOST);
- @Override
- public StreamsResultSet readNew(BigInteger sequence) {
- throw new NotImplementedException();
- }
+ UserstreamEndpoint userstreamEndpoint = new UserstreamEndpoint();
+ userstreamEndpoint.withFollowings(true);
+ userstreamEndpoint.withUser(false);
+ userstreamEndpoint.allReplies(false);
+ endpoint = userstreamEndpoint;
+ } else if (config.getEndpoint().equals("sample") ) {
- @Override
- public StreamsResultSet readRange(DateTime start, DateTime end) {
- throw new NotImplementedException();
- }
-
- @Override
- public boolean isRunning() {
- return this.running.get() && !client.isDone();
- }
+ hosebirdHosts = new HttpHosts(Constants.STREAM_HOST);
- @Override
- public void prepare(Object o) {
+ boolean track = config.getTrack() != null && !config.getTrack().isEmpty();
+ boolean follow = config.getFollow() != null && !config.getFollow().isEmpty();
- Preconditions.checkNotNull(config.getEndpoint());
-
- if(config.getEndpoint().equals("userstream") ) {
-
- hosebirdHosts = new HttpHosts(Constants.USERSTREAM_HOST);
-
- UserstreamEndpoint userstreamEndpoint = new UserstreamEndpoint();
- userstreamEndpoint.withFollowings(true);
- userstreamEndpoint.withUser(false);
- userstreamEndpoint.allReplies(false);
- endpoint = userstreamEndpoint;
+ if ( track || follow ) {
+ LOGGER.debug("***\tPRESENT\t***");
+ StatusesFilterEndpoint statusesFilterEndpoint = new StatusesFilterEndpoint();
+ if ( track ) {
+ statusesFilterEndpoint.trackTerms(config.getTrack());
}
- else if(config.getEndpoint().equals("sample") ) {
-
- hosebirdHosts = new HttpHosts(Constants.STREAM_HOST);
-
- boolean track = config.getTrack() != null && !config.getTrack().isEmpty();
- boolean follow = config.getFollow() != null && !config.getFollow().isEmpty();
-
- if( track || follow ) {
- LOGGER.debug("***\tPRESENT\t***");
- StatusesFilterEndpoint statusesFilterEndpoint = new StatusesFilterEndpoint();
- if( track ) {
- statusesFilterEndpoint.trackTerms(config.getTrack());
- }
- if( follow ) {
- statusesFilterEndpoint.followings(config.getFollow());
- }
- this.endpoint = statusesFilterEndpoint;
- } else {
- endpoint = new StatusesSampleEndpoint();
- }
-
+ if ( follow ) {
+ statusesFilterEndpoint.followings(config.getFollow());
}
- else if( config.getEndpoint().endsWith("firehose")) {
- hosebirdHosts = new HttpHosts(Constants.STREAM_HOST);
- endpoint = new StatusesFirehoseEndpoint();
- } else {
- LOGGER.error("NO ENDPOINT RESOLVED");
- return;
- }
-
- if( config.getBasicauth() != null ) {
-
- Preconditions.checkNotNull(config.getBasicauth().getUsername());
- Preconditions.checkNotNull(config.getBasicauth().getPassword());
-
- auth = new BasicAuth(
- config.getBasicauth().getUsername(),
- config.getBasicauth().getPassword()
- );
-
- } else if( config.getOauth() != null ) {
-
- Preconditions.checkNotNull(config.getOauth().getConsumerKey());
- Preconditions.checkNotNull(config.getOauth().getConsumerSecret());
- Preconditions.checkNotNull(config.getOauth().getAccessToken());
- Preconditions.checkNotNull(config.getOauth().getAccessTokenSecret());
-
- auth = new OAuth1(config.getOauth().getConsumerKey(),
- config.getOauth().getConsumerSecret(),
- config.getOauth().getAccessToken(),
- config.getOauth().getAccessTokenSecret());
+ this.endpoint = statusesFilterEndpoint;
+ } else {
+ endpoint = new StatusesSampleEndpoint();
+ }
+
+ } else if ( config.getEndpoint().endsWith("firehose")) {
+ hosebirdHosts = new HttpHosts(Constants.STREAM_HOST);
+ endpoint = new StatusesFirehoseEndpoint();
+ } else {
+ LOGGER.error("NO ENDPOINT RESOLVED");
+ return;
+ }
- } else {
- LOGGER.error("NO AUTH RESOLVED");
- return;
- }
+ if ( config.getBasicauth() != null ) {
- LOGGER.debug("host={}\tendpoint={}\taut={}", hosebirdHosts, endpoint, auth);
+ Preconditions.checkNotNull(config.getBasicauth().getUsername());
+ Preconditions.checkNotNull(config.getBasicauth().getPassword());
- providerQueue = new LinkedBlockingQueue<>(MAX_BATCH);
+ auth = new BasicAuth(
+ config.getBasicauth().getUsername(),
+ config.getBasicauth().getPassword()
+ );
- client = new ClientBuilder()
- .name("apache/streams/streams-contrib/streams-provider-twitter")
- .hosts(hosebirdHosts)
- .endpoint(endpoint)
- .authentication(auth)
- .connectionTimeout(1200000)
- .processor(processor)
- .build();
+ } else if ( config.getOauth() != null ) {
- }
+ Preconditions.checkNotNull(config.getOauth().getConsumerKey());
+ Preconditions.checkNotNull(config.getOauth().getConsumerSecret());
+ Preconditions.checkNotNull(config.getOauth().getAccessToken());
+ Preconditions.checkNotNull(config.getOauth().getAccessTokenSecret());
- @Override
- public void cleanUp() {
- this.client.stop();
- this.processor.cleanUp();
- this.running.set(false);
- }
+ auth = new OAuth1(config.getOauth().getConsumerKey(),
+ config.getOauth().getConsumerSecret(),
+ config.getOauth().getAccessToken(),
+ config.getOauth().getAccessTokenSecret());
- @Override
- public DatumStatusCounter getDatumStatusCounter() {
- return countersTotal;
+ } else {
+ LOGGER.error("NO AUTH RESOLVED");
+ return;
}
- protected boolean addDatum(Future<List<StreamsDatum>> future) {
- try {
- ComponentUtils.offerUntilSuccess(future, providerQueue);
- countersCurrent.incrementStatus(DatumStatus.SUCCESS);
- return true;
- } catch (Exception e) {
- countersCurrent.incrementStatus(DatumStatus.FAIL);
- LOGGER.warn("Unable to enqueue item from Twitter stream");
- return false;
- }
+ LOGGER.debug("host={}\tendpoint={}\taut={}", hosebirdHosts, endpoint, auth);
+
+ providerQueue = new LinkedBlockingQueue<>(MAX_BATCH);
+
+ client = new ClientBuilder()
+ .name("apache/streams/streams-contrib/streams-provider-twitter")
+ .hosts(hosebirdHosts)
+ .endpoint(endpoint)
+ .authentication(auth)
+ .connectionTimeout(1200000)
+ .processor(processor)
+ .build();
+
+ }
+
+ @Override
+ public void cleanUp() {
+ this.client.stop();
+ this.processor.cleanUp();
+ this.running.set(false);
+ }
+
+ @Override
+ public DatumStatusCounter getDatumStatusCounter() {
+ return countersTotal;
+ }
+
+ protected boolean addDatum(Future<List<StreamsDatum>> future) {
+ try {
+ ComponentUtils.offerUntilSuccess(future, providerQueue);
+ countersCurrent.incrementStatus(DatumStatus.SUCCESS);
+ return true;
+ } catch (Exception ex) {
+ countersCurrent.incrementStatus(DatumStatus.FAIL);
+ LOGGER.warn("Unable to enqueue item from Twitter stream");
+ return false;
}
-
- protected void drainTo(Queue<StreamsDatum> drain) {
- int count = 0;
- while(!providerQueue.isEmpty() && count <= MAX_BATCH) {
- for(StreamsDatum datum : pollForDatum()) {
- ComponentUtils.offerUntilSuccess(datum, drain);
- count++;
- }
- }
+ }
+
+ protected void drainTo(Queue<StreamsDatum> drain) {
+ int count = 0;
+ while (!providerQueue.isEmpty() && count <= MAX_BATCH) {
+ for (StreamsDatum datum : pollForDatum()) {
+ ComponentUtils.offerUntilSuccess(datum, drain);
+ count++;
+ }
}
-
- protected List<StreamsDatum> pollForDatum() {
- try {
- return providerQueue.poll().get();
- } catch (InterruptedException e) {
- LOGGER.warn("Interrupted while waiting for future. Initiate shutdown.");
- this.cleanUp();
- Thread.currentThread().interrupt();
- return new ArrayList<>();
- } catch (ExecutionException e) {
- LOGGER.warn("Error getting tweet from future");
- return new ArrayList<>();
- }
+ }
+
+ protected List<StreamsDatum> pollForDatum() {
+ try {
+ return providerQueue.poll().get();
+ } catch (InterruptedException ex) {
+ LOGGER.warn("Interrupted while waiting for future. Initiate shutdown.");
+ this.cleanUp();
+ Thread.currentThread().interrupt();
+ return new ArrayList<>();
+ } catch (ExecutionException ex) {
+ LOGGER.warn("Error getting tweet from future");
+ return new ArrayList<>();
}
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
index cea9829..7461356 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
@@ -18,6 +18,17 @@
package org.apache.streams.twitter.provider;
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfiguration;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.DatumStatusCounter;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProvider;
+import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.twitter.TwitterUserInformationConfiguration;
+import org.apache.streams.twitter.converter.TwitterDateTimeFormat;
+
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
@@ -31,17 +42,6 @@ import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigParseOptions;
import org.apache.commons.lang.NotImplementedException;
-import org.apache.streams.config.ComponentConfigurator;
-import org.apache.streams.config.StreamsConfiguration;
-import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.core.DatumStatusCounter;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsProvider;
-import org.apache.streams.core.StreamsResultSet;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.twitter.TwitterUserInformationConfiguration;
-import org.apache.streams.twitter.converter.TwitterDateTimeFormat;
-import org.apache.streams.util.ComponentUtils;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -65,7 +65,6 @@ import java.util.List;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -75,320 +74,335 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import static java.util.concurrent.Executors.newSingleThreadExecutor;
/**
- * Retrieve recent posts from a list of user ids or names.
- *
- * To use from command line:
- *
- * Supply (at least) the following required configuration in application.conf:
- *
- * twitter.oauth.consumerKey
- * twitter.oauth.consumerSecret
- * twitter.oauth.accessToken
- * twitter.oauth.accessTokenSecret
- * twitter.info
- *
- * Launch using:
- *
- * mvn exec:java -Dexec.mainClass=org.apache.streams.twitter.provider.TwitterTimelineProvider -Dexec.args="application.conf tweets.json"
+ * Retrieve recent posts from a list of user ids or names.
*/
public class TwitterTimelineProvider implements StreamsProvider, Serializable {
- public final static String STREAMS_ID = "TwitterTimelineProvider";
-
- private final static Logger LOGGER = LoggerFactory.getLogger(TwitterTimelineProvider.class);
-
- public static final int MAX_NUMBER_WAITING = 10000;
-
- private TwitterUserInformationConfiguration config;
-
- protected final ReadWriteLock lock = new ReentrantReadWriteLock();
-
- public TwitterUserInformationConfiguration getConfig() {
- return config;
- }
-
- public void setConfig(TwitterUserInformationConfiguration config) {
- this.config = config;
- }
-
- protected Collection<String[]> screenNameBatches;
- protected Collection<Long> ids;
+ public static final String STREAMS_ID = "TwitterTimelineProvider";
- protected volatile Queue<StreamsDatum> providerQueue;
+ private static final Logger LOGGER = LoggerFactory.getLogger(TwitterTimelineProvider.class);
- protected int idsCount;
- protected Twitter client;
+ public static final int MAX_NUMBER_WAITING = 10000;
- protected ListeningExecutorService executor;
+ private TwitterUserInformationConfiguration config;
- protected DateTime start;
- protected DateTime end;
+ protected final ReadWriteLock lock = new ReentrantReadWriteLock();
- protected final AtomicBoolean running = new AtomicBoolean();
+ public TwitterUserInformationConfiguration getConfig() {
+ return config;
+ }
- List<ListenableFuture<Object>> futures = new ArrayList<>();
+ public void setConfig(TwitterUserInformationConfiguration config) {
+ this.config = config;
+ }
- Boolean jsonStoreEnabled;
- Boolean includeEntitiesEnabled;
+ protected Collection<String[]> screenNameBatches;
+ protected Collection<Long> ids;
- public static void main(String[] args) throws Exception {
+ protected volatile Queue<StreamsDatum> providerQueue;
- Preconditions.checkArgument(args.length >= 2);
+ protected int idsCount;
+ protected Twitter client;
- String configfile = args[0];
- String outfile = args[1];
+ protected ListeningExecutorService executor;
- Config reference = ConfigFactory.load();
- File conf_file = new File(configfile);
- assert(conf_file.exists());
- Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
+ protected DateTime start;
+ protected DateTime end;
- Config typesafe = testResourceConfig.withFallback(reference).resolve();
+ protected final AtomicBoolean running = new AtomicBoolean();
- StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe);
- TwitterUserInformationConfiguration config = new ComponentConfigurator<>(TwitterUserInformationConfiguration.class).detectConfiguration(typesafe, "twitter");
- TwitterTimelineProvider provider = new TwitterTimelineProvider(config);
+ List<ListenableFuture<Object>> futures = new ArrayList<>();
- ObjectMapper mapper = new StreamsJacksonMapper(Lists.newArrayList(TwitterDateTimeFormat.TWITTER_FORMAT));
+ Boolean jsonStoreEnabled;
+ Boolean includeEntitiesEnabled;
- PrintStream outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile)));
- provider.prepare(config);
- provider.startStream();
- do {
- Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS);
- Iterator<StreamsDatum> iterator = provider.readCurrent().iterator();
- while(iterator.hasNext()) {
- StreamsDatum datum = iterator.next();
- String json;
- try {
- json = mapper.writeValueAsString(datum.getDocument());
- outStream.println(json);
- } catch (JsonProcessingException e) {
- System.err.println(e.getMessage());
- }
- }
- } while( provider.isRunning());
- provider.cleanUp();
- outStream.flush();
- }
+ /**
+ * To use from command line:
+ *
+ * <p/>
+ * Supply (at least) the following required configuration in application.conf:
+ *
+ * <p/>
+ * twitter.oauth.consumerKey
+ * twitter.oauth.consumerSecret
+ * twitter.oauth.accessToken
+ * twitter.oauth.accessTokenSecret
+ * twitter.info
+ *
+ * <p/>
+ * Launch using:
+ *
+ * <p/>
+ * mvn exec:java -Dexec.mainClass=org.apache.streams.twitter.provider.TwitterTimelineProvider -Dexec.args="application.conf tweets.json"
+ *
+ * @param args args
+ * @throws Exception Exception
+ */
+ public static void main(String[] args) throws Exception {
- public TwitterTimelineProvider(TwitterUserInformationConfiguration config) {
- this.config = config;
- }
+ Preconditions.checkArgument(args.length >= 2);
- public Queue<StreamsDatum> getProviderQueue() {
- return this.providerQueue;
- }
+ String configfile = args[0];
+ String outfile = args[1];
- @Override
- public String getId() {
- return STREAMS_ID;
- }
+ Config reference = ConfigFactory.load();
+ File file = new File(configfile);
+ assert (file.exists());
+ Config testResourceConfig = ConfigFactory.parseFileAnySyntax(file, ConfigParseOptions.defaults().setAllowMissing(false));
- @Override
- public void prepare(Object o) {
+ Config typesafe = testResourceConfig.withFallback(reference).resolve();
+ StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe);
+ TwitterUserInformationConfiguration config = new ComponentConfigurator<>(TwitterUserInformationConfiguration.class).detectConfiguration(typesafe, "twitter");
+ TwitterTimelineProvider provider = new TwitterTimelineProvider(config);
+ ObjectMapper mapper = new StreamsJacksonMapper(Lists.newArrayList(TwitterDateTimeFormat.TWITTER_FORMAT));
+ PrintStream outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile)));
+ provider.prepare(config);
+ provider.startStream();
+ do {
+ Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS);
+ Iterator<StreamsDatum> iterator = provider.readCurrent().iterator();
+ while (iterator.hasNext()) {
+ StreamsDatum datum = iterator.next();
+ String json;
try {
- lock.writeLock().lock();
- providerQueue = constructQueue();
- } finally {
- lock.writeLock().unlock();
+ json = mapper.writeValueAsString(datum.getDocument());
+ outStream.println(json);
+ } catch (JsonProcessingException ex) {
+ System.err.println(ex.getMessage());
}
-
- Preconditions.checkNotNull(providerQueue);
- Preconditions.checkNotNull(config.getOauth().getConsumerKey());
- Preconditions.checkNotNull(config.getOauth().getConsumerSecret());
- Preconditions.checkNotNull(config.getOauth().getAccessToken());
- Preconditions.checkNotNull(config.getOauth().getAccessTokenSecret());
- Preconditions.checkNotNull(config.getInfo());
-
- consolidateToIDs();
-
- if(ids.size() > 1)
- executor = MoreExecutors.listeningDecorator(TwitterUserInformationProvider.newFixedThreadPoolWithQueueSize(5, ids.size()));
- else
- executor = MoreExecutors.listeningDecorator(newSingleThreadExecutor());
+ }
+ }
+ while ( provider.isRunning() );
+ provider.cleanUp();
+ outStream.flush();
+ }
+
+ public TwitterTimelineProvider(TwitterUserInformationConfiguration config) {
+ this.config = config;
+ }
+
+ public Queue<StreamsDatum> getProviderQueue() {
+ return this.providerQueue;
+ }
+
+ @Override
+ public String getId() {
+ return STREAMS_ID;
+ }
+
+ @Override
+ public void prepare(Object configurationObject) {
+
+ try {
+ lock.writeLock().lock();
+ providerQueue = constructQueue();
+ } finally {
+ lock.writeLock().unlock();
}
- @Override
- public void startStream() {
+ Preconditions.checkNotNull(providerQueue);
+ Preconditions.checkNotNull(config.getOauth().getConsumerKey());
+ Preconditions.checkNotNull(config.getOauth().getConsumerSecret());
+ Preconditions.checkNotNull(config.getOauth().getAccessToken());
+ Preconditions.checkNotNull(config.getOauth().getAccessTokenSecret());
+ Preconditions.checkNotNull(config.getInfo());
- LOGGER.debug("{} startStream", STREAMS_ID);
+ consolidateToIDs();
- Preconditions.checkArgument(!ids.isEmpty());
+ if (ids.size() > 1) {
+ executor = MoreExecutors.listeningDecorator(TwitterUserInformationProvider.newFixedThreadPoolWithQueueSize(5, ids.size()));
+ } else {
+ executor = MoreExecutors.listeningDecorator(newSingleThreadExecutor());
+ }
+ }
- running.set(true);
+ @Override
+ public void startStream() {
- submitTimelineThreads(ids.toArray(new Long[0]));
+ LOGGER.debug("{} startStream", STREAMS_ID);
- executor.shutdown();
+ Preconditions.checkArgument(!ids.isEmpty());
- }
+ running.set(true);
- public boolean shouldContinuePulling(List<Status> statuses) {
- return (statuses != null) && (statuses.size() > 0);
- }
+ submitTimelineThreads(ids.toArray(new Long[0]));
- protected void submitTimelineThreads(Long[] ids) {
+ executor.shutdown();
- Twitter client = getTwitterClient();
+ }
- for(int i = 0; i < ids.length; i++) {
+ public boolean shouldContinuePulling(List<Status> statuses) {
+ return (statuses != null) && (statuses.size() > 0);
+ }
- TwitterTimelineProviderTask providerTask = new TwitterTimelineProviderTask(this, client, ids[i]);
- ListenableFuture future = executor.submit(providerTask);
- futures.add(future);
- LOGGER.info("submitted {}", ids[i]);
- }
+ protected void submitTimelineThreads(Long[] ids) {
- }
+ Twitter client = getTwitterClient();
- private Collection<Long> retrieveIds(String[] screenNames) {
- Twitter client = getTwitterClient();
+ for (int i = 0; i < ids.length; i++) {
- List<Long> ids = Lists.newArrayList();
- try {
- for (User tStat : client.lookupUsers(screenNames)) {
- ids.add(tStat.getId());
- }
- } catch (TwitterException e) {
- LOGGER.error("Failure retrieving user details.", e.getMessage());
- }
- return ids;
+ TwitterTimelineProviderTask providerTask = new TwitterTimelineProviderTask(this, client, ids[i]);
+ ListenableFuture future = executor.submit(providerTask);
+ futures.add(future);
+ LOGGER.info("submitted {}", ids[i]);
}
- public StreamsResultSet readCurrent() {
+ }
- StreamsResultSet result;
+ private Collection<Long> retrieveIds(String[] screenNames) {
+ Twitter client = getTwitterClient();
- LOGGER.debug("Providing {} docs", providerQueue.size());
+ List<Long> ids = Lists.newArrayList();
+ try {
+ for (User twitterUser : client.lookupUsers(screenNames)) {
+ ids.add(twitterUser.getId());
+ }
+ } catch (TwitterException ex) {
+ LOGGER.error("Failure retrieving user details.", ex.getMessage());
+ }
+ return ids;
+ }
- try {
- lock.writeLock().lock();
- result = new StreamsResultSet(providerQueue);
- result.setCounter(new DatumStatusCounter());
- providerQueue = constructQueue();
- } finally {
- lock.writeLock().unlock();
- }
+ @Override
+ public StreamsResultSet readCurrent() {
- if( result.size() == 0 && providerQueue.isEmpty() && executor.isTerminated() ) {
- LOGGER.info("Finished. Cleaning up...");
+ StreamsResultSet result;
- running.set(false);
+ LOGGER.debug("Providing {} docs", providerQueue.size());
- LOGGER.info("Exiting");
- }
+ try {
+ lock.writeLock().lock();
+ result = new StreamsResultSet(providerQueue);
+ result.setCounter(new DatumStatusCounter());
+ providerQueue = constructQueue();
+ } finally {
+ lock.writeLock().unlock();
+ }
- return result;
+ if ( result.size() == 0 && providerQueue.isEmpty() && executor.isTerminated() ) {
+ LOGGER.info("Finished. Cleaning up...");
- }
+ running.set(false);
- protected Queue<StreamsDatum> constructQueue() {
- return new LinkedBlockingQueue<StreamsDatum>();
+ LOGGER.info("Exiting");
}
- public StreamsResultSet readNew(BigInteger sequence) {
- LOGGER.debug("{} readNew", STREAMS_ID);
- throw new NotImplementedException();
- }
+ return result;
- public StreamsResultSet readRange(DateTime start, DateTime end) {
- LOGGER.debug("{} readRange", STREAMS_ID);
- throw new NotImplementedException();
- }
+ }
+ protected Queue<StreamsDatum> constructQueue() {
+ return new LinkedBlockingQueue<StreamsDatum>();
+ }
+ public StreamsResultSet readNew(BigInteger sequence) {
+ LOGGER.debug("{} readNew", STREAMS_ID);
+ throw new NotImplementedException();
+ }
- /**
- * Using the "info" list that is contained in the configuration, ensure that all
- * account identifiers are converted to IDs (Longs) instead of screenNames (Strings)
- */
- protected void consolidateToIDs() {
- List<String> screenNames = Lists.newArrayList();
- ids = Lists.newArrayList();
-
- for(String account : config.getInfo()) {
- try {
- if (new Long(account) != null) {
- ids.add(Long.parseLong(Objects.toString(account, null)));
- } else {
- screenNames.add(account);
- }
- } catch (Exception e) {
- LOGGER.error("Exception while trying to add ID: {{}}, {}", account, e);
- }
- }
+ public StreamsResultSet readRange(DateTime start, DateTime end) {
+ LOGGER.debug("{} readRange", STREAMS_ID);
+ throw new NotImplementedException();
+ }
- // Twitter allows for batches up to 100 per request, but you cannot mix types
- screenNameBatches = new ArrayList<String[]>();
- while(screenNames.size() >= 100) {
- screenNameBatches.add(screenNames.subList(0, 100).toArray(new String[0]));
- screenNames = screenNames.subList(100, screenNames.size());
- }
- if(screenNames.size() > 0)
- screenNameBatches.add(screenNames.toArray(new String[ids.size()]));
- Iterator<String[]> screenNameBatchIterator = screenNameBatches.iterator();
+ /**
+ * Using the "info" list that is contained in the configuration, ensure that all
+ * account identifiers are converted to IDs (Longs) instead of screenNames (Strings).
+ */
+ protected void consolidateToIDs() {
+ List<String> screenNames = Lists.newArrayList();
+ ids = Lists.newArrayList();
- while(screenNameBatchIterator.hasNext()) {
- Collection<Long> batchIds = retrieveIds(screenNameBatchIterator.next());
- ids.addAll(batchIds);
+ for (String account : config.getInfo()) {
+ try {
+ if (new Long(account) != null) {
+ ids.add(Long.parseLong(Objects.toString(account, null)));
+ } else {
+ screenNames.add(account);
}
+ } catch (Exception ex) {
+ LOGGER.error("Exception while trying to add ID: {{}}, {}", account, ex);
+ }
}
- public Twitter getTwitterClient() {
-
- String baseUrl = TwitterProviderUtil.baseUrl(config);
-
- ConfigurationBuilder builder = new ConfigurationBuilder()
- .setOAuthConsumerKey(config.getOauth().getConsumerKey())
- .setOAuthConsumerSecret(config.getOauth().getConsumerSecret())
- .setOAuthAccessToken(config.getOauth().getAccessToken())
- .setOAuthAccessTokenSecret(config.getOauth().getAccessTokenSecret())
- .setIncludeEntitiesEnabled(true)
- .setJSONStoreEnabled(true)
- .setAsyncNumThreads(3)
- .setRestBaseURL(baseUrl)
- .setIncludeMyRetweetEnabled(Boolean.TRUE)
- .setPrettyDebugEnabled(Boolean.TRUE);
-
- return new TwitterFactory(builder.build()).getInstance();
+ // Twitter allows for batches up to 100 per request, but you cannot mix types
+ screenNameBatches = new ArrayList<String[]>();
+ while (screenNames.size() >= 100) {
+ screenNameBatches.add(screenNames.subList(0, 100).toArray(new String[0]));
+ screenNames = screenNames.subList(100, screenNames.size());
}
- @Override
- public void cleanUp() {
- shutdownAndAwaitTermination(executor);
+ if (screenNames.size() > 0) {
+ screenNameBatches.add(screenNames.toArray(new String[ids.size()]));
}
- void shutdownAndAwaitTermination(ExecutorService pool) {
- pool.shutdown(); // Disable new tasks from being submitted
- try {
- // Wait a while for existing tasks to terminate
- if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
- pool.shutdownNow(); // Cancel currently executing tasks
- // Wait a while for tasks to respond to being cancelled
- if (!pool.awaitTermination(10, TimeUnit.SECONDS))
- System.err.println("Pool did not terminate");
- }
- } catch (InterruptedException ie) {
- // (Re-)Cancel if current thread also interrupted
- pool.shutdownNow();
- // Preserve interrupt status
- Thread.currentThread().interrupt();
- }
- }
+ Iterator<String[]> screenNameBatchIterator = screenNameBatches.iterator();
- @Override
- public boolean isRunning() {
- if (providerQueue.isEmpty() && executor.isTerminated() && Futures.allAsList(futures).isDone()) {
- LOGGER.info("Completed");
- running.set(false);
- LOGGER.info("Exiting");
+ while (screenNameBatchIterator.hasNext()) {
+ Collection<Long> batchIds = retrieveIds(screenNameBatchIterator.next());
+ ids.addAll(batchIds);
+ }
+ }
+
+ /**
+ * get Twitter Client from TwitterUserInformationConfiguration.
+ * @return result
+ */
+ public Twitter getTwitterClient() {
+
+ String baseUrl = TwitterProviderUtil.baseUrl(config);
+
+ ConfigurationBuilder builder = new ConfigurationBuilder()
+ .setOAuthConsumerKey(config.getOauth().getConsumerKey())
+ .setOAuthConsumerSecret(config.getOauth().getConsumerSecret())
+ .setOAuthAccessToken(config.getOauth().getAccessToken())
+ .setOAuthAccessTokenSecret(config.getOauth().getAccessTokenSecret())
+ .setIncludeEntitiesEnabled(true)
+ .setJSONStoreEnabled(true)
+ .setAsyncNumThreads(3)
+ .setRestBaseURL(baseUrl)
+ .setIncludeMyRetweetEnabled(Boolean.TRUE)
+ .setPrettyDebugEnabled(Boolean.TRUE);
+
+ return new TwitterFactory(builder.build()).getInstance();
+ }
+
+ @Override
+ public void cleanUp() {
+ shutdownAndAwaitTermination(executor);
+ }
+
+ void shutdownAndAwaitTermination(ExecutorService pool) {
+ pool.shutdown(); // Disable new tasks from being submitted
+ try {
+ // Wait a while for existing tasks to terminate
+ if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
+ pool.shutdownNow(); // Cancel currently executing tasks
+ // Wait a while for tasks to respond to being cancelled
+ if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
+ System.err.println("Pool did not terminate");
}
- return running.get();
+ }
+ } catch (InterruptedException ie) {
+ // (Re-)Cancel if current thread also interrupted
+ pool.shutdownNow();
+ // Preserve interrupt status
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ @Override
+ public boolean isRunning() {
+ if (providerQueue.isEmpty() && executor.isTerminated() && Futures.allAsList(futures).isDone()) {
+ LOGGER.info("Completed");
+ running.set(false);
+ LOGGER.info("Exiting");
}
+ return running.get();
+ }
}