You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2016/10/07 19:43:39 UTC
[1/8] incubator-streams git commit: related to STREAMS-403
Repository: incubator-streams
Updated Branches:
refs/heads/master 8bb4ca8a6 -> 4febde277
related to STREAMS-403
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/9bf8ef9b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/9bf8ef9b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/9bf8ef9b
Branch: refs/heads/master
Commit: 9bf8ef9ba566351a855366875f0253059c0473ed
Parents: 8bb4ca8
Author: Steve Blackmon @steveblackmon <sb...@apache.org>
Authored: Tue Oct 4 15:06:14 2016 -0500
Committer: Steve Blackmon @steveblackmon <sb...@apache.org>
Committed: Tue Oct 4 15:11:44 2016 -0500
----------------------------------------------------------------------
.../twitter/provider/TwitterErrorHandler.java | 28 +++--
.../provider/TwitterFollowingProvider.java | 9 +-
.../provider/TwitterFollowingProviderTask.java | 8 +-
.../provider/TwitterTimelineProvider.java | 8 +-
.../provider/TwitterTimelineProviderTask.java | 20 +++-
.../TwitterUserInformationProvider.java | 103 ++++++++++++++-----
.../src/main/jsonschema/com/twitter/Follow.json | 1 +
.../com/twitter/TwitterConfiguration.json | 10 ++
8 files changed, 142 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9bf8ef9b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterErrorHandler.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterErrorHandler.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterErrorHandler.java
index 51236ba..90f6b62 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterErrorHandler.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterErrorHandler.java
@@ -18,6 +18,9 @@
package org.apache.streams.twitter.provider;
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.twitter.TwitterConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import twitter4j.Twitter;
@@ -32,11 +35,18 @@ public class TwitterErrorHandler
private final static Logger LOGGER = LoggerFactory.getLogger(TwitterErrorHandler.class);
// selected because 3 * 5 + n >= 15 for positive n
- protected static final long retry = 3*60*1000;
+ protected static long retry =
+ new ComponentConfigurator<TwitterConfiguration>(TwitterConfiguration.class).detectConfiguration(
+ StreamsConfigurator.getConfig().getConfig("twitter")
+ ).getRetrySleepMs();
+ protected static long retryMax =
+ new ComponentConfigurator<TwitterConfiguration>(TwitterConfiguration.class).detectConfiguration(
+ StreamsConfigurator.getConfig().getConfig("twitter")
+ ).getRetryMax();
@Deprecated
public static int handleTwitterError(Twitter twitter, Exception exception) {
- return handleTwitterError(twitter, null, exception);
+ return handleTwitterError( twitter, null, exception);
}
public static int handleTwitterError(Twitter twitter, Long id, Exception exception)
@@ -82,11 +92,11 @@ public class TwitterErrorHandler
LOGGER.warn("User does not exist: {}", id);
else
LOGGER.warn("User does not exist");
- return 100;
+ return (int)retryMax;
}
else
{
- return 1;
+ return (int)retryMax/3;
}
}
else
@@ -94,7 +104,7 @@ public class TwitterErrorHandler
if(e.getExceptionCode().equals("ced778ef-0c669ac0"))
{
// This is a known weird issue, not exactly sure the cause, but you'll never be able to get the data.
- return 5;
+ return (int)retryMax/3;
}
else if(e.getExceptionCode().equals("4be80492-0a7bf7c7")) {
// This is a 401 reflecting credentials don't have access to the requested resource.
@@ -102,7 +112,7 @@ public class TwitterErrorHandler
LOGGER.warn("Authentication Exception accessing id: {}", id);
else
LOGGER.warn("Authentication Exception");
- return 5;
+ return (int)retryMax;
}
else
{
@@ -111,19 +121,19 @@ public class TwitterErrorHandler
LOGGER.warn(" Access: {}", e.getAccessLevel());
LOGGER.warn(" Code: {}", e.getExceptionCode());
LOGGER.warn(" Message: {}", e.getLocalizedMessage());
- return 1;
+ return (int)retryMax/10;
}
}
}
else if(exception instanceof RuntimeException)
{
LOGGER.warn("TwitterGrabber: Unknown Runtime Error", exception.getMessage());
- return 1;
+ return (int)retryMax/3;
}
else
{
LOGGER.info("Completely Unknown Exception: {}", exception);
- return 1;
+ return (int)retryMax/3;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9bf8ef9b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
index dc15407..27c8526 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
@@ -32,6 +32,7 @@ import org.slf4j.LoggerFactory;
import twitter4j.Twitter;
import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -57,6 +58,7 @@ public class TwitterFollowingProvider extends TwitterUserInformationProvider {
}
public TwitterFollowingProvider(TwitterFollowingConfiguration config) {
+ super(config);
this.config = config;
}
@@ -130,7 +132,7 @@ public class TwitterFollowingProvider extends TwitterUserInformationProvider {
}
protected Queue<StreamsDatum> constructQueue() {
- return Queues.synchronizedQueue(new LinkedBlockingQueue<StreamsDatum>(MAX_NUMBER_WAITING));
+ return new ConcurrentLinkedQueue<StreamsDatum>();
}
@Override
@@ -149,4 +151,9 @@ public class TwitterFollowingProvider extends TwitterUserInformationProvider {
lock.readLock().unlock();
}
}
+
+ @Override
+ public boolean isRunning() {
+ return running.get();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9bf8ef9b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProviderTask.java
index 5397757..cc71d48 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProviderTask.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProviderTask.java
@@ -139,7 +139,7 @@ public class TwitterFollowingProviderTask implements Runnable {
Preconditions.checkNotNull(follow);
if( count < provider.getConfig().getMaxItems()) {
- provider.addDatum(new StreamsDatum(follow));
+ ComponentUtils.offerUntilSuccess(new StreamsDatum(follow), provider.providerQueue);
count++;
}
@@ -157,7 +157,7 @@ public class TwitterFollowingProviderTask implements Runnable {
catch(Exception e) {
keepTrying += TwitterErrorHandler.handleTwitterError(client, e);
}
- } while (curser != 0 && keepTrying < 10 && count < provider.getConfig().getMaxItems());
+ } while (curser != 0 && keepTrying < provider.getConfig().getRetryMax() && count < provider.getConfig().getMaxItems());
}
private void collectIds(Long id) {
@@ -196,7 +196,7 @@ public class TwitterFollowingProviderTask implements Runnable {
Preconditions.checkNotNull(follow);
if( count < provider.getConfig().getMaxItems()) {
- provider.addDatum(new StreamsDatum(follow));
+ ComponentUtils.offerUntilSuccess(new StreamsDatum(follow), provider.providerQueue);
count++;
}
} catch (Exception e) {
@@ -213,7 +213,7 @@ public class TwitterFollowingProviderTask implements Runnable {
catch(Exception e) {
keepTrying += TwitterErrorHandler.handleTwitterError(client, e);
}
- } while (curser != 0 && keepTrying < 10 && count < provider.getConfig().getMaxItems());
+ } while (curser != 0 && keepTrying < provider.getConfig().getRetryMax() && count < provider.getConfig().getMaxItems());
}
protected void getFollowing(String screenName) {
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9bf8ef9b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
index 26ba887..a8eada4 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
@@ -109,7 +109,7 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
Preconditions.checkArgument(!ids.isEmpty());
- LOGGER.info("readCurrent");
+ LOGGER.debug("{} - readCurrent", ids);
submitTimelineThreads(ids.toArray(new Long[0]));
@@ -150,10 +150,10 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
public StreamsResultSet readCurrent() {
- LOGGER.info("Providing {} docs", providerQueue.size());
-
StreamsResultSet result;
+ LOGGER.info("Providing {} docs", providerQueue.size());
+
try {
lock.writeLock().lock();
result = new StreamsResultSet(providerQueue);
@@ -176,7 +176,7 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
}
protected Queue<StreamsDatum> constructQueue() {
- return Queues.synchronizedQueue(new LinkedBlockingQueue<StreamsDatum>(MAX_NUMBER_WAITING));
+ return new LinkedBlockingQueue<StreamsDatum>();
}
public StreamsResultSet readNew(BigInteger sequence) {
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9bf8ef9b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java
index adc37ca..b8d5e1d 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java
@@ -18,10 +18,17 @@
package org.apache.streams.twitter.provider;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.twitter.converter.TwitterDateTimeFormat;
+import org.apache.streams.twitter.pojo.*;
+import org.apache.streams.util.ComponentUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import twitter4j.*;
+import twitter4j.Status;
import java.util.List;
@@ -32,6 +39,8 @@ public class TwitterTimelineProviderTask implements Runnable {
private final static Logger LOGGER = LoggerFactory.getLogger(TwitterTimelineProviderTask.class);
+ private static ObjectMapper MAPPER = new StreamsJacksonMapper(Lists.newArrayList(TwitterDateTimeFormat.TWITTER_FORMAT));
+
protected TwitterTimelineProvider provider;
protected Twitter client;
protected Long id;
@@ -49,6 +58,8 @@ public class TwitterTimelineProviderTask implements Runnable {
List<Status> statuses = null;
int count = 0;
+ LOGGER.info(id + " Thread Starting");
+
do
{
int keepTrying = 0;
@@ -67,10 +78,15 @@ public class TwitterTimelineProviderTask implements Runnable {
statuses = client.getUserTimeline(id, paging);
for (Status tStat : statuses) {
- String json = TwitterObjectFactory.getRawJSON(tStat);
+ String json = TwitterObjectFactory.getRawJSON(tStat);
if( count < provider.getConfig().getMaxItems() ) {
- provider.addDatum(new StreamsDatum(json));
+ try {
+ org.apache.streams.twitter.pojo.Tweet tweet = MAPPER.readValue(json, org.apache.streams.twitter.pojo.Tweet.class);
+ ComponentUtils.offerUntilSuccess(new StreamsDatum(tweet), provider.providerQueue);
+ } catch(Exception exception) {
+ LOGGER.warn("Failed to read document as Tweet ", tStat);
+ }
count++;
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9bf8ef9b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
index c4cc96d..78eb3e6 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
@@ -18,17 +18,24 @@
package org.apache.streams.twitter.provider;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Queues;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.commons.lang.NotImplementedException;
import org.apache.streams.config.ComponentConfigurator;
import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.DatumStatusCounter;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsProvider;
import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.jackson.StreamsJacksonMapper;
import org.apache.streams.twitter.TwitterFollowingConfiguration;
import org.apache.streams.twitter.TwitterUserInformationConfiguration;
+import org.apache.streams.twitter.converter.TwitterDateTimeFormat;
+import org.apache.streams.twitter.pojo.User;
import org.apache.streams.util.ComponentUtils;
import org.joda.time.DateTime;
import org.slf4j.Logger;
@@ -36,7 +43,6 @@ import org.slf4j.LoggerFactory;
import twitter4j.Twitter;
import twitter4j.TwitterException;
import twitter4j.TwitterFactory;
-import twitter4j.User;
import twitter4j.conf.ConfigurationBuilder;
import twitter4j.json.DataObjectFactory;
@@ -48,17 +54,27 @@ import java.util.List;
import java.util.Queue;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static java.util.concurrent.Executors.newSingleThreadExecutor;
public class TwitterUserInformationProvider implements StreamsProvider, Serializable
{
public static final String STREAMS_ID = "TwitterUserInformationProvider";
+ private static ObjectMapper MAPPER = new StreamsJacksonMapper(Lists.newArrayList(TwitterDateTimeFormat.TWITTER_FORMAT));
+
private static final Logger LOGGER = LoggerFactory.getLogger(TwitterUserInformationProvider.class);
+ public static final int MAX_NUMBER_WAITING = 1000;
+
private TwitterUserInformationConfiguration config;
- protected volatile Queue<StreamsDatum> providerQueue = new LinkedBlockingQueue<StreamsDatum>();
+ protected final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+ protected volatile Queue<StreamsDatum> providerQueue;
public TwitterUserInformationConfiguration getConfig() { return config; }
@@ -81,7 +97,7 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
}
public TwitterUserInformationProvider() {
- this.config = new ComponentConfigurator<>(TwitterFollowingConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig().getConfig("twitter"));
+ this.config = new ComponentConfigurator<>(TwitterUserInformationConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig().getConfig("twitter"));
}
public TwitterUserInformationProvider(TwitterUserInformationConfiguration config) {
@@ -99,7 +115,20 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
@Override
public void startStream() {
+
+ Preconditions.checkArgument(idsBatches.hasNext() || screenNameBatches.hasNext());
+
+ LOGGER.info("{}{} - startStream", idsBatches, screenNameBatches);
+
+ while(idsBatches.hasNext())
+ loadBatch(idsBatches.next());
+
+ while(screenNameBatches.hasNext())
+ loadBatch(screenNameBatches.next());
+
running.set(true);
+
+ executor.shutdown();
}
protected void loadBatch(Long[] ids) {
@@ -116,9 +145,14 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
for(int i = 0; i < ids.length; i++)
toQuery[i] = ids[i];
- for (User tStat : client.lookupUsers(toQuery)) {
- String json = DataObjectFactory.getRawJSON(tStat);
- ComponentUtils.offerUntilSuccess(new StreamsDatum(json), providerQueue);
+ for (twitter4j.User tUser : client.lookupUsers(toQuery)) {
+ String json = DataObjectFactory.getRawJSON(tUser);
+ try {
+ User user = MAPPER.readValue(json, org.apache.streams.twitter.pojo.User.class);
+ ComponentUtils.offerUntilSuccess(new StreamsDatum(user), providerQueue);
+ } catch(Exception exception) {
+ LOGGER.warn("Failed to read document as User ", tUser);
+ }
}
keepTrying = 10;
}
@@ -141,9 +175,14 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
{
try
{
- for (User tStat : client.lookupUsers(ids)) {
- String json = DataObjectFactory.getRawJSON(tStat);
- providerQueue.offer(new StreamsDatum(json));
+ for (twitter4j.User tUser : client.lookupUsers(ids)) {
+ String json = DataObjectFactory.getRawJSON(tUser);
+ try {
+ User user = MAPPER.readValue(json, org.apache.streams.twitter.pojo.User.class);
+ ComponentUtils.offerUntilSuccess(new StreamsDatum(user), providerQueue);
+ } catch(Exception exception) {
+ LOGGER.warn("Failed to read document as User ", tUser);
+ }
}
keepTrying = 10;
}
@@ -158,30 +197,34 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
public StreamsResultSet readCurrent() {
- Preconditions.checkArgument(idsBatches.hasNext() || screenNameBatches.hasNext());
-
- LOGGER.info("readCurrent");
-
- while(idsBatches.hasNext())
- loadBatch(idsBatches.next());
-
- while(screenNameBatches.hasNext())
- loadBatch(screenNameBatches.next());
-
+ LOGGER.info("{}{} - readCurrent", idsBatches, screenNameBatches);
- LOGGER.info("Finished. Cleaning up...");
+ StreamsResultSet result;
- LOGGER.info("Providing {} docs", providerQueue.size());
+ try {
+ lock.writeLock().lock();
+ result = new StreamsResultSet(providerQueue);
+ result.setCounter(new DatumStatusCounter());
+ providerQueue = constructQueue();
+ LOGGER.info("{}{} - providing {} docs", idsBatches, screenNameBatches, result.size());
+ } finally {
+ lock.writeLock().unlock();
+ }
- StreamsResultSet result = new StreamsResultSet(providerQueue);
- running.set(false);
+ if( providerQueue.isEmpty() && executor.isTerminated()) {
+ LOGGER.info("{}{} - completed", idsBatches, screenNameBatches);
- LOGGER.info("Exiting");
+ running.set(false);
+ }
return result;
}
+ protected Queue<StreamsDatum> constructQueue() {
+ return new LinkedBlockingQueue<StreamsDatum>();
+ }
+
public StreamsResultSet readNew(BigInteger sequence) {
LOGGER.debug("{} readNew", STREAMS_ID);
throw new NotImplementedException();
@@ -225,6 +268,13 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
if( o instanceof TwitterFollowingConfiguration )
config = (TwitterUserInformationConfiguration) o;
+ try {
+ lock.writeLock().lock();
+ providerQueue = constructQueue();
+ } finally {
+ lock.writeLock().unlock();
+ }
+
Preconditions.checkNotNull(providerQueue);
Preconditions.checkNotNull(config.getOauth().getConsumerKey());
Preconditions.checkNotNull(config.getOauth().getConsumerSecret());
@@ -276,7 +326,10 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
if(screenNames.size() > 0)
screenNameBatches.add(screenNames.toArray(new String[ids.size()]));
- executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, (ids.size() + screenNames.size())));
+ if(ids.size() + screenNames.size() > 0)
+ executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, (ids.size() + screenNames.size())));
+ else
+ executor = MoreExecutors.listeningDecorator(newSingleThreadExecutor());
this.idsBatches = idsBatches.iterator();
this.screenNameBatches = screenNameBatches.iterator();
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9bf8ef9b/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/Follow.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/Follow.json b/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/Follow.json
index b667540..320db12 100644
--- a/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/Follow.json
+++ b/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/Follow.json
@@ -6,6 +6,7 @@
],
"id": "#",
"javaType" : "org.apache.streams.twitter.pojo.Follow",
+ "javaInterfaces": ["java.io.Serializable"],
"properties": {
"follower": {
"$ref": "User.json"
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9bf8ef9b/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterConfiguration.json b/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterConfiguration.json
index 5d911af..69048d1 100644
--- a/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterConfiguration.json
+++ b/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterConfiguration.json
@@ -72,6 +72,16 @@
"type": "string"
}
}
+ },
+ "retrySleepMs": {
+ "type": "integer",
+ "description": "ms to sleep when hitting a rate limit",
+ "default": 100000
+ },
+ "retryMax": {
+ "type": "integer",
+ "description": "ms to sleep when hitting a rate limit",
+ "default": 10
}
}
}
\ No newline at end of file
[3/8] incubator-streams git commit: employ args to simplify test and
provider command line
Posted by sb...@apache.org.
employ args to simplify test and provider command line
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/d9e58cdd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/d9e58cdd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/d9e58cdd
Branch: refs/heads/master
Commit: d9e58cdd67020520d592aad621b3aff6a8249537
Parents: 0813b11
Author: Steve Blackmon @steveblackmon <sb...@apache.org>
Authored: Tue Oct 4 20:25:44 2016 -0500
Committer: Steve Blackmon @steveblackmon <sb...@apache.org>
Committed: Tue Oct 4 20:25:44 2016 -0500
----------------------------------------------------------------------
.../streams-provider-twitter/pom.xml | 12 ++++
.../provider/TwitterTimelineProvider.java | 64 +++++++++++++++-----
.../provider/TwitterTimelineProviderIT.java | 35 +----------
.../resources/TwitterTimelineProviderIT.conf | 4 ++
.../resources/TwitterTimelineProviderTest.conf | 4 --
5 files changed, 69 insertions(+), 50 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d9e58cdd/streams-contrib/streams-provider-twitter/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/pom.xml b/streams-contrib/streams-provider-twitter/pom.xml
index 903d3a7..7ec0908 100644
--- a/streams-contrib/streams-provider-twitter/pom.xml
+++ b/streams-contrib/streams-provider-twitter/pom.xml
@@ -209,6 +209,18 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-resources-plugin</artifactId>
</plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>exec-maven-plugin</artifactId>
+ <version>1.5.0</version>
+ <executions>
+ <execution>
+ <goals>
+ <goal>exec</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
</plugins>
</build>
</project>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d9e58cdd/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
index b8653b8..61cddaf 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
@@ -24,8 +24,12 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Queues;
import com.google.common.util.concurrent.Uninterruptibles;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigParseOptions;
import org.apache.commons.lang.NotImplementedException;
import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfiguration;
import org.apache.streams.config.StreamsConfigurator;
import org.apache.streams.core.DatumStatusCounter;
import org.apache.streams.core.StreamsDatum;
@@ -42,6 +46,11 @@ import org.slf4j.LoggerFactory;
import twitter4j.*;
import twitter4j.conf.ConfigurationBuilder;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.PrintStream;
import java.io.Serializable;
import java.math.BigInteger;
import java.util.*;
@@ -55,8 +64,22 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* Retrieve recent posts from a list of user ids or names.
+ *
+ * To use from command line:
+ *
+ * Supply (at least) the following required configuration in application.conf:
+ *
+ * twitter.oauth.consumerKey
+ * twitter.oauth.consumerSecret
+ * twitter.oauth.accessToken
+ * twitter.oauth.accessTokenSecret
+ * twitter.info
+ *
+ * Launch using:
+ *
+ * mvn exec:java -Dexec.mainClass=org.apache.streams.twitter.provider.TwitterTimelineProvider -Dexec.args="application.conf tweets.json"
*/
-public class TwitterTimelineProvider implements StreamsProvider, Serializable, Runnable {
+public class TwitterTimelineProvider implements StreamsProvider, Serializable {
public final static String STREAMS_ID = "TwitterTimelineProvider";
@@ -64,30 +87,43 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable, R
private static ObjectMapper MAPPER = new StreamsJacksonMapper(Lists.newArrayList(TwitterDateTimeFormat.TWITTER_FORMAT));
- public static void main(String[] args) {
- TwitterUserInformationConfiguration config = new ComponentConfigurator<>(TwitterUserInformationConfiguration.class).detectConfiguration("twitter");
+ public static void main(String[] args) throws Exception {
+
+ Preconditions.checkArgument(args.length >= 2);
+
+ String configfile = args[0];
+ String outfile = args[1];
+
+ Config reference = ConfigFactory.load();
+ File conf_file = new File(configfile);
+ assert(conf_file.exists());
+ Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
+
+ Config typesafe = testResourceConfig.withFallback(reference).resolve();
+
+ StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe);
+ TwitterUserInformationConfiguration config = new ComponentConfigurator<>(TwitterUserInformationConfiguration.class).detectConfiguration(typesafe, "twitter");
TwitterTimelineProvider provider = new TwitterTimelineProvider(config);
- provider.run();
- }
- @Override
- public void run() {
- prepare(config);
- startStream();
+ PrintStream outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile)));
+ provider.prepare(config);
+ provider.startStream();
do {
- Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
- Iterator<StreamsDatum> iterator = readCurrent().iterator();
+ Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS);
+ Iterator<StreamsDatum> iterator = provider.readCurrent().iterator();
while(iterator.hasNext()) {
StreamsDatum datum = iterator.next();
String json;
try {
json = MAPPER.writeValueAsString(datum.getDocument());
- System.out.println(json);
+ outStream.println(json);
} catch (JsonProcessingException e) {
System.err.println(e.getMessage());
}
}
- } while( isRunning());
+ } while( provider.isRunning());
+ provider.cleanUp();
+ outStream.flush();
}
public static final int MAX_NUMBER_WAITING = 10000;
@@ -189,7 +225,7 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable, R
StreamsResultSet result;
- LOGGER.info("Providing {} docs", providerQueue.size());
+ LOGGER.debug("Providing {} docs", providerQueue.size());
try {
lock.writeLock().lock();
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d9e58cdd/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/provider/TwitterTimelineProviderIT.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/provider/TwitterTimelineProviderIT.java b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/provider/TwitterTimelineProviderIT.java
index e0f3b6a..f21a87e 100644
--- a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/provider/TwitterTimelineProviderIT.java
+++ b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/provider/TwitterTimelineProviderIT.java
@@ -43,26 +43,10 @@ public class TwitterTimelineProviderIT {
@Test
public void testTwitterTimelineProvider() throws Exception {
- PrintStream stdout = new PrintStream(new BufferedOutputStream(new FileOutputStream("./target/test-classes/TwitterTimelineProviderTest.stdout.txt")));
- PrintStream stderr = new PrintStream(new BufferedOutputStream(new FileOutputStream("./target/test-classes/TwitterTimelineProviderTest.stderr.txt")));
+ String configfile = "./target/test-classes/TwitterTimelineProviderIT.conf";
+ String outfile = "./target/test-classes/TwitterTimelineProviderIT.txt";
- System.setOut(stdout);
- System.setErr(stderr);
-
- Config reference = ConfigFactory.load();
- File conf_file = new File("target/test-classes/TwitterTimelineProviderTest.conf");
- assert(conf_file.exists());
- Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
-
- Config typesafe = testResourceConfig.withFallback(reference).resolve();
- StreamsConfiguration streams = StreamsConfigurator.detectConfiguration(typesafe);
- TwitterUserInformationConfiguration testConfig = new ComponentConfigurator<>(TwitterUserInformationConfiguration.class).detectConfiguration(typesafe.getConfig("twitter"));
-
- TwitterTimelineProvider provider = new TwitterTimelineProvider(testConfig);
- provider.run();
-
- stdout.flush();
- stderr.flush();
+ TwitterTimelineProvider.main(Lists.newArrayList(configfile, outfile).toArray(new String[2]));
File out = new File("target/test-classes/TwitterTimelineProviderTest.stdout.txt");
assert (out.exists());
@@ -76,18 +60,5 @@ public class TwitterTimelineProviderIT {
assert (outCounter.getLineNumber() == 1000);
- File err = new File("target/test-classes/TwitterTimelineProviderTest.stderr.txt");
- assert (err.exists());
- assert (err.canRead());
- assert (err.isFile());
-
- FileReader errReader = new FileReader(err);
- LineNumberReader errCounter = new LineNumberReader(errReader);
-
- while(errCounter.readLine() != null) {}
-
- assert (errCounter.getLineNumber() == 0);
-
-
}
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d9e58cdd/streams-contrib/streams-provider-twitter/src/test/resources/TwitterTimelineProviderIT.conf
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/test/resources/TwitterTimelineProviderIT.conf b/streams-contrib/streams-provider-twitter/src/test/resources/TwitterTimelineProviderIT.conf
new file mode 100644
index 0000000..a7862c4
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/test/resources/TwitterTimelineProviderIT.conf
@@ -0,0 +1,4 @@
+twitter.info = [
+ 18055613
+]
+twitter.max_items = 1000
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d9e58cdd/streams-contrib/streams-provider-twitter/src/test/resources/TwitterTimelineProviderTest.conf
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/test/resources/TwitterTimelineProviderTest.conf b/streams-contrib/streams-provider-twitter/src/test/resources/TwitterTimelineProviderTest.conf
deleted file mode 100644
index a7862c4..0000000
--- a/streams-contrib/streams-provider-twitter/src/test/resources/TwitterTimelineProviderTest.conf
+++ /dev/null
@@ -1,4 +0,0 @@
-twitter.info = [
- 18055613
-]
-twitter.max_items = 1000
\ No newline at end of file
[2/8] incubator-streams git commit: example of STREAMS-415 using
twitter
Posted by sb...@apache.org.
example of STREAMS-415 using twitter
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/0813b11e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/0813b11e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/0813b11e
Branch: refs/heads/master
Commit: 0813b11edd535322cbabafd9a91e77136812e8bb
Parents: 9bf8ef9
Author: Steve Blackmon @steveblackmon <sb...@apache.org>
Authored: Tue Oct 4 17:37:06 2016 -0500
Committer: Steve Blackmon @steveblackmon <sb...@apache.org>
Committed: Tue Oct 4 17:37:24 2016 -0500
----------------------------------------------------------------------
.../provider/TwitterTimelineProvider.java | 40 ++++++++-
.../src/site/markdown/index.md | 18 ++++
.../provider/TwitterTimelineProviderIT.java | 93 ++++++++++++++++++++
.../provider/TwitterTimelineProviderTest.java | 39 --------
.../resources/TwitterTimelineProviderTest.conf | 4 +
5 files changed, 154 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0813b11e/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
index a8eada4..b8653b8 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
@@ -18,15 +18,23 @@
package org.apache.streams.twitter.provider;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Queues;
+import com.google.common.util.concurrent.Uninterruptibles;
import org.apache.commons.lang.NotImplementedException;
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfigurator;
import org.apache.streams.core.DatumStatusCounter;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsProvider;
import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.jackson.StreamsJacksonMapper;
import org.apache.streams.twitter.TwitterUserInformationConfiguration;
+import org.apache.streams.twitter.converter.TwitterDateTimeFormat;
+import org.apache.streams.twitter.pojo.Tweet;
import org.apache.streams.util.ComponentUtils;
import org.joda.time.DateTime;
import org.slf4j.Logger;
@@ -48,12 +56,40 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* Retrieve recent posts from a list of user ids or names.
*/
-public class TwitterTimelineProvider implements StreamsProvider, Serializable {
+public class TwitterTimelineProvider implements StreamsProvider, Serializable, Runnable {
public final static String STREAMS_ID = "TwitterTimelineProvider";
private final static Logger LOGGER = LoggerFactory.getLogger(TwitterTimelineProvider.class);
+ private static ObjectMapper MAPPER = new StreamsJacksonMapper(Lists.newArrayList(TwitterDateTimeFormat.TWITTER_FORMAT));
+
+ public static void main(String[] args) {
+ TwitterUserInformationConfiguration config = new ComponentConfigurator<>(TwitterUserInformationConfiguration.class).detectConfiguration("twitter");
+ TwitterTimelineProvider provider = new TwitterTimelineProvider(config);
+ provider.run();
+ }
+
+ @Override
+ public void run() {
+ prepare(config);
+ startStream();
+ do {
+ Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
+ Iterator<StreamsDatum> iterator = readCurrent().iterator();
+ while(iterator.hasNext()) {
+ StreamsDatum datum = iterator.next();
+ String json;
+ try {
+ json = MAPPER.writeValueAsString(datum.getDocument());
+ System.out.println(json);
+ } catch (JsonProcessingException e) {
+ System.err.println(e.getMessage());
+ }
+ }
+ } while( isRunning());
+ }
+
public static final int MAX_NUMBER_WAITING = 10000;
private TwitterUserInformationConfiguration config;
@@ -116,6 +152,7 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
running.set(true);
executor.shutdown();
+
}
public boolean shouldContinuePulling(List<Status> statuses) {
@@ -304,4 +341,5 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
lock.readLock().unlock();
}
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0813b11e/streams-contrib/streams-provider-twitter/src/site/markdown/index.md
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/site/markdown/index.md b/streams-contrib/streams-provider-twitter/src/site/markdown/index.md
index ec5d1c8..4249956 100644
--- a/streams-contrib/streams-provider-twitter/src/site/markdown/index.md
+++ b/streams-contrib/streams-provider-twitter/src/site/markdown/index.md
@@ -31,6 +31,24 @@ streams-provider-twitter contains schema definitions, providers, conversions, an
| TwitterStreamProvider [TwitterStreamProvider.html](apidocs/org/apache/streams/twitter/TwitterStreamProvider.html "javadoc") | [TwitterStreamConfiguration.json](com/twitter/TwitterStreamConfiguration.json "TwitterStreamConfiguration.json") [TwitterUserInformationConfiguration.html](apidocs/org/apache/streams/twitter/pojo/TwitterStreamConfiguration.html "javadoc") | [sample.conf](sample.conf "sample.conf")<br/>[userstream.conf](userstream.conf "userstream.conf") |
| TwitterFollowingProvider [TwitterFollowingProvider.html](apidocs/org/apache/streams/twitter/TwitterFollowingConfiguration.html "javadoc") | [TwitterFollowingConfiguration.json](com/twitter/TwitterFollowingConfiguration.json "TwitterFollowingConfiguration.json") [TwitterFollowingConfiguration.html](apidocs/org/apache/streams/twitter/pojo/TwitterFollowingConfiguration.html "javadoc") | [friends.conf](friends.conf "friends.conf")<br/>[followers.conf](followers.conf "followers.conf") |
+Test:
+-----
+
+Create a local file `application.conf` with valid twitter credentials
+
+ twitter {
+ oauth {
+ consumerKey = ""
+ consumerSecret = ""
+ accessToken = ""
+ accessTokenSecret = ""
+ }
+ }
+
+Build with integration testing enabled, using your credentials
+
+ mvn clean test verify -DskipITs=false -DargLine="-Dconfig.file=`pwd`/application.conf"
+
[JavaDocs](apidocs/index.html "JavaDocs")
###### Licensed under Apache License 2.0 - http://www.apache.org/licenses/LICENSE-2.0
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0813b11e/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/provider/TwitterTimelineProviderIT.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/provider/TwitterTimelineProviderIT.java b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/provider/TwitterTimelineProviderIT.java
new file mode 100644
index 0000000..e0f3b6a
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/provider/TwitterTimelineProviderIT.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.twitter.provider;
+
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigParseOptions;
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfiguration;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.twitter.TwitterUserInformationConfiguration;
+import org.junit.Test;
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.FileReader;
+import java.io.LineNumberReader;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.util.Arrays;
+import java.util.List;
+
+public class TwitterTimelineProviderIT {
+
+ @Test
+ public void testTwitterTimelineProvider() throws Exception {
+
+ PrintStream stdout = new PrintStream(new BufferedOutputStream(new FileOutputStream("./target/test-classes/TwitterTimelineProviderTest.stdout.txt")));
+ PrintStream stderr = new PrintStream(new BufferedOutputStream(new FileOutputStream("./target/test-classes/TwitterTimelineProviderTest.stderr.txt")));
+
+ System.setOut(stdout);
+ System.setErr(stderr);
+
+ Config reference = ConfigFactory.load();
+ File conf_file = new File("target/test-classes/TwitterTimelineProviderTest.conf");
+ assert(conf_file.exists());
+ Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
+
+ Config typesafe = testResourceConfig.withFallback(reference).resolve();
+ StreamsConfiguration streams = StreamsConfigurator.detectConfiguration(typesafe);
+ TwitterUserInformationConfiguration testConfig = new ComponentConfigurator<>(TwitterUserInformationConfiguration.class).detectConfiguration(typesafe.getConfig("twitter"));
+
+ TwitterTimelineProvider provider = new TwitterTimelineProvider(testConfig);
+ provider.run();
+
+ stdout.flush();
+ stderr.flush();
+
+ File out = new File("target/test-classes/TwitterTimelineProviderTest.stdout.txt");
+ assert (out.exists());
+ assert (out.canRead());
+ assert (out.isFile());
+
+ FileReader outReader = new FileReader(out);
+ LineNumberReader outCounter = new LineNumberReader(outReader);
+
+ while(outCounter.readLine() != null) {}
+
+ assert (outCounter.getLineNumber() == 1000);
+
+ File err = new File("target/test-classes/TwitterTimelineProviderTest.stderr.txt");
+ assert (err.exists());
+ assert (err.canRead());
+ assert (err.isFile());
+
+ FileReader errReader = new FileReader(err);
+ LineNumberReader errCounter = new LineNumberReader(errReader);
+
+ while(errCounter.readLine() != null) {}
+
+ assert (errCounter.getLineNumber() == 0);
+
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0813b11e/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTest.java b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTest.java
deleted file mode 100644
index 0cdede0..0000000
--- a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTest.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.twitter.provider;
-
-import org.apache.streams.twitter.TwitterUserInformationConfiguration;
-import org.junit.Test;
-
-import java.util.Arrays;
-import java.util.List;
-
-public class TwitterTimelineProviderTest {
-
- @Test
- public void consolidateToIDsTest() {
- List<String> ids = Arrays.asList("2342342", "", "144523", null);
-
- TwitterUserInformationConfiguration twitterUserInformationConfiguration = new TwitterUserInformationConfiguration();
- twitterUserInformationConfiguration.setInfo(ids);
- TwitterTimelineProvider twitterTimelineProvider = new TwitterTimelineProvider(twitterUserInformationConfiguration);
-
- twitterTimelineProvider.consolidateToIDs();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0813b11e/streams-contrib/streams-provider-twitter/src/test/resources/TwitterTimelineProviderTest.conf
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/test/resources/TwitterTimelineProviderTest.conf b/streams-contrib/streams-provider-twitter/src/test/resources/TwitterTimelineProviderTest.conf
new file mode 100644
index 0000000..a7862c4
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/test/resources/TwitterTimelineProviderTest.conf
@@ -0,0 +1,4 @@
+twitter.info = [
+ 18055613
+]
+twitter.max_items = 1000
\ No newline at end of file
[7/8] incubator-streams git commit: add 3 more provider ITs,
reorganize test packages
Posted by sb...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4febde27/streams-contrib/streams-provider-twitter/src/test/resources/TwitterUserInformationProviderIT.conf
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/test/resources/TwitterUserInformationProviderIT.conf b/streams-contrib/streams-provider-twitter/src/test/resources/TwitterUserInformationProviderIT.conf
new file mode 100644
index 0000000..698a2c8
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/test/resources/TwitterUserInformationProviderIT.conf
@@ -0,0 +1,1002 @@
+twitter.info = [
+ 3424266646
+ 3277467241
+ 3244517214
+ 29953647
+ 63818319
+ 1528436754
+ 405580894
+ 322778026
+ 172382176
+ 633076833
+ 703735608
+ 2347223440
+ 2907929487
+ 950240089
+ 1418546592
+ 3318418717
+ 2848958704
+ 1120797264
+ 933623324
+ 2977700375
+ 328204518
+ 585131136
+ 2868789793
+ 158347647
+ 2915413161
+ 2217367263
+ 2534019247
+ 3033565239
+ 377379801
+ 2525341814
+ 3123827524
+ 1840932523
+ 3307643975
+ 3301777832
+ 961987748
+ 3205632255
+ 2799469322
+ 17730681
+ 1495242662
+ 1909516123
+ 263933760
+ 312651511
+ 2479527469
+ 2357151036
+ 346433828
+ 44801893
+ 1049697306
+ 2779673194
+ 18323141
+ 2172488902
+ 2373431930
+ 1038322550
+ 2946211549
+ 2911057543
+ 1186036284
+ 2878076317
+ 1312950464
+ 57323685
+ 32929857
+ 301933631
+ 2852217152
+ 330422649
+ 98470876
+ 933125156
+ 3237125761
+ 914882005
+ 1560239652
+ 900444860
+ 402918702
+ 1820690166
+ 3074359086
+ 353183684
+ 528544881
+ 1881638161
+ 2751762993
+ 3161315692
+ 3305680079
+ 1721613488
+ 513068659
+ 627186234
+ 3203648416
+ 1541163325
+ 1882043502
+ 29071727
+ 610104090
+ 2819781014
+ 2909115204
+ 213886397
+ 3249385591
+ 3086875073
+ 87040031
+ 2202487475
+ 334896132
+ 49163181
+ 3433984816
+ 543969362
+ 489445461
+ 855051894
+ 2792040175
+ 117051455
+ 438599410
+ 1387329846
+ 711595782
+ 3230662766
+ 2766672269
+ 2926781875
+ 863203928
+ 517199566
+ 201645935
+ 1555939147
+ 2943152669
+ 1324775431
+ 400234897
+ 2347416842
+ 1558112510
+ 474415350
+ 2153710970
+ 1408335014
+ 3633713483
+ 3166021013
+ 3530993294
+ 332598229
+ 308252069
+ 3317826986
+ 572175644
+ 1718271572
+ 2869090090
+ 23725109
+ 1926137280
+ 1486830500
+ 743080386
+ 3250479720
+ 2560441544
+ 2715649872
+ 287089153
+ 18761334
+ 2305577745
+ 724860668
+ 193306049
+ 2615761979
+ 2463299598
+ 1436916012
+ 919019185
+ 90502449
+ 50689522
+ 1383774679
+ 612784850
+ 410319975
+ 833440153
+ 442322844
+ 2181167094
+ 94012832
+ 112748352
+ 1474618075
+ 158262669
+ 2391506308
+ 882502026
+ 2693660146
+ 2971933908
+ 55271184
+ 2287356556
+ 2895756090
+ 407147132
+ 3262181
+ 313317193
+ 2729137002
+ 2939122360
+ 2751601568
+ 1215082350
+ 124866576
+ 274292311
+ 3310301042
+ 95407473
+ 24993769
+ 1342908648
+ 1805339413
+ 3118252036
+ 893269387
+ 1481149014
+ 463288019
+ 75008083
+ 2895489727
+ 965493739
+ 278637248
+ 1937513246
+ 422218268
+ 3320995462
+ 78682286
+ 2777069098
+ 2909553730
+ 2914338670
+ 1251667531
+ 2764034755
+ 532659717
+ 269002510
+ 29373713
+ 358075450
+ 633880614
+ 200374379
+ 141628294
+ 1513028977
+ 116798791
+ 2937455354
+ 246194623
+ 793925970
+ 115594167
+ 82463176
+ 324774974
+ 185844856
+ 2462295999
+ 3555105016
+ 1029169117
+ 2689309484
+ 1587145976
+ 1607241271
+ 3032276402
+ 183916933
+ 63766245
+ 151217255
+ 2781098109
+ 252081559
+ 1608788256
+ 41984573
+ 1896587353
+ 40136999
+ 295505814
+ 384867933
+ 116947371
+ 255703939
+ 2687800732
+ 76543916
+ 881649782
+ 2765729924
+ 1715695669
+ 1965383022
+ 2888214228
+ 21820514
+ 1727966414
+ 2581992818
+ 103999565
+ 741018846
+ 446792386
+ 2568989424
+ 2780674777
+ 465934916
+ 3378294885
+ 2885604327
+ 3336273419
+ 130742941
+ 2327629099
+ 1103818104
+ 3050036073
+ 2882456842
+ 2702914248
+ 2153674818
+ 132825659
+ 289758699
+ 2995946100
+ 3027449217
+ 2708029160
+ 1529367002
+ 608170333
+ 140446819
+ 2790688993
+ 1597308192
+ 14462028
+ 104062608
+ 370274893
+ 356145607
+ 566542629
+ 112587243
+ 39372070
+ 146853060
+ 2440984657
+ 3074554539
+ 204701034
+ 887623447
+ 1971521630
+ 2457208175
+ 466113358
+ 1574643830
+ 1465533884
+ 2500404589
+ 1633154150
+ 1349117870
+ 1658071267
+ 593022891
+ 3094177813
+ 1304672510
+ 3385525697
+ 2916225552
+ 2759773715
+ 1369215552
+ 1058390078
+ 2532850321
+ 351483656
+ 1902796704
+ 113000738
+ 2241245557
+ 2416606754
+ 408729540
+ 2530294556
+ 2936808249
+ 3138999692
+ 2679987883
+ 1448537377
+ 2524773906
+ 942079406
+ 2217584389
+ 3059427504
+ 3028507725
+ 632766658
+ 3302663431
+ 2914832897
+ 93487101
+ 2786054379
+ 1339647769
+ 531402307
+ 402066474
+ 337936675
+ 2760568625
+ 1385916396
+ 2595560922
+ 421910477
+ 1713100813
+ 352016040
+ 415247994
+ 1883606209
+ 2974994111
+ 1118022211
+ 3096979637
+ 711889867
+ 262890561
+ 233810062
+ 1877177168
+ 964106670
+ 164985413
+ 2920420361
+ 318936782
+ 3289826764
+ 145873735
+ 2523059919
+ 2409896179
+ 2292047201
+ 285674825
+ 2765549780
+ 2359541905
+ 2419103894
+ 358884588
+ 206231205
+ 136500778
+ 1397885138
+ 2625422097
+ 2524578002
+ 604278657
+ 2625634867
+ 73168019
+ 407448958
+ 189276174
+ 2507896925
+ 80880449
+ 520177827
+ 418469102
+ 2925075456
+ 615730636
+ 2995998941
+ 2697270934
+ 497135011
+ 2944598402
+ 428706893
+ 1345291712
+ 388751708
+ 130092079
+ 2984741882
+ 1047514436
+ 15927135
+ 2884357840
+ 294362779
+ 2870985800
+ 1720400449
+ 130027314
+ 2970518577
+ 240923858
+ 1613498838
+ 708321211
+ 1403382426
+ 2602186970
+ 1596855998
+ 280062526
+ 2716454552
+ 268720451
+ 2869044811
+ 1911762488
+ 392373280
+ 2151082712
+ 2770919004
+ 231541900
+ 60122778
+ 390006102
+ 240167506
+ 1558314660
+ 221608257
+ 852829933
+ 461669243
+ 239778483
+ 502146157
+ 1471963970
+ 276426707
+ 2336546150
+ 323595235
+ 128670043
+ 1308641714
+ 1411112756
+ 3011727217
+ 3082006921
+ 450537474
+ 2673101407
+ 2416030447
+ 51952627
+ 708057486
+ 833620748
+ 3024957797
+ 2147572362
+ 1712467098
+ 2899300501
+ 1348351772
+ 2923114629
+ 2779232814
+ 21306308
+ 1466314507
+ 1224588289
+ 81307783
+ 42717316
+ 315972617
+ 434649827
+ 105839296
+ 366063496
+ 34045892
+ 3076447389
+ 92437198
+ 3124335006
+ 1444393410
+ 351737762
+ 1919360383
+ 2836048345
+ 1670939112
+ 722140159
+ 92939425
+ 2932728756
+ 2831872033
+ 1354255123
+ 1689738186
+ 463578260
+ 2881582438
+ 912252510
+ 3226221887
+ 390827200
+ 269169237
+ 1450007192
+ 2735984326
+ 3029836305
+ 28291382
+ 785668627
+ 567287970
+ 1480004420
+ 131927864
+ 2958631308
+ 488490020
+ 2603422688
+ 3186614985
+ 177373618
+ 2466506329
+ 2651294251
+ 3367170684
+ 2673870882
+ 369098635
+ 242011326
+ 18099277
+ 1922210574
+ 3093762445
+ 470634878
+ 1674607392
+ 2920526283
+ 3261677580
+ 2192187078
+ 485599960
+ 1854850729
+ 95198467
+ 2228217740
+ 2171528344
+ 2957461230
+ 226615737
+ 1624183567
+ 158597677
+ 2909224690
+ 19278114
+ 2488284258
+ 2777071149
+ 1598064697
+ 2740691127
+ 3100908480
+ 1147010126
+ 2741161553
+ 439971668
+ 3247227273
+ 2884261062
+ 3127250575
+ 2942021278
+ 539428196
+ 409599986
+ 3161801331
+ 2328613860
+ 1903013437
+ 313082004
+ 2580495721
+ 209464435
+ 600172085
+ 339541217
+ 62219810
+ 583287316
+ 295891933
+ 561683767
+ 229192352
+ 1357869918
+ 235438136
+ 1599249169
+ 583879210
+ 507744802
+ 1696336261
+ 2323537206
+ 36882220
+ 541528426
+ 956202559
+ 387936537
+ 211658842
+ 2685186010
+ 2581656488
+ 391154378
+ 122932105
+ 409764153
+ 129737967
+ 2848806360
+ 3054860719
+ 372199585
+ 2316121597
+ 703345746
+ 3335505287
+ 2466151422
+ 380038166
+ 420561214
+ 2977085351
+ 110955327
+ 3004295886
+ 2362857361
+ 3053844460
+ 3182081552
+ 324208260
+ 2571790321
+ 1061498868
+ 2187395299
+ 2187482779
+ 3096652530
+ 2538239672
+ 3809634552
+ 2306848839
+ 1544061547
+ 151075965
+ 3250238556
+ 16157689
+ 1692663644
+ 1356000732
+ 436774994
+ 45503055
+ 1086037316
+ 2798297775
+ 2923485772
+ 58731726
+ 211816170
+ 885013716
+ 2608529078
+ 2954917057
+ 2271021600
+ 173743066
+ 451543575
+ 3219728436
+ 399824828
+ 2464688153
+ 2541069631
+ 1522892262
+ 3167829845
+ 944851321
+ 2471474509
+ 68073858
+ 1496221376
+ 13979882
+ 2218792189
+ 302123873
+ 2845915546
+ 431402814
+ 1364254945
+ 2711277666
+ 2766696876
+ 2495441323
+ 2844317433
+ 138009079
+ 2578631100
+ 478167529
+ 1222728360
+ 1323688411
+ 2883066187
+ 2443554697
+ 411631689
+ 68537682
+ 1027019269
+ 1660752493
+ 987324488
+ 2764106926
+ 2184511674
+ 103419315
+ 2310456424
+ 1572938088
+ 2554895281
+ 34138105
+ 2942100621
+ 160517898
+ 285075974
+ 2260805169
+ 19390498
+ 301696842
+ 2588239985
+ 2886588596
+ 2962622367
+ 1867897483
+ 2827053488
+ 1447767319
+ 2924491293
+ 167327096
+ 3309592402
+ 2795575638
+ 578758971
+ 2888665561
+ 30542348
+ 1437049609
+ 2242541566
+ 74354017
+ 58900854
+ 2159055031
+ 246517688
+ 2916873012
+ 1110055280
+ 562430843
+ 761797794
+ 1648208552
+ 301483343
+ 2896842048
+ 522103295
+ 1578517986
+ 2659610776
+ 2890560429
+ 1427665578
+ 268363160
+ 563709041
+ 2172300002
+ 2791262431
+ 3039809351
+ 2914940301
+ 2746560353
+ 2892191616
+ 71596845
+ 233770184
+ 1530949130
+ 105906110
+ 755347622
+ 490836906
+ 357603454
+ 324517203
+ 2835402315
+ 3285479894
+ 86368327
+ 238219970
+ 3153173945
+ 2732361234
+ 2357626327
+ 346602505
+ 13732632
+ 44055265
+ 2998032219
+ 482072312
+ 1721073866
+ 1386781034
+ 168194206
+ 1213443144
+ 181296114
+ 942598400
+ 2955577216
+ 582056669
+ 747540468
+ 2371722140
+ 360824004
+ 3023711736
+ 207032580
+ 2748107976
+ 464428175
+ 3150849096
+ 85450014
+ 2840066340
+ 2287819200
+ 240931426
+ 553606800
+ 397876544
+ 2195298230
+ 2601812005
+ 3013344739
+ 17599363
+ 1572639314
+ 3377673407
+ 303420278
+ 2811879995
+ 526860891
+ 346333874
+ 113568311
+ 705488304
+ 3238867619
+ 333772149
+ 373309716
+ 300472003
+ 3223424681
+ 2895699896
+ 3241119570
+ 1147453440
+ 3135402609
+ 521763744
+ 2702966971
+ 2878317616
+ 845031697
+ 2855454471
+ 3051902539
+ 482306439
+ 129173738
+ 306572138
+ 2941951538
+ 762707233
+ 2732608168
+ 1228456939
+ 246020724
+ 1920607602
+ 14434245
+ 1254943537
+ 1520746602
+ 150745124
+ 1350160351
+ 38707222
+ 267766858
+ 2992121760
+ 712666764
+ 983036864
+ 289490939
+ 269797384
+ 100215048
+ 3099557245
+ 2339741570
+ 306005146
+ 1182227460
+ 288235870
+ 1412832260
+ 455190443
+ 489912183
+ 448994061
+ 2944595072
+ 2453094914
+ 2899434206
+ 59288818
+ 2824706688
+ 423363992
+ 972850482
+ 997868714
+ 1203750733
+ 176147179
+ 115110596
+ 2978397615
+ 2528946267
+ 620180433
+ 365949935
+ 110609853
+ 1533494268
+ 2723839166
+ 34186887
+ 2864430424
+ 76942977
+ 361086733
+ 2724200587
+ 635206139
+ 2757801421
+ 19651443
+ 3364322949
+ 2770576744
+ 2168612560
+ 764020297
+ 2558268513
+ 2855384901
+ 1881414907
+ 2502212139
+ 3250037586
+ 2525185944
+ 591375982
+ 707911211
+ 3025041666
+ 19785599
+ 2311172950
+ 922817815
+ 739363530
+ 2812894393
+ 2496283986
+ 206162815
+ 590916342
+ 354053245
+ 2735195854
+ 2788759128
+ 3510947235
+ 3490740532
+ 2920847304
+ 2681444558
+ 2856805755
+ 3103899682
+ 145893832
+ 3065663910
+ 2736009516
+ 2835226230
+ 1590913771
+ 2700889555
+ 2221272164
+ 109780161
+ 700221218
+ 541753453
+ 126575915
+ 274336817
+ 2498172455
+ 2809515630
+ 2588774684
+ 296734891
+ 2212410182
+ 243027454
+ 1336526904
+ 397062736
+ 449331876
+ 30619307
+ 2310483811
+ 2437586509
+ 191710730
+ 1084185378
+ 2831486681
+ 1606477879
+ 969600636
+ 529783214
+ 2928131586
+ 190041293
+ 2967031274
+ 2165962781
+ 376501355
+ 284137985
+ 266863824
+ 407944074
+ 108456036
+ 1641294422
+ 900733706
+ 1063071450
+ 1682722328
+ 341419520
+ 1644293778
+ 2245151467
+ 511176989
+ 241922669
+ 3388315624
+ 1909431145
+ 2223820028
+ 600581315
+ 1723555076
+ 2748445313
+ 561211823
+ 561022931
+ 2751429993
+ 2714908343
+ 16165257
+ 524623359
+ 306741266
+ 469994381
+ 2561892084
+ 998802661
+ 1492924374
+ 789039140
+ 210150093
+ 817544820
+ 35740178
+ 326162841
+ 1447331628
+ 17493441
+ 2874693608
+ 965027312
+ 261936985
+ 510564259
+ 728031187
+ 164696234
+ 2204519310
+ 1626241164
+ 1024940588
+ 221486613
+ 571084565
+ 3029264508
+ 221716563
+ 2211417135
+ 499972359
+ 1565989165
+ 2436927208
+ 381029291
+ 2730580620
+ 3436438413
+ 2466014604
+ 538990742
+ 2935470687
+ 1162845468
+ 468108082
+ 2383897542
+ 2542119658
+ 1962281514
+ 171235080
+ 536915535100125185
+ 2841076618
+ 3006098500
+ 1057158554
+ 3245676721
+ 251087536
+ 3082811549
+ 281785349
+ 1674871100
+ 1898659951
+ 1414854156
+ 428693618
+ 2385953101
+ 2281213477
+ 2786368894
+ 2253203998
+ 357277727
+ 1358707970
+ 545186198
+ 3033613587
+ 107121821
+ 595965259
+ 583894637
+ 1306698787
+ 442262869
+ 2868353318
+ 1908436844
+ 271982042
+ 495202171
+ 251586884
+ 3151032974
+ 2213682568
+ 1203133039
+ 193128957
+ 597407120
+ 2781102086
+ 369254505
+ 62831036
+ 2328734640
+ 2579064082
+ 3271313827
+ 2880366619
+ 2323026113
+ 446380518
+ 245418139
+ 261211664
+ 1893329208
+ 3406596309
+ 584967077
+ 1708862304
+ 388961426
+ 2421535351
+ 2194375668
+ 2790313673
+ 2728894977
+ 2829174824
+ 784541196
+ 959902393
+ 249705367
+ 1677679309
+ 2825975175
+ 1305768366
+ 373475046
+ 785362464
+ 419607671
+ 61031675
+ 3854236343
+ 714603248
+ 1301447720
+ 827660912
+ 2383764684
+ 3180084906
+ 3265558124
+ 608536922
+ 238943561
+]
\ No newline at end of file
[6/8] incubator-streams git commit: more main methods: STREAMS-411,
better thread tracking: STREAMS-425, misc cleanup
Posted by sb...@apache.org.
more main methods: STREAMS-411, better thread tracking: STREAMS-425, misc cleanup
more main methods: STREAMS-411
better thread tracking: STREAMS-425
misc cleanup
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/170cb8b6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/170cb8b6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/170cb8b6
Branch: refs/heads/master
Commit: 170cb8b6b9d647dc2b7ff82b87edf060f078585c
Parents: f1540b1
Author: Steve Blackmon @steveblackmon <sb...@apache.org>
Authored: Thu Oct 6 14:01:04 2016 -0500
Committer: Steve Blackmon @steveblackmon <sb...@apache.org>
Committed: Thu Oct 6 14:01:04 2016 -0500
----------------------------------------------------------------------
.../provider/TwitterFollowingProvider.java | 120 +++++++---
.../twitter/provider/TwitterStreamProvider.java | 55 +++++
.../provider/TwitterTimelineProvider.java | 191 ++++++++--------
.../TwitterUserInformationProvider.java | 227 ++++++++++++-------
.../twitter/TwitterFollowingConfiguration.json | 2 +-
5 files changed, 386 insertions(+), 209 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/170cb8b6/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
index 4c3a828..66c1104 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
@@ -18,22 +18,43 @@
package org.apache.streams.twitter.provider;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
import com.google.common.collect.Queues;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.Uninterruptibles;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigParseOptions;
import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfiguration;
import org.apache.streams.config.StreamsConfigurator;
import org.apache.streams.core.DatumStatusCounter;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.jackson.StreamsJacksonMapper;
import org.apache.streams.twitter.TwitterFollowingConfiguration;
+import org.apache.streams.twitter.TwitterStreamConfiguration;
+import org.apache.streams.twitter.converter.TwitterDateTimeFormat;
import org.apache.streams.util.ComponentUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import twitter4j.Twitter;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -49,6 +70,49 @@ public class TwitterFollowingProvider extends TwitterUserInformationProvider {
private TwitterFollowingConfiguration config;
+ List<ListenableFuture<Object>> futures = new ArrayList<>();
+
+ public static void main(String[] args) throws Exception {
+
+ Preconditions.checkArgument(args.length >= 2);
+
+ String configfile = args[0];
+ String outfile = args[1];
+
+ Config reference = ConfigFactory.load();
+ File conf_file = new File(configfile);
+ assert(conf_file.exists());
+ Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
+
+ Config typesafe = testResourceConfig.withFallback(reference).resolve();
+
+ StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe);
+ TwitterFollowingConfiguration config = new ComponentConfigurator<>(TwitterFollowingConfiguration.class).detectConfiguration(typesafe, "twitter");
+ TwitterFollowingProvider provider = new TwitterFollowingProvider(config);
+
+ ObjectMapper mapper = new StreamsJacksonMapper(Lists.newArrayList(TwitterDateTimeFormat.TWITTER_FORMAT));
+
+ PrintStream outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile)));
+ provider.prepare(config);
+ provider.startStream();
+ do {
+ Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS);
+ Iterator<StreamsDatum> iterator = provider.readCurrent().iterator();
+ while(iterator.hasNext()) {
+ StreamsDatum datum = iterator.next();
+ String json;
+ try {
+ json = mapper.writeValueAsString(datum.getDocument());
+ outStream.println(json);
+ } catch (JsonProcessingException e) {
+ System.err.println(e.getMessage());
+ }
+ }
+ } while( provider.isRunning());
+ provider.cleanUp();
+ outStream.flush();
+ }
+
public TwitterFollowingConfiguration getConfig() { return config; }
public static final int MAX_NUMBER_WAITING = 10000;
@@ -63,14 +127,24 @@ public class TwitterFollowingProvider extends TwitterUserInformationProvider {
}
@Override
+ public void prepare(Object o) {
+ super.prepare(config);
+ Preconditions.checkNotNull(getConfig().getEndpoint());
+ Preconditions.checkArgument(getConfig().getEndpoint().equals("friends") || getConfig().getEndpoint().equals("followers"));
+ return;
+ }
+
+ @Override
public void startStream() {
- running.set(true);
+ Preconditions.checkNotNull(executor);
Preconditions.checkArgument(idsBatches.hasNext() || screenNameBatches.hasNext());
LOGGER.info("startStream");
+ running.set(true);
+
while (idsBatches.hasNext()) {
submitFollowingThreads(idsBatches.next());
}
@@ -78,8 +152,6 @@ public class TwitterFollowingProvider extends TwitterUserInformationProvider {
submitFollowingThreads(screenNameBatches.next());
}
- running.set(true);
-
executor.shutdown();
}
@@ -89,7 +161,9 @@ public class TwitterFollowingProvider extends TwitterUserInformationProvider {
for (int i = 0; i < ids.length; i++) {
TwitterFollowingProviderTask providerTask = new TwitterFollowingProviderTask(this, client, ids[i]);
- executor.submit(providerTask);
+ ListenableFuture future = executor.submit(providerTask);
+ futures.add(future);
+ LOGGER.info("submitted {}", ids[i]);
}
}
@@ -98,7 +172,9 @@ public class TwitterFollowingProvider extends TwitterUserInformationProvider {
for (int i = 0; i < screenNames.length; i++) {
TwitterFollowingProviderTask providerTask = new TwitterFollowingProviderTask(this, client, screenNames[i]);
- executor.submit(providerTask);
+ ListenableFuture future = executor.submit(providerTask);
+ futures.add(future);
+ LOGGER.info("submitted {}", screenNames[i]);
}
}
@@ -120,41 +196,17 @@ public class TwitterFollowingProvider extends TwitterUserInformationProvider {
lock.writeLock().unlock();
}
- if (providerQueue.isEmpty() && executor.isTerminated()) {
- LOGGER.info("{}{} - completed", idsBatches, screenNameBatches);
-
- running.set(false);
-
- LOGGER.info("Exiting");
- }
-
return result;
}
- protected Queue<StreamsDatum> constructQueue() {
- return new ConcurrentLinkedQueue<StreamsDatum>();
- }
-
- @Override
- public void prepare(Object o) {
- super.prepare(config);
- Preconditions.checkNotNull(getConfig().getEndpoint());
- Preconditions.checkArgument(getConfig().getEndpoint().equals("friends") || getConfig().getEndpoint().equals("followers"));
- return;
- }
-
- public void addDatum(StreamsDatum datum) {
- try {
- lock.readLock().lock();
- ComponentUtils.offerUntilSuccess(datum, providerQueue);
- } finally {
- lock.readLock().unlock();
- }
- }
-
@Override
public boolean isRunning() {
+ if (providerQueue.isEmpty() && executor.isTerminated() && Futures.allAsList(futures).isDone()) {
+ LOGGER.info("Completed");
+ running.set(false);
+ LOGGER.info("Exiting");
+ }
return running.get();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/170cb8b6/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
index f584950..b414074 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
@@ -18,9 +18,12 @@
package org.apache.streams.twitter.provider;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Queues;
+import com.google.common.util.concurrent.Uninterruptibles;
import com.twitter.hbc.ClientBuilder;
import com.twitter.hbc.core.Constants;
import com.twitter.hbc.core.Hosts;
@@ -35,7 +38,11 @@ import com.twitter.hbc.httpclient.auth.Authentication;
import com.twitter.hbc.httpclient.auth.BasicAuth;
import com.twitter.hbc.httpclient.auth.OAuth1;
import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigParseOptions;
import org.apache.commons.lang.NotImplementedException;
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfiguration;
import org.apache.streams.config.StreamsConfigurator;
import org.apache.streams.core.DatumStatus;
import org.apache.streams.core.DatumStatusCountable;
@@ -43,14 +50,21 @@ import org.apache.streams.core.DatumStatusCounter;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsProvider;
import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.jackson.StreamsJacksonMapper;
import org.apache.streams.twitter.TwitterStreamConfiguration;
+import org.apache.streams.twitter.converter.TwitterDateTimeFormat;
import org.apache.streams.util.ComponentUtils;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.PrintStream;
import java.io.Serializable;
import java.math.BigInteger;
+import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
@@ -72,6 +86,47 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable, Dat
private final static Logger LOGGER = LoggerFactory.getLogger(TwitterStreamProvider.class);
+ public static void main(String[] args) throws Exception {
+
+ Preconditions.checkArgument(args.length >= 2);
+
+ String configfile = args[0];
+ String outfile = args[1];
+
+ Config reference = ConfigFactory.load();
+ File conf_file = new File(configfile);
+ assert(conf_file.exists());
+ Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
+
+ Config typesafe = testResourceConfig.withFallback(reference).resolve();
+
+ StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe);
+ TwitterStreamConfiguration config = new ComponentConfigurator<>(TwitterStreamConfiguration.class).detectConfiguration(typesafe, "twitter");
+ TwitterStreamProvider provider = new TwitterStreamProvider(config);
+
+ ObjectMapper mapper = new StreamsJacksonMapper(Lists.newArrayList(TwitterDateTimeFormat.TWITTER_FORMAT));
+
+ PrintStream outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile)));
+ provider.prepare(config);
+ provider.startStream();
+ do {
+ Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS);
+ Iterator<StreamsDatum> iterator = provider.readCurrent().iterator();
+ while(iterator.hasNext()) {
+ StreamsDatum datum = iterator.next();
+ String json;
+ try {
+ json = mapper.writeValueAsString(datum.getDocument());
+ outStream.println(json);
+ } catch (JsonProcessingException e) {
+ System.err.println(e.getMessage());
+ }
+ }
+ } while( provider.isRunning());
+ provider.cleanUp();
+ outStream.flush();
+ }
+
public static final int MAX_BATCH = 1000;
private TwitterStreamConfiguration config;
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/170cb8b6/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
index 2924623..cea9829 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
@@ -22,6 +22,10 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.Uninterruptibles;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
@@ -68,6 +72,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import static java.util.concurrent.Executors.newSingleThreadExecutor;
+
/**
* Retrieve recent posts from a list of user ids or names.
*
@@ -91,7 +97,39 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
private final static Logger LOGGER = LoggerFactory.getLogger(TwitterTimelineProvider.class);
- private static ObjectMapper MAPPER = new StreamsJacksonMapper(Lists.newArrayList(TwitterDateTimeFormat.TWITTER_FORMAT));
+ public static final int MAX_NUMBER_WAITING = 10000;
+
+ private TwitterUserInformationConfiguration config;
+
+ protected final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+ public TwitterUserInformationConfiguration getConfig() {
+ return config;
+ }
+
+ public void setConfig(TwitterUserInformationConfiguration config) {
+ this.config = config;
+ }
+
+ protected Collection<String[]> screenNameBatches;
+ protected Collection<Long> ids;
+
+ protected volatile Queue<StreamsDatum> providerQueue;
+
+ protected int idsCount;
+ protected Twitter client;
+
+ protected ListeningExecutorService executor;
+
+ protected DateTime start;
+ protected DateTime end;
+
+ protected final AtomicBoolean running = new AtomicBoolean();
+
+ List<ListenableFuture<Object>> futures = new ArrayList<>();
+
+ Boolean jsonStoreEnabled;
+ Boolean includeEntitiesEnabled;
public static void main(String[] args) throws Exception {
@@ -111,6 +149,8 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
TwitterUserInformationConfiguration config = new ComponentConfigurator<>(TwitterUserInformationConfiguration.class).detectConfiguration(typesafe, "twitter");
TwitterTimelineProvider provider = new TwitterTimelineProvider(config);
+ ObjectMapper mapper = new StreamsJacksonMapper(Lists.newArrayList(TwitterDateTimeFormat.TWITTER_FORMAT));
+
PrintStream outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile)));
provider.prepare(config);
provider.startStream();
@@ -121,7 +161,7 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
StreamsDatum datum = iterator.next();
String json;
try {
- json = MAPPER.writeValueAsString(datum.getDocument());
+ json = mapper.writeValueAsString(datum.getDocument());
outStream.println(json);
} catch (JsonProcessingException e) {
System.err.println(e.getMessage());
@@ -132,42 +172,6 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
outStream.flush();
}
- public static final int MAX_NUMBER_WAITING = 10000;
-
- private TwitterUserInformationConfiguration config;
-
- protected final ReadWriteLock lock = new ReentrantReadWriteLock();
-
- public TwitterUserInformationConfiguration getConfig() {
- return config;
- }
-
- public void setConfig(TwitterUserInformationConfiguration config) {
- this.config = config;
- }
-
- protected Collection<String[]> screenNameBatches;
- protected Collection<Long> ids;
-
- protected volatile Queue<StreamsDatum> providerQueue;
-
- protected int idsCount;
- protected Twitter client;
-
- protected ExecutorService executor;
-
- protected DateTime start;
- protected DateTime end;
-
- protected final AtomicBoolean running = new AtomicBoolean();
-
- Boolean jsonStoreEnabled;
- Boolean includeEntitiesEnabled;
-
- private static ExecutorService getExecutor() {
- return Executors.newSingleThreadExecutor();
- }
-
public TwitterTimelineProvider(TwitterUserInformationConfiguration config) {
this.config = config;
}
@@ -182,17 +186,43 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
}
@Override
+ public void prepare(Object o) {
+
+
+
+ try {
+ lock.writeLock().lock();
+ providerQueue = constructQueue();
+ } finally {
+ lock.writeLock().unlock();
+ }
+
+ Preconditions.checkNotNull(providerQueue);
+ Preconditions.checkNotNull(config.getOauth().getConsumerKey());
+ Preconditions.checkNotNull(config.getOauth().getConsumerSecret());
+ Preconditions.checkNotNull(config.getOauth().getAccessToken());
+ Preconditions.checkNotNull(config.getOauth().getAccessTokenSecret());
+ Preconditions.checkNotNull(config.getInfo());
+
+ consolidateToIDs();
+
+ if(ids.size() > 1)
+ executor = MoreExecutors.listeningDecorator(TwitterUserInformationProvider.newFixedThreadPoolWithQueueSize(5, ids.size()));
+ else
+ executor = MoreExecutors.listeningDecorator(newSingleThreadExecutor());
+ }
+
+ @Override
public void startStream() {
+
LOGGER.debug("{} startStream", STREAMS_ID);
Preconditions.checkArgument(!ids.isEmpty());
- LOGGER.debug("{} - readCurrent", ids);
+ running.set(true);
submitTimelineThreads(ids.toArray(new Long[0]));
- running.set(true);
-
executor.shutdown();
}
@@ -202,13 +232,15 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
}
protected void submitTimelineThreads(Long[] ids) {
+
Twitter client = getTwitterClient();
for(int i = 0; i < ids.length; i++) {
TwitterTimelineProviderTask providerTask = new TwitterTimelineProviderTask(this, client, ids[i]);
- executor.submit(providerTask);
-
+ ListenableFuture future = executor.submit(providerTask);
+ futures.add(future);
+ LOGGER.info("submitted {}", ids[i]);
}
}
@@ -242,7 +274,7 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
lock.writeLock().unlock();
}
- if( providerQueue.isEmpty() && executor.isTerminated()) {
+ if( result.size() == 0 && providerQueue.isEmpty() && executor.isTerminated() ) {
LOGGER.info("Finished. Cleaning up...");
running.set(false);
@@ -268,50 +300,7 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
throw new NotImplementedException();
}
- @Override
- public boolean isRunning() {
- return running.get();
- }
-
- void shutdownAndAwaitTermination(ExecutorService pool) {
- pool.shutdown(); // Disable new tasks from being submitted
- try {
- // Wait a while for existing tasks to terminate
- if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
- pool.shutdownNow(); // Cancel currently executing tasks
- // Wait a while for tasks to respond to being cancelled
- if (!pool.awaitTermination(10, TimeUnit.SECONDS))
- System.err.println("Pool did not terminate");
- }
- } catch (InterruptedException ie) {
- // (Re-)Cancel if current thread also interrupted
- pool.shutdownNow();
- // Preserve interrupt status
- Thread.currentThread().interrupt();
- }
- }
-
- @Override
- public void prepare(Object o) {
- executor = getExecutor();
-
- try {
- lock.writeLock().lock();
- providerQueue = constructQueue();
- } finally {
- lock.writeLock().unlock();
- }
-
- Preconditions.checkNotNull(providerQueue);
- Preconditions.checkNotNull(config.getOauth().getConsumerKey());
- Preconditions.checkNotNull(config.getOauth().getConsumerSecret());
- Preconditions.checkNotNull(config.getOauth().getAccessToken());
- Preconditions.checkNotNull(config.getOauth().getAccessTokenSecret());
- Preconditions.checkNotNull(config.getInfo());
-
- consolidateToIDs();
- }
/**
* Using the "info" list that is contained in the configuration, ensure that all
@@ -375,13 +364,31 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
shutdownAndAwaitTermination(executor);
}
- public void addDatum(StreamsDatum datum) {
+ void shutdownAndAwaitTermination(ExecutorService pool) {
+ pool.shutdown(); // Disable new tasks from being submitted
try {
- lock.readLock().lock();
- ComponentUtils.offerUntilSuccess(datum, providerQueue);
- } finally {
- lock.readLock().unlock();
+ // Wait a while for existing tasks to terminate
+ if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
+ pool.shutdownNow(); // Cancel currently executing tasks
+ // Wait a while for tasks to respond to being cancelled
+ if (!pool.awaitTermination(10, TimeUnit.SECONDS))
+ System.err.println("Pool did not terminate");
+ }
+ } catch (InterruptedException ie) {
+ // (Re-)Cancel if current thread also interrupted
+ pool.shutdownNow();
+ // Preserve interrupt status
+ Thread.currentThread().interrupt();
}
}
+ @Override
+ public boolean isRunning() {
+ if (providerQueue.isEmpty() && executor.isTerminated() && Futures.allAsList(futures).isDone()) {
+ LOGGER.info("Completed");
+ running.set(false);
+ LOGGER.info("Exiting");
+ }
+ return running.get();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/170cb8b6/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
index 44f8a24..d6e783b 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
@@ -18,13 +18,20 @@
package org.apache.streams.twitter.provider;
+import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.Uninterruptibles;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigParseOptions;
import org.apache.commons.lang.NotImplementedException;
import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfiguration;
import org.apache.streams.config.StreamsConfigurator;
import org.apache.streams.core.DatumStatusCounter;
import org.apache.streams.core.StreamsDatum;
@@ -45,6 +52,10 @@ import twitter4j.TwitterFactory;
import twitter4j.conf.ConfigurationBuilder;
import twitter4j.json.DataObjectFactory;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.PrintStream;
import java.io.Serializable;
import java.math.BigInteger;
import java.util.ArrayList;
@@ -75,6 +86,45 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
private TwitterUserInformationConfiguration config;
+ public static void main(String[] args) throws Exception {
+
+ Preconditions.checkArgument(args.length >= 2);
+
+ String configfile = args[0];
+ String outfile = args[1];
+
+ Config reference = ConfigFactory.load();
+ File conf_file = new File(configfile);
+ assert(conf_file.exists());
+ Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
+
+ Config typesafe = testResourceConfig.withFallback(reference).resolve();
+
+ StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe);
+ TwitterUserInformationConfiguration config = new ComponentConfigurator<>(TwitterUserInformationConfiguration.class).detectConfiguration(typesafe, "twitter");
+ TwitterUserInformationProvider provider = new TwitterUserInformationProvider(config);
+
+ PrintStream outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile)));
+ provider.prepare(config);
+ provider.startStream();
+ do {
+ Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS);
+ Iterator<StreamsDatum> iterator = provider.readCurrent().iterator();
+ while(iterator.hasNext()) {
+ StreamsDatum datum = iterator.next();
+ String json;
+ try {
+ json = MAPPER.writeValueAsString(datum.getDocument());
+ outStream.println(json);
+ } catch (JsonProcessingException e) {
+ System.err.println(e.getMessage());
+ }
+ }
+ } while( provider.isRunning());
+ provider.cleanUp();
+ outStream.flush();
+ }
+
protected final ReadWriteLock lock = new ReentrantReadWriteLock();
protected volatile Queue<StreamsDatum> providerQueue;
@@ -93,7 +143,7 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
protected final AtomicBoolean running = new AtomicBoolean();
- private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) {
+ public static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) {
return new ThreadPoolExecutor(nThreads, nThreads,
5000L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy());
@@ -117,8 +167,88 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
}
@Override
+ public void prepare(Object o) {
+
+ if( o instanceof TwitterFollowingConfiguration )
+ config = (TwitterUserInformationConfiguration) o;
+
+ Preconditions.checkNotNull(config);
+ Preconditions.checkNotNull(config.getOauth());
+ Preconditions.checkNotNull(config.getOauth().getConsumerKey());
+ Preconditions.checkNotNull(config.getOauth().getConsumerSecret());
+ Preconditions.checkNotNull(config.getOauth().getAccessToken());
+ Preconditions.checkNotNull(config.getOauth().getAccessTokenSecret());
+ Preconditions.checkNotNull(config.getInfo());
+
+ try {
+ lock.writeLock().lock();
+ providerQueue = constructQueue();
+ } finally {
+ lock.writeLock().unlock();
+ }
+
+ Preconditions.checkNotNull(providerQueue);
+
+ List<String> screenNames = new ArrayList<String>();
+ List<String[]> screenNameBatches = new ArrayList<String[]>();
+
+ List<Long> ids = new ArrayList<Long>();
+ List<Long[]> idsBatches = new ArrayList<Long[]>();
+
+ for(String s : config.getInfo()) {
+ if(s != null)
+ {
+ String potentialScreenName = s.replaceAll("@", "").trim().toLowerCase();
+
+ // See if it is a long, if it is, add it to the user iD list, if it is not, add it to the
+ // screen name list
+ try {
+ ids.add(Long.parseLong(potentialScreenName));
+ } catch (Exception e) {
+ screenNames.add(potentialScreenName);
+ }
+
+ // Twitter allows for batches up to 100 per request, but you cannot mix types
+
+ if(ids.size() >= 100) {
+ // add the batch
+ idsBatches.add(ids.toArray(new Long[ids.size()]));
+ // reset the Ids
+ ids = new ArrayList<Long>();
+ }
+
+ if(screenNames.size() >= 100) {
+ // add the batch
+ screenNameBatches.add(screenNames.toArray(new String[ids.size()]));
+ // reset the Ids
+ screenNames = new ArrayList<String>();
+ }
+ }
+ }
+
+
+ if(ids.size() > 0)
+ idsBatches.add(ids.toArray(new Long[ids.size()]));
+
+ if(screenNames.size() > 0)
+ screenNameBatches.add(screenNames.toArray(new String[ids.size()]));
+
+ if(ids.size() + screenNames.size() > 0)
+ executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, (ids.size() + screenNames.size())));
+ else
+ executor = MoreExecutors.listeningDecorator(newSingleThreadExecutor());
+
+ Preconditions.checkNotNull(executor);
+
+ this.idsBatches = idsBatches.iterator();
+ this.screenNameBatches = screenNameBatches.iterator();
+ }
+
+ @Override
public void startStream() {
+ Preconditions.checkNotNull(executor);
+
Preconditions.checkArgument(idsBatches.hasNext() || screenNameBatches.hasNext());
LOGGER.info("{}{} - startStream", idsBatches, screenNameBatches);
@@ -214,16 +344,6 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
lock.writeLock().unlock();
}
- if( providerQueue.isEmpty() && executor.isTerminated()) {
- LOGGER.info("{}{} - completed", idsBatches, screenNameBatches);
-
- running.set(false);
-
- LOGGER.info("Exiting");
- }
-
- return result;
-
}
protected Queue<StreamsDatum> constructQueue() {
@@ -246,6 +366,15 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
@Override
public boolean isRunning() {
+
+ if( providerQueue.isEmpty() && executor.isTerminated() ) {
+ LOGGER.info("{}{} - completed", idsBatches, screenNameBatches);
+
+ running.set(false);
+
+ LOGGER.info("Exiting");
+ }
+
return running.get();
}
@@ -267,78 +396,7 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
}
}
- @Override
- public void prepare(Object o) {
- if( o instanceof TwitterFollowingConfiguration )
- config = (TwitterUserInformationConfiguration) o;
-
- try {
- lock.writeLock().lock();
- providerQueue = constructQueue();
- } finally {
- lock.writeLock().unlock();
- }
-
- Preconditions.checkNotNull(providerQueue);
- Preconditions.checkNotNull(config.getOauth().getConsumerKey());
- Preconditions.checkNotNull(config.getOauth().getConsumerSecret());
- Preconditions.checkNotNull(config.getOauth().getAccessToken());
- Preconditions.checkNotNull(config.getOauth().getAccessTokenSecret());
- Preconditions.checkNotNull(config.getInfo());
-
- List<String> screenNames = new ArrayList<String>();
- List<String[]> screenNameBatches = new ArrayList<String[]>();
-
- List<Long> ids = new ArrayList<Long>();
- List<Long[]> idsBatches = new ArrayList<Long[]>();
-
- for(String s : config.getInfo()) {
- if(s != null)
- {
- String potentialScreenName = s.replaceAll("@", "").trim().toLowerCase();
-
- // See if it is a long, if it is, add it to the user iD list, if it is not, add it to the
- // screen name list
- try {
- ids.add(Long.parseLong(potentialScreenName));
- } catch (Exception e) {
- screenNames.add(potentialScreenName);
- }
-
- // Twitter allows for batches up to 100 per request, but you cannot mix types
-
- if(ids.size() >= 100) {
- // add the batch
- idsBatches.add(ids.toArray(new Long[ids.size()]));
- // reset the Ids
- ids = new ArrayList<Long>();
- }
-
- if(screenNames.size() >= 100) {
- // add the batch
- screenNameBatches.add(screenNames.toArray(new String[ids.size()]));
- // reset the Ids
- screenNames = new ArrayList<String>();
- }
- }
- }
-
-
- if(ids.size() > 0)
- idsBatches.add(ids.toArray(new Long[ids.size()]));
-
- if(screenNames.size() > 0)
- screenNameBatches.add(screenNames.toArray(new String[ids.size()]));
-
- if(ids.size() + screenNames.size() > 0)
- executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, (ids.size() + screenNames.size())));
- else
- executor = MoreExecutors.listeningDecorator(newSingleThreadExecutor());
-
- this.idsBatches = idsBatches.iterator();
- this.screenNameBatches = screenNameBatches.iterator();
- }
protected Twitter getTwitterClient()
{
@@ -359,6 +417,11 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
return new TwitterFactory(builder.build()).getInstance();
}
+ protected void callback() {
+
+
+ }
+
@Override
public void cleanUp() {
shutdownAndAwaitTermination(executor);
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/170cb8b6/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterFollowingConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterFollowingConfiguration.json b/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterFollowingConfiguration.json
index c72f3cf..dda5d1b 100644
--- a/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterFollowingConfiguration.json
+++ b/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterFollowingConfiguration.json
@@ -12,7 +12,7 @@
"ids_only": {
"type": "boolean",
"description": "Whether to collect ids only, or full profiles",
- "value": "true"
+ "default": "true"
}
}
}
\ No newline at end of file
[8/8] incubator-streams git commit: add 3 more provider ITs,
reorganize test packages
Posted by sb...@apache.org.
add 3 more provider ITs, reorganize test packages
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/4febde27
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/4febde27
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/4febde27
Branch: refs/heads/master
Commit: 4febde277d428fe0a3fcd9de55b7eaa3899cf4d0
Parents: 170cb8b
Author: Steve Blackmon @steveblackmon <sb...@apache.org>
Authored: Thu Oct 6 17:56:40 2016 -0500
Committer: Steve Blackmon @steveblackmon <sb...@apache.org>
Committed: Thu Oct 6 17:56:40 2016 -0500
----------------------------------------------------------------------
.../twitter/provider/TwitterStreamProvider.java | 11 +-
.../TwitterUserInformationProvider.java | 2 +
.../provider/TwitterTimelineProviderIT.java | 64 --
.../test/TwitterActivityConvertersTest.java | 86 --
.../TwitterActivityObjectsConvertersTest.java | 55 -
.../test/TwitterDocumentClassifierTest.java | 88 --
.../twitter/test/TwitterObjectMapperIT.java | 132 ---
.../test/data/TwitterObjectMapperIT.java | 132 +++
.../providers/TwitterFollowingProviderIT.java | 52 +
.../test/providers/TwitterStreamProviderIT.java | 64 ++
.../providers/TwitterTimelineProviderIT.java | 52 +
.../TwitterUserInformationProviderIT.java | 52 +
.../utils/TwitterActivityConvertersTest.java | 86 ++
.../TwitterActivityObjectsConvertersTest.java | 47 +
.../utils/TwitterDocumentClassifierTest.java | 88 ++
.../resources/TwitterFollowingProviderIT.conf | 8 +
.../test/resources/TwitterStreamProviderIT.conf | 6 +
.../TwitterUserInformationProviderIT.conf | 1002 ++++++++++++++++++
18 files changed, 1600 insertions(+), 427 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4febde27/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
index b414074..eac1218 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
@@ -60,6 +60,7 @@ import org.slf4j.LoggerFactory;
import java.io.BufferedOutputStream;
import java.io.File;
+import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.PrintStream;
import java.io.Serializable;
@@ -86,7 +87,7 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable, Dat
private final static Logger LOGGER = LoggerFactory.getLogger(TwitterStreamProvider.class);
- public static void main(String[] args) throws Exception {
+ public static void main(String[] args) {
Preconditions.checkArgument(args.length >= 2);
@@ -106,7 +107,13 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable, Dat
ObjectMapper mapper = new StreamsJacksonMapper(Lists.newArrayList(TwitterDateTimeFormat.TWITTER_FORMAT));
- PrintStream outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile)));
+ PrintStream outStream = null;
+ try {
+ outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile)));
+ } catch (FileNotFoundException e) {
+ LOGGER.error("FileNotFoundException", e);
+ return;
+ }
provider.prepare(config);
provider.startStream();
do {
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4febde27/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
index d6e783b..15ff791 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
@@ -344,6 +344,8 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
lock.writeLock().unlock();
}
+ return result;
+
}
protected Queue<StreamsDatum> constructQueue() {
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4febde27/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/provider/TwitterTimelineProviderIT.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/provider/TwitterTimelineProviderIT.java b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/provider/TwitterTimelineProviderIT.java
deleted file mode 100644
index f21a87e..0000000
--- a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/provider/TwitterTimelineProviderIT.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.twitter.provider;
-
-import com.google.common.collect.Lists;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import com.typesafe.config.ConfigParseOptions;
-import org.apache.streams.config.ComponentConfigurator;
-import org.apache.streams.config.StreamsConfiguration;
-import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.twitter.TwitterUserInformationConfiguration;
-import org.junit.Test;
-
-import java.io.BufferedOutputStream;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.FileReader;
-import java.io.LineNumberReader;
-import java.io.OutputStream;
-import java.io.PrintStream;
-import java.util.Arrays;
-import java.util.List;
-
-public class TwitterTimelineProviderIT {
-
- @Test
- public void testTwitterTimelineProvider() throws Exception {
-
- String configfile = "./target/test-classes/TwitterTimelineProviderIT.conf";
- String outfile = "./target/test-classes/TwitterTimelineProviderIT.txt";
-
- TwitterTimelineProvider.main(Lists.newArrayList(configfile, outfile).toArray(new String[2]));
-
- File out = new File("target/test-classes/TwitterTimelineProviderTest.stdout.txt");
- assert (out.exists());
- assert (out.canRead());
- assert (out.isFile());
-
- FileReader outReader = new FileReader(out);
- LineNumberReader outCounter = new LineNumberReader(outReader);
-
- while(outCounter.readLine() != null) {}
-
- assert (outCounter.getLineNumber() == 1000);
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4febde27/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TwitterActivityConvertersTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TwitterActivityConvertersTest.java b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TwitterActivityConvertersTest.java
deleted file mode 100644
index 5e0473b..0000000
--- a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TwitterActivityConvertersTest.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.twitter.test;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.Lists;
-import org.apache.streams.converter.ActivityConverterUtil;
-import org.apache.streams.data.util.ActivityUtil;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.pojo.json.Activity;
-import org.apache.streams.twitter.converter.TwitterDateTimeFormat;
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-
-/**
- * Tests {@link: org.apache.streams.twitter.converter.*}
- */
-public class TwitterActivityConvertersTest {
-
- private final static Logger LOGGER = LoggerFactory.getLogger(TwitterActivityConvertersTest.class);
-
- private ObjectMapper mapper = StreamsJacksonMapper.getInstance(Lists.newArrayList(TwitterDateTimeFormat.TWITTER_FORMAT));
-
- private ActivityConverterUtil activityConverterUtil = ActivityConverterUtil.getInstance();
-
- private String tweet = "{\"created_at\":\"Wed Dec 11 22:27:34 +0000 2013\",\"id\":12345,\"id_str\":\"12345\",\"text\":\"text\",\"source\":\"source\",\"truncated\":false,\"in_reply_to_status_id\":null,\"in_reply_to_status_id_str\":null,\"in_reply_to_user_id\":null,\"in_reply_to_user_id_str\":null,\"in_reply_to_screen_name\":null,\"user\":{\"id\":91407775,\"id_str\":\"12345\",\"name\":\"name\",\"screen_name\":\"screen_name\",\"location\":\"\",\"url\":null,\"description\":null,\"protected\":false,\"followers_count\":136,\"friends_count\":0,\"listed_count\":1,\"created_at\":\"Fri Nov 20 19:29:02 +0000 2009\",\"favourites_count\":0,\"utc_offset\":null,\"time_zone\":null,\"geo_enabled\":false,\"verified\":false,\"statuses_count\":1793,\"lang\":\"en\",\"contributors_enabled\":false,\"is_translator\":false,\"profile_background_color\":\"C0DEED\",\"profile_background_image_url\":\"http:\\/\\/profile_background_image_url.png\",\"profile_background_image_url_https\":\"https:\\/\\/profile_b
ackground_image_url_https.png\",\"profile_background_tile\":false,\"profile_image_url\":\"http:\\/\\/profile_image_url.jpg\",\"profile_image_url_https\":\"https:\\/\\/profile_image_url_https.jpg\",\"profile_link_color\":\"0084B4\",\"profile_sidebar_border_color\":\"C0DEED\",\"profile_sidebar_fill_color\":\"DDEEF6\",\"profile_text_color\":\"333333\",\"profile_use_background_image\":true,\"default_profile\":true,\"default_profile_image\":false,\"following\":null,\"follow_request_sent\":null,\"notifications\":null},\"geo\":null,\"coordinates\":null,\"place\":null,\"contributors\":null,\"retweet_count\":0,\"favorite_count\":0,\"entities\":{\"hashtags\":[],\"symbols\":[],\"urls\":[{\"url\":\"http:\\/\\/url\",\"expanded_url\":\"http:\\/\\/expanded_url\",\"display_url\":\"display_url\",\"indices\":[118,140]}],\"user_mentions\":[]},\"favorited\":false,\"retweeted\":false,\"possibly_sensitive\":false,\"filter_level\":\"medium\",\"lang\":\"en\"}\n";
- private String retweet = "{\"created_at\":\"Wed Dec 11 22:27:34 +0000 2013\",\"id\":23456,\"id_str\":\"23456\",\"text\":\"text\",\"source\":\"web\",\"truncated\":false,\"in_reply_to_status_id\":null,\"in_reply_to_status_id_str\":null,\"in_reply_to_user_id\":null,\"in_reply_to_user_id_str\":null,\"in_reply_to_screen_name\":null,\"user\":{\"id\":163149656,\"id_str\":\"34567\",\"name\":\"name\",\"screen_name\":\"screen_name\",\"location\":\"location\",\"url\":\"http:\\/\\/www.youtube.com\\/watch?v=url\",\"description\":\"description\\u00ed\",\"protected\":false,\"followers_count\":41,\"friends_count\":75,\"listed_count\":2,\"created_at\":\"Mon Jul 05 17:35:49 +0000 2010\",\"favourites_count\":4697,\"utc_offset\":-10800,\"time_zone\":\"Buenos Aires\",\"geo_enabled\":false,\"verified\":false,\"statuses_count\":5257,\"lang\":\"es\",\"contributors_enabled\":false,\"is_translator\":false,\"profile_background_color\":\"C4A64B\",\"profile_background_image_url\":\"http:\\/\\/a0.twimg.com\\
/profile_background_images\\/12345\\/12345.jpeg\",\"profile_background_image_url_https\":\"https:\\/\\/si0.twimg.com\\/profile_background_images\\/12345\\/12345.jpeg\",\"profile_background_tile\":true,\"profile_image_url\":\"http:\\/\\/pbs.twimg.com\\/profile_images\\/12345\\/12345.jpeg\",\"profile_image_url_https\":\"https:\\/\\/pbs.twimg.com\\/profile_images\\/12345\\/12345.jpeg\",\"profile_banner_url\":\"https:\\/\\/pbs.twimg.com\\/profile_banners\\/12345\\/12345\",\"profile_link_color\":\"BF415A\",\"profile_sidebar_border_color\":\"000000\",\"profile_sidebar_fill_color\":\"B17CED\",\"profile_text_color\":\"3D1957\",\"profile_use_background_image\":true,\"default_profile\":false,\"default_profile_image\":false,\"following\":null,\"follow_request_sent\":null,\"notifications\":null},\"geo\":null,\"coordinates\":null,\"place\":null,\"contributors\":null,\"retweeted_status\":{\"created_at\":\"Wed Dec 11 22:25:06 +0000 2013\",\"id\":34567,\"id_str\":\"34567\",\"text\":\"text\",\"sourc
e\":\"source\",\"truncated\":false,\"in_reply_to_status_id\":null,\"in_reply_to_status_id_str\":null,\"in_reply_to_user_id\":null,\"in_reply_to_user_id_str\":null,\"in_reply_to_screen_name\":null,\"user\":{\"id\":34567,\"id_str\":\"34567\",\"name\":\"name\",\"screen_name\":\"screen_name\",\"location\":\"\",\"url\":\"http:\\/\\/www.web.com\",\"description\":\"description\",\"protected\":false,\"followers_count\":34307,\"friends_count\":325,\"listed_count\":361,\"created_at\":\"Fri Apr 13 19:00:11 +0000 2012\",\"favourites_count\":44956,\"utc_offset\":3600,\"time_zone\":\"Madrid\",\"geo_enabled\":false,\"verified\":false,\"statuses_count\":24011,\"lang\":\"es\",\"contributors_enabled\":false,\"is_translator\":false,\"profile_background_color\":\"000000\",\"profile_background_image_url\":\"http:\\/\\/profile_background_image_url.jpeg\",\"profile_background_image_url_https\":\"https:\\/\\/si0.twimg.com\\/profile_background_images\\/34567\\/34567.jpeg\",\"profile_background_tile\":false,
\"profile_image_url\":\"http:\\/\\/pbs.twimg.com\\/profile_images\\/34567\\/34567.gif\",\"profile_image_url_https\":\"https:\\/\\/pbs.twimg.com\\/profile_images\\/34567\\/34567.gif\",\"profile_banner_url\":\"https:\\/\\/pbs.twimg.com\\/profile_banners\\/34567\\/34567\",\"profile_link_color\":\"FF00E1\",\"profile_sidebar_border_color\":\"FFFFFF\",\"profile_sidebar_fill_color\":\"F3F3F3\",\"profile_text_color\":\"333333\",\"profile_use_background_image\":true,\"default_profile\":false,\"default_profile_image\":false,\"following\":null,\"follow_request_sent\":null,\"notifications\":null},\"geo\":null,\"coordinates\":null,\"place\":null,\"contributors\":null,\"retweet_count\":9,\"favorite_count\":6,\"entities\":{\"hashtags\":[],\"symbols\":[],\"urls\":[],\"user_mentions\":[]},\"favorited\":false,\"retweeted\":false,\"lang\":\"es\"},\"retweet_count\":0,\"favorite_count\":0,\"entities\":{\"hashtags\":[],\"symbols\":[],\"urls\":[],\"user_mentions\":[{\"screen_name\":\"screen_name\",\"name\
":\"name emocional\",\"id\":45678,\"id_str\":\"45678\",\"indices\":[3,14]}]},\"favorited\":false,\"retweeted\":false,\"filter_level\":\"medium\",\"lang\":\"es\"}\n";
- private String delete = "{\"delete\":{\"status\":{\"id\":56789,\"user_id\":67890,\"id_str\":\"56789\",\"user_id_str\":\"67890\"}}}\n";
- private String follow = "{\"follower\":{\"id\":12345},\"followee\":{\"id\":56789}}\n";
-
- @Test
- public void testConvertTweet() {
- List<Activity> activityList = activityConverterUtil.convert(tweet);
- Assert.assertTrue(activityList.size() == 1);
- Activity activity = activityList.get(0);
- if( !ActivityUtil.isValid(activity) )
- Assert.fail();
- }
-
- @Test
- public void testConvertRetweet() {
- List<Activity> activityList = activityConverterUtil.convert(retweet);
- Assert.assertTrue(activityList.size() == 1);
- Activity activity = activityList.get(0);
- if( !ActivityUtil.isValid(activity) )
- Assert.fail();
- }
-
- @Test
- public void testConvertDelete() {
- List<Activity> activityList = activityConverterUtil.convert(delete);
- Assert.assertTrue(activityList.size() == 1);
- Activity activity = activityList.get(0);
- if( !ActivityUtil.isValid(activity) )
- Assert.fail();
- }
-
- @Test
- public void testConvertFollow() {
- List<Activity> activityList = activityConverterUtil.convert(follow);
- Assert.assertTrue(activityList.size() == 1);
- Activity activity = activityList.get(0);
- if( !ActivityUtil.isValid(activity) )
- Assert.fail();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4febde27/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TwitterActivityObjectsConvertersTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TwitterActivityObjectsConvertersTest.java b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TwitterActivityObjectsConvertersTest.java
deleted file mode 100644
index 4a663e2..0000000
--- a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TwitterActivityObjectsConvertersTest.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.twitter.test;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.Lists;
-import org.apache.streams.converter.ActivityConverterUtil;
-import org.apache.streams.converter.ActivityObjectConverterUtil;
-import org.apache.streams.data.util.ActivityUtil;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.pojo.json.Activity;
-import org.apache.streams.pojo.json.ActivityObject;
-import org.apache.streams.twitter.converter.TwitterDateTimeFormat;
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-
-/**
- * Tests {@link: org.apache.streams.twitter.converter.*}
- */
-public class TwitterActivityObjectsConvertersTest {
-
- private final static Logger LOGGER = LoggerFactory.getLogger(TwitterActivityObjectsConvertersTest.class);
-
- private ActivityObjectConverterUtil activityObjectConverterUtil = ActivityObjectConverterUtil.getInstance();
-
- private String user = "{\"id\":1663018644,\"id_str\":\"1663018644\",\"name\":\"M.R. Clark\",\"screen_name\":\"cantennisfan\",\"location\":\"\",\"url\":null,\"description\":null,\"protected\":false,\"verified\":false,\"followers_count\":0,\"friends_count\":5,\"listed_count\":0,\"favourites_count\":2,\"statuses_count\":72,\"created_at\":\"Sun Aug 11 17:23:47 +0000 2013\",\"utc_offset\":-18000,\"time_zone\":\"Eastern Time (US & Canada)\",\"geo_enabled\":false,\"lang\":\"en\",\"contributors_enabled\":false,\"is_translator\":false,\"profile_background_color\":\"C0DEED\",\"profile_background_image_url\":\"http://abs.twimg.com/images/themes/theme1/bg.png\",\"profile_background_image_url_https\":\"https://abs.twimg.com/images/themes/theme1/bg.png\",\"profile_background_tile\":false,\"profile_link_color\":\"0084B4\",\"profile_sidebar_border_color\":\"C0DEED\",\"profile_sidebar_fill_color\":\"DDEEF6\",\"profile_text_color\":\"333333\",\"profile_use_background_image\":true,\"profile_image_
url\":\"http://abs.twimg.com/sticky/default_profile_images/default_profile_0_normal.png\",\"profile_image_url_https\":\"https://abs.twimg.com/sticky/default_profile_images/default_profile_0_normal.png\",\"default_profile\":true,\"default_profile_image\":true,\"following\":null,\"follow_request_sent\":null,\"notifications\":null,\"status\":{\"created_at\":\"Thu Jan 01 14:11:48 +0000 2015\",\"id\":550655634706669568,\"id_str\":\"550655634706669568\",\"text\":\"CBC Media Centre - CBC - Air Farce New Year's Eve 2014/2015: http://t.co/lMlL9VbC5e\",\"source\":\"<a href=\\\"https://dev.twitter.com/docs/tfw\\\" rel=\\\"nofollow\\\">Twitter for Websites</a>\",\"truncated\":false,\"in_reply_to_status_id\":null,\"in_reply_to_status_id_str\":null,\"in_reply_to_user_id\":null,\"in_reply_to_user_id_str\":null,\"in_reply_to_screen_name\":null,\"geo\":null,\"coordinates\":null,\"place\":null,\"contributors\":null,\"retweet_count\":0,\"favorite_count\":0,\"entities\":{\"hashtags\":[],\"trends\":[],\
"urls\":[{\"url\":\"http://t.co/lMlL9VbC5e\",\"expanded_url\":\"http://www.cbc.ca/mediacentre/air-farce-new-years-eve-20142015.html#.VKVVarDhVxR.twitter\",\"display_url\":\"cbc.ca/mediacentre/ai\u2026\",\"indices\":[61,83]}],\"user_mentions\":[],\"symbols\":[]},\"favorited\":false,\"retweeted\":false,\"possibly_sensitive\":false,\"filter_level\":\"medium\",\"lang\":\"en\",\"timestamp_ms\":\"1420121508658\"}}\n";
-
- @Test
- public void testConvertUser() {
- ActivityObject activityObject = activityObjectConverterUtil.convert(user);
- assert( activityObject != null );
- if( !ActivityUtil.isValid(activityObject) )
- Assert.fail();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4febde27/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TwitterDocumentClassifierTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TwitterDocumentClassifierTest.java b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TwitterDocumentClassifierTest.java
deleted file mode 100644
index 044fe3c..0000000
--- a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TwitterDocumentClassifierTest.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.twitter.test;
-
-import org.apache.streams.twitter.converter.TwitterDocumentClassifier;
-import org.apache.streams.twitter.pojo.Delete;
-import org.apache.streams.twitter.pojo.Follow;
-import org.apache.streams.twitter.pojo.Retweet;
-import org.apache.streams.twitter.pojo.Tweet;
-import org.apache.streams.twitter.pojo.User;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.List;
-
-/**
- * Tests {@link: org.apache.streams.twitter.processor.TwitterEventClassifier}
- */
-public class TwitterDocumentClassifierTest {
-
- private String tweet = "{\"created_at\":\"Wed Dec 11 22:27:34 +0000 2013\",\"id\":12345,\"id_str\":\"12345\",\"text\":\"text\",\"source\":\"source\",\"truncated\":false,\"in_reply_to_status_id\":null,\"in_reply_to_status_id_str\":null,\"in_reply_to_user_id\":null,\"in_reply_to_user_id_str\":null,\"in_reply_to_screen_name\":null,\"user\":{\"id\":91407775,\"id_str\":\"12345\",\"name\":\"name\",\"screen_name\":\"screen_name\",\"location\":\"\",\"url\":null,\"description\":null,\"protected\":false,\"followers_count\":136,\"friends_count\":0,\"listed_count\":1,\"created_at\":\"Fri Nov 20 19:29:02 +0000 2009\",\"favourites_count\":0,\"utc_offset\":null,\"time_zone\":null,\"geo_enabled\":false,\"verified\":false,\"statuses_count\":1793,\"lang\":\"en\",\"contributors_enabled\":false,\"is_translator\":false,\"profile_background_color\":\"C0DEED\",\"profile_background_image_url\":\"http:\\/\\/profile_background_image_url.png\",\"profile_background_image_url_https\":\"https:\\/\\/profile_b
ackground_image_url_https.png\",\"profile_background_tile\":false,\"profile_image_url\":\"http:\\/\\/profile_image_url.jpg\",\"profile_image_url_https\":\"https:\\/\\/profile_image_url_https.jpg\",\"profile_link_color\":\"0084B4\",\"profile_sidebar_border_color\":\"C0DEED\",\"profile_sidebar_fill_color\":\"DDEEF6\",\"profile_text_color\":\"333333\",\"profile_use_background_image\":true,\"default_profile\":true,\"default_profile_image\":false,\"following\":null,\"follow_request_sent\":null,\"notifications\":null},\"geo\":null,\"coordinates\":null,\"place\":null,\"contributors\":null,\"retweet_count\":0,\"favorite_count\":0,\"entities\":{\"hashtags\":[],\"symbols\":[],\"urls\":[{\"url\":\"http:\\/\\/url\",\"expanded_url\":\"http:\\/\\/expanded_url\",\"display_url\":\"display_url\",\"indices\":[118,140]}],\"user_mentions\":[]},\"favorited\":false,\"retweeted\":false,\"possibly_sensitive\":false,\"filter_level\":\"medium\",\"lang\":\"en\"}\n";
- private String retweet = "{\"created_at\":\"Wed Dec 11 22:27:34 +0000 2013\",\"id\":23456,\"id_str\":\"23456\",\"text\":\"text\",\"source\":\"web\",\"truncated\":false,\"in_reply_to_status_id\":null,\"in_reply_to_status_id_str\":null,\"in_reply_to_user_id\":null,\"in_reply_to_user_id_str\":null,\"in_reply_to_screen_name\":null,\"user\":{\"id\":163149656,\"id_str\":\"34567\",\"name\":\"name\",\"screen_name\":\"screen_name\",\"location\":\"location\",\"url\":\"http:\\/\\/www.youtube.com\\/watch?v=url\",\"description\":\"description\\u00ed\",\"protected\":false,\"followers_count\":41,\"friends_count\":75,\"listed_count\":2,\"created_at\":\"Mon Jul 05 17:35:49 +0000 2010\",\"favourites_count\":4697,\"utc_offset\":-10800,\"time_zone\":\"Buenos Aires\",\"geo_enabled\":false,\"verified\":false,\"statuses_count\":5257,\"lang\":\"es\",\"contributors_enabled\":false,\"is_translator\":false,\"profile_background_color\":\"C4A64B\",\"profile_background_image_url\":\"http:\\/\\/a0.twimg.com\\
/profile_background_images\\/12345\\/12345.jpeg\",\"profile_background_image_url_https\":\"https:\\/\\/si0.twimg.com\\/profile_background_images\\/12345\\/12345.jpeg\",\"profile_background_tile\":true,\"profile_image_url\":\"http:\\/\\/pbs.twimg.com\\/profile_images\\/12345\\/12345.jpeg\",\"profile_image_url_https\":\"https:\\/\\/pbs.twimg.com\\/profile_images\\/12345\\/12345.jpeg\",\"profile_banner_url\":\"https:\\/\\/pbs.twimg.com\\/profile_banners\\/12345\\/12345\",\"profile_link_color\":\"BF415A\",\"profile_sidebar_border_color\":\"000000\",\"profile_sidebar_fill_color\":\"B17CED\",\"profile_text_color\":\"3D1957\",\"profile_use_background_image\":true,\"default_profile\":false,\"default_profile_image\":false,\"following\":null,\"follow_request_sent\":null,\"notifications\":null},\"geo\":null,\"coordinates\":null,\"place\":null,\"contributors\":null,\"retweeted_status\":{\"created_at\":\"Wed Dec 11 22:25:06 +0000 2013\",\"id\":34567,\"id_str\":\"34567\",\"text\":\"text\",\"sourc
e\":\"source\",\"truncated\":false,\"in_reply_to_status_id\":null,\"in_reply_to_status_id_str\":null,\"in_reply_to_user_id\":null,\"in_reply_to_user_id_str\":null,\"in_reply_to_screen_name\":null,\"user\":{\"id\":34567,\"id_str\":\"34567\",\"name\":\"name\",\"screen_name\":\"screen_name\",\"location\":\"\",\"url\":\"http:\\/\\/www.web.com\",\"description\":\"description\",\"protected\":false,\"followers_count\":34307,\"friends_count\":325,\"listed_count\":361,\"created_at\":\"Fri Apr 13 19:00:11 +0000 2012\",\"favourites_count\":44956,\"utc_offset\":3600,\"time_zone\":\"Madrid\",\"geo_enabled\":false,\"verified\":false,\"statuses_count\":24011,\"lang\":\"es\",\"contributors_enabled\":false,\"is_translator\":false,\"profile_background_color\":\"000000\",\"profile_background_image_url\":\"http:\\/\\/profile_background_image_url.jpeg\",\"profile_background_image_url_https\":\"https:\\/\\/si0.twimg.com\\/profile_background_images\\/34567\\/34567.jpeg\",\"profile_background_tile\":false,
\"profile_image_url\":\"http:\\/\\/pbs.twimg.com\\/profile_images\\/34567\\/34567.gif\",\"profile_image_url_https\":\"https:\\/\\/pbs.twimg.com\\/profile_images\\/34567\\/34567.gif\",\"profile_banner_url\":\"https:\\/\\/pbs.twimg.com\\/profile_banners\\/34567\\/34567\",\"profile_link_color\":\"FF00E1\",\"profile_sidebar_border_color\":\"FFFFFF\",\"profile_sidebar_fill_color\":\"F3F3F3\",\"profile_text_color\":\"333333\",\"profile_use_background_image\":true,\"default_profile\":false,\"default_profile_image\":false,\"following\":null,\"follow_request_sent\":null,\"notifications\":null},\"geo\":null,\"coordinates\":null,\"place\":null,\"contributors\":null,\"retweet_count\":9,\"favorite_count\":6,\"entities\":{\"hashtags\":[],\"symbols\":[],\"urls\":[],\"user_mentions\":[]},\"favorited\":false,\"retweeted\":false,\"lang\":\"es\"},\"retweet_count\":0,\"favorite_count\":0,\"entities\":{\"hashtags\":[],\"symbols\":[],\"urls\":[],\"user_mentions\":[{\"screen_name\":\"screen_name\",\"name\
":\"name emocional\",\"id\":45678,\"id_str\":\"45678\",\"indices\":[3,14]}]},\"favorited\":false,\"retweeted\":false,\"filter_level\":\"medium\",\"lang\":\"es\"}\n";
- private String delete = "{\"delete\":{\"status\":{\"id\":56789,\"user_id\":67890,\"id_str\":\"56789\",\"user_id_str\":\"67890\"}}}\n";
- private String follow = "{\"follower\":{\"id\":12345},\"followee\":{\"id\":56789}}\n";
- private String user = "{\"location\":\"\",\"default_profile\":true,\"profile_background_tile\":false,\"statuses_count\":1,\"lang\":\"en\",\"profile_link_color\":\"0084B4\",\"id\":67890,\"following\":false,\"protected\":false,\"favourites_count\":0,\"profile_text_color\":\"333333\",\"description\":\"\",\"verified\":false,\"contributors_enabled\":false,\"profile_sidebar_border_color\":\"C0DEED\",\"name\":\"name\",\"profile_background_color\":\"C0DEED\",\"created_at\":\"Fri Apr 17 12:35:56 +0000 2009\",\"is_translation_enabled\":false,\"default_profile_image\":true,\"followers_count\":2,\"profile_image_url_https\":\"https://profile_image_url_https.png\",\"geo_enabled\":false,\"status\":{\"contributors\":null,\"text\":\"Working\",\"geo\":null,\"retweeted\":false,\"in_reply_to_screen_name\":null,\"truncated\":false,\"lang\":\"en\",\"entities\":{\"symbols\":[],\"urls\":[],\"hashtags\":[],\"user_mentions\":[]},\"in_reply_to_status_id_str\":null,\"id\":67890,\"source\":\"web\",\"in_repl
y_to_user_id_str\":null,\"favorited\":false,\"in_reply_to_status_id\":null,\"retweet_count\":0,\"created_at\":\"Fri Apr 17 12:37:54 +0000 2009\",\"in_reply_to_user_id\":null,\"favorite_count\":0,\"id_str\":\"67890\",\"place\":null,\"coordinates\":null},\"profile_background_image_url\":\"http://abs.twimg.com/profile_background_image_url.png\",\"profile_background_image_url_https\":\"https://abs.twimg.com/images/profile_background_image_url_https.png\",\"follow_request_sent\":false,\"entities\":{\"description\":{\"urls\":[]}},\"url\":null,\"utc_offset\":null,\"time_zone\":null,\"notifications\":false,\"profile_use_background_image\":true,\"friends_count\":1,\"profile_sidebar_fill_color\":\"DDEEF6\",\"screen_name\":\"screen_name\",\"id_str\":\"67890\",\"profile_image_url\":\"http://abs.twimg.com/sticky/default_profile_images/default_profile_1_normal.png\",\"listed_count\":0,\"is_translator\":false}";
-
- @Test
- public void testDetectTweet() {
- List<Class> detected = new TwitterDocumentClassifier().detectClasses(tweet);
- Assert.assertTrue(detected.size() == 1);
- Class result = detected.get(0);
- if( !result.equals(Tweet.class) )
- Assert.fail();
- }
-
- @Test
- public void testDetectRetweet() {
- List<Class> detected = new TwitterDocumentClassifier().detectClasses(retweet);
- Assert.assertTrue(detected.size() == 1);
- Class result = detected.get(0);
- if( !result.equals(Retweet.class) )
- Assert.fail();
- }
-
- @Test
- public void testDetectDelete() {
- List<Class> detected = new TwitterDocumentClassifier().detectClasses(delete);
- Assert.assertTrue(detected.size() == 1);
- Class result = detected.get(0);
- if( !result.equals(Delete.class) )
- Assert.fail();
- }
-
- @Test
- public void testDetectFollow() {
- List<Class> detected = new TwitterDocumentClassifier().detectClasses(follow);
- Assert.assertTrue(detected.size() == 1);
- Class result = detected.get(0);
- if( !result.equals(Follow.class) )
- Assert.fail();
- }
-
- @Test
- public void testDetectUser() {
- List<Class> detected = new TwitterDocumentClassifier().detectClasses(user);
- Assert.assertTrue(detected.size() == 1);
- Class result = detected.get(0);
- if (!result.equals(User.class))
- Assert.fail();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4febde27/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TwitterObjectMapperIT.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TwitterObjectMapperIT.java b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TwitterObjectMapperIT.java
deleted file mode 100644
index e8bbf49..0000000
--- a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TwitterObjectMapperIT.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.twitter.test;
-
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.base.Optional;
-import com.google.common.collect.Lists;
-import org.apache.commons.lang.StringUtils;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.twitter.converter.StreamsTwitterMapper;
-import org.apache.streams.twitter.converter.TwitterDocumentClassifier;
-import org.apache.streams.twitter.pojo.Delete;
-import org.apache.streams.twitter.pojo.Retweet;
-import org.apache.streams.twitter.pojo.Tweet;
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedReader;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.CoreMatchers.not;
-import static org.hamcrest.CoreMatchers.nullValue;
-import static org.hamcrest.Matchers.greaterThan;
-import static org.junit.Assert.assertThat;
-
-/**
-* Tests serialization / deserialization of twitter jsons
-*/
-public class TwitterObjectMapperIT {
-
- private final static Logger LOGGER = LoggerFactory.getLogger(TwitterObjectMapperIT.class);
-
- private ObjectMapper mapper = StreamsJacksonMapper.getInstance(Lists.newArrayList(StreamsTwitterMapper.TWITTER_FORMAT));
-
- @Test
- public void Tests()
- {
- mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, Boolean.TRUE);
- mapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, Boolean.TRUE);
- mapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, Boolean.TRUE);
-
- InputStream is = TwitterObjectMapperIT.class.getResourceAsStream("/testtweets.txt");
- InputStreamReader isr = new InputStreamReader(is);
- BufferedReader br = new BufferedReader(isr);
-
- int tweetlinks = 0;
- int retweetlinks = 0;
-
- try {
- while (br.ready()) {
- String line = br.readLine();
- if(!StringUtils.isEmpty(line))
- {
- LOGGER.info("raw: {}", line);
-
- Class detected = new TwitterDocumentClassifier().detectClasses(line).get(0);
-
- ObjectNode event = (ObjectNode) mapper.readTree(line);
-
- assertThat(event, is(not(nullValue())));
-
- if( detected == Tweet.class ) {
-
- Tweet tweet = mapper.convertValue(event, Tweet.class);
-
- assertThat(tweet, is(not(nullValue())));
- assertThat(tweet.getCreatedAt(), is(not(nullValue())));
- assertThat(tweet.getText(), is(not(nullValue())));
- assertThat(tweet.getUser(), is(not(nullValue())));
-
- tweetlinks += Optional.fromNullable(tweet.getEntities().getUrls().size()).or(0);
-
- } else if( detected == Retweet.class ) {
-
- Retweet retweet = mapper.convertValue(event, Retweet.class);
-
- assertThat(retweet.getRetweetedStatus(), is(not(nullValue())));
- assertThat(retweet.getRetweetedStatus().getCreatedAt(), is(not(nullValue())));
- assertThat(retweet.getRetweetedStatus().getText(), is(not(nullValue())));
- assertThat(retweet.getRetweetedStatus().getUser(), is(not(nullValue())));
- assertThat(retweet.getRetweetedStatus().getUser().getId(), is(not(nullValue())));
- assertThat(retweet.getRetweetedStatus().getUser().getCreatedAt(), is(not(nullValue())));
-
- retweetlinks += Optional.fromNullable(retweet.getRetweetedStatus().getEntities().getUrls().size()).or(0);
-
- } else if( detected == Delete.class ) {
-
- Delete delete = mapper.convertValue(event, Delete.class);
-
- assertThat(delete.getDelete(), is(not(nullValue())));
- assertThat(delete.getDelete().getStatus(), is(not(nullValue())));
- assertThat(delete.getDelete().getStatus().getId(), is(not(nullValue())));
- assertThat(delete.getDelete().getStatus().getUserId(), is(not(nullValue())));
-
- } else {
- Assert.fail();
- }
-
- }
- }
- } catch( Exception e ) {
- LOGGER.error("Exception: ", e);
- Assert.fail();
- }
-
- assertThat(tweetlinks, is(greaterThan(0)));
- assertThat(retweetlinks, is(greaterThan(0)));
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4febde27/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/data/TwitterObjectMapperIT.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/data/TwitterObjectMapperIT.java b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/data/TwitterObjectMapperIT.java
new file mode 100644
index 0000000..4d6a3de
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/data/TwitterObjectMapperIT.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.twitter.test.data;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
+import org.apache.commons.lang.StringUtils;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.twitter.converter.StreamsTwitterMapper;
+import org.apache.streams.twitter.converter.TwitterDocumentClassifier;
+import org.apache.streams.twitter.pojo.Delete;
+import org.apache.streams.twitter.pojo.Retweet;
+import org.apache.streams.twitter.pojo.Tweet;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.junit.Assert.assertThat;
+
+/**
+* Tests serialization / deserialization of twitter jsons
+*/
+public class TwitterObjectMapperIT {
+
+ private final static Logger LOGGER = LoggerFactory.getLogger(TwitterObjectMapperIT.class);
+
+ private ObjectMapper mapper = StreamsJacksonMapper.getInstance(Lists.newArrayList(StreamsTwitterMapper.TWITTER_FORMAT));
+
+ @Test
+ public void Tests()
+ {
+ mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, Boolean.TRUE);
+ mapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, Boolean.TRUE);
+ mapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, Boolean.TRUE);
+
+ InputStream is = TwitterObjectMapperIT.class.getResourceAsStream("/testtweets.txt");
+ InputStreamReader isr = new InputStreamReader(is);
+ BufferedReader br = new BufferedReader(isr);
+
+ int tweetlinks = 0;
+ int retweetlinks = 0;
+
+ try {
+ while (br.ready()) {
+ String line = br.readLine();
+ if(!StringUtils.isEmpty(line))
+ {
+ LOGGER.info("raw: {}", line);
+
+ Class detected = new TwitterDocumentClassifier().detectClasses(line).get(0);
+
+ ObjectNode event = (ObjectNode) mapper.readTree(line);
+
+ assertThat(event, is(not(nullValue())));
+
+ if( detected == Tweet.class ) {
+
+ Tweet tweet = mapper.convertValue(event, Tweet.class);
+
+ assertThat(tweet, is(not(nullValue())));
+ assertThat(tweet.getCreatedAt(), is(not(nullValue())));
+ assertThat(tweet.getText(), is(not(nullValue())));
+ assertThat(tweet.getUser(), is(not(nullValue())));
+
+ tweetlinks += Optional.fromNullable(tweet.getEntities().getUrls().size()).or(0);
+
+ } else if( detected == Retweet.class ) {
+
+ Retweet retweet = mapper.convertValue(event, Retweet.class);
+
+ assertThat(retweet.getRetweetedStatus(), is(not(nullValue())));
+ assertThat(retweet.getRetweetedStatus().getCreatedAt(), is(not(nullValue())));
+ assertThat(retweet.getRetweetedStatus().getText(), is(not(nullValue())));
+ assertThat(retweet.getRetweetedStatus().getUser(), is(not(nullValue())));
+ assertThat(retweet.getRetweetedStatus().getUser().getId(), is(not(nullValue())));
+ assertThat(retweet.getRetweetedStatus().getUser().getCreatedAt(), is(not(nullValue())));
+
+ retweetlinks += Optional.fromNullable(retweet.getRetweetedStatus().getEntities().getUrls().size()).or(0);
+
+ } else if( detected == Delete.class ) {
+
+ Delete delete = mapper.convertValue(event, Delete.class);
+
+ assertThat(delete.getDelete(), is(not(nullValue())));
+ assertThat(delete.getDelete().getStatus(), is(not(nullValue())));
+ assertThat(delete.getDelete().getStatus().getId(), is(not(nullValue())));
+ assertThat(delete.getDelete().getStatus().getUserId(), is(not(nullValue())));
+
+ } else {
+ Assert.fail();
+ }
+
+ }
+ }
+ } catch( Exception e ) {
+ LOGGER.error("Exception: ", e);
+ Assert.fail();
+ }
+
+ assertThat(tweetlinks, is(greaterThan(0)));
+ assertThat(retweetlinks, is(greaterThan(0)));
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4febde27/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/providers/TwitterFollowingProviderIT.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/providers/TwitterFollowingProviderIT.java b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/providers/TwitterFollowingProviderIT.java
new file mode 100644
index 0000000..558bb7c
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/providers/TwitterFollowingProviderIT.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.twitter.test.providers;
+
+import com.google.common.collect.Lists;
+import org.apache.streams.twitter.provider.TwitterFollowingProvider;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileReader;
+import java.io.LineNumberReader;
+
+public class TwitterFollowingProviderIT {
+
+ @Test
+ public void testTwitterFollowingProvider() throws Exception {
+
+ String configfile = "./target/test-classes/TwitterFollowingProviderIT.conf";
+ String outfile = "./target/test-classes/TwitterFollowingProviderIT.stdout.txt";
+
+ TwitterFollowingProvider.main(Lists.newArrayList(configfile, outfile).toArray(new String[2]));
+
+ File out = new File(outfile);
+ assert (out.exists());
+ assert (out.canRead());
+ assert (out.isFile());
+
+ FileReader outReader = new FileReader(out);
+ LineNumberReader outCounter = new LineNumberReader(outReader);
+
+ while(outCounter.readLine() != null) {}
+
+ assert (outCounter.getLineNumber() == 10000);
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4febde27/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/providers/TwitterStreamProviderIT.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/providers/TwitterStreamProviderIT.java b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/providers/TwitterStreamProviderIT.java
new file mode 100644
index 0000000..880f5df
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/providers/TwitterStreamProviderIT.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.twitter.test.providers;
+
+import com.google.common.collect.Lists;
+import org.apache.streams.twitter.provider.TwitterStreamProvider;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileReader;
+import java.io.LineNumberReader;
+
+public class TwitterStreamProviderIT {
+
+ final String outfile = "./target/test-classes/TwitterStreamProviderIT.stdout.txt";
+
+ @Test
+ public void testTwitterStreamProvider() throws Exception {
+
+
+ Thread testThread = new Thread(
+ new Runnable() {
+
+ String configfile = "./target/test-classes/TwitterStreamProviderIT.conf";
+
+ @Override
+ public void run() {
+ TwitterStreamProvider.main(Lists.newArrayList(configfile, outfile).toArray(new String[2]));
+ }
+ }
+ );
+ testThread.start();
+ testThread.join(30000);
+
+ File out = new File(outfile);
+ assert (out.exists());
+ assert (out.canRead());
+ assert (out.isFile());
+
+ FileReader outReader = new FileReader(out);
+ LineNumberReader outCounter = new LineNumberReader(outReader);
+
+ while(outCounter.readLine() != null) {}
+
+ assert (outCounter.getLineNumber() > 25);
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4febde27/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/providers/TwitterTimelineProviderIT.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/providers/TwitterTimelineProviderIT.java b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/providers/TwitterTimelineProviderIT.java
new file mode 100644
index 0000000..9e26528
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/providers/TwitterTimelineProviderIT.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.twitter.test.providers;
+
+import com.google.common.collect.Lists;
+import org.apache.streams.twitter.provider.TwitterTimelineProvider;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileReader;
+import java.io.LineNumberReader;
+
+public class TwitterTimelineProviderIT {
+
+ @Test
+ public void testTwitterTimelineProvider() throws Exception {
+
+ String configfile = "./target/test-classes/TwitterTimelineProviderIT.conf";
+ String outfile = "./target/test-classes/TwitterTimelineProviderIT.stdout.txt";
+
+ TwitterTimelineProvider.main(Lists.newArrayList(configfile, outfile).toArray(new String[2]));
+
+ File out = new File(outfile);
+ assert (out.exists());
+ assert (out.canRead());
+ assert (out.isFile());
+
+ FileReader outReader = new FileReader(out);
+ LineNumberReader outCounter = new LineNumberReader(outReader);
+
+ while(outCounter.readLine() != null) {}
+
+ assert (outCounter.getLineNumber() == 1000);
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4febde27/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/providers/TwitterUserInformationProviderIT.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/providers/TwitterUserInformationProviderIT.java b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/providers/TwitterUserInformationProviderIT.java
new file mode 100644
index 0000000..e489f64
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/providers/TwitterUserInformationProviderIT.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.twitter.test.providers;
+
+import com.google.common.collect.Lists;
+import org.apache.streams.twitter.provider.TwitterUserInformationProvider;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileReader;
+import java.io.LineNumberReader;
+
+public class TwitterUserInformationProviderIT {
+
+ @Test
+ public void testTwitterUserInformationProvider() throws Exception {
+
+ String configfile = "./target/test-classes/TwitterUserInformationProviderIT.conf";
+ String outfile = "./target/test-classes/TwitterUserInformationProviderIT.stdout.txt";
+
+ TwitterUserInformationProvider.main(Lists.newArrayList(configfile, outfile).toArray(new String[2]));
+
+ File out = new File(outfile);
+ assert (out.exists());
+ assert (out.canRead());
+ assert (out.isFile());
+
+ FileReader outReader = new FileReader(out);
+ LineNumberReader outCounter = new LineNumberReader(outReader);
+
+ while(outCounter.readLine() != null) {}
+
+ assert (outCounter.getLineNumber() > 750);
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4febde27/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/utils/TwitterActivityConvertersTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/utils/TwitterActivityConvertersTest.java b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/utils/TwitterActivityConvertersTest.java
new file mode 100644
index 0000000..755a98e
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/utils/TwitterActivityConvertersTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.twitter.test.utils;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
+import org.apache.streams.converter.ActivityConverterUtil;
+import org.apache.streams.data.util.ActivityUtil;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.twitter.converter.TwitterDateTimeFormat;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * Tests {@link: org.apache.streams.twitter.converter.*}
+ */
+public class TwitterActivityConvertersTest {
+
+ private final static Logger LOGGER = LoggerFactory.getLogger(TwitterActivityConvertersTest.class);
+
+ private ObjectMapper mapper = StreamsJacksonMapper.getInstance(Lists.newArrayList(TwitterDateTimeFormat.TWITTER_FORMAT));
+
+ private ActivityConverterUtil activityConverterUtil = ActivityConverterUtil.getInstance();
+
+ private String tweet = "{\"created_at\":\"Wed Dec 11 22:27:34 +0000 2013\",\"id\":12345,\"id_str\":\"12345\",\"text\":\"text\",\"source\":\"source\",\"truncated\":false,\"in_reply_to_status_id\":null,\"in_reply_to_status_id_str\":null,\"in_reply_to_user_id\":null,\"in_reply_to_user_id_str\":null,\"in_reply_to_screen_name\":null,\"user\":{\"id\":91407775,\"id_str\":\"12345\",\"name\":\"name\",\"screen_name\":\"screen_name\",\"location\":\"\",\"url\":null,\"description\":null,\"protected\":false,\"followers_count\":136,\"friends_count\":0,\"listed_count\":1,\"created_at\":\"Fri Nov 20 19:29:02 +0000 2009\",\"favourites_count\":0,\"utc_offset\":null,\"time_zone\":null,\"geo_enabled\":false,\"verified\":false,\"statuses_count\":1793,\"lang\":\"en\",\"contributors_enabled\":false,\"is_translator\":false,\"profile_background_color\":\"C0DEED\",\"profile_background_image_url\":\"http:\\/\\/profile_background_image_url.png\",\"profile_background_image_url_https\":\"https:\\/\\/profile_b
ackground_image_url_https.png\",\"profile_background_tile\":false,\"profile_image_url\":\"http:\\/\\/profile_image_url.jpg\",\"profile_image_url_https\":\"https:\\/\\/profile_image_url_https.jpg\",\"profile_link_color\":\"0084B4\",\"profile_sidebar_border_color\":\"C0DEED\",\"profile_sidebar_fill_color\":\"DDEEF6\",\"profile_text_color\":\"333333\",\"profile_use_background_image\":true,\"default_profile\":true,\"default_profile_image\":false,\"following\":null,\"follow_request_sent\":null,\"notifications\":null},\"geo\":null,\"coordinates\":null,\"place\":null,\"contributors\":null,\"retweet_count\":0,\"favorite_count\":0,\"entities\":{\"hashtags\":[],\"symbols\":[],\"urls\":[{\"url\":\"http:\\/\\/url\",\"expanded_url\":\"http:\\/\\/expanded_url\",\"display_url\":\"display_url\",\"indices\":[118,140]}],\"user_mentions\":[]},\"favorited\":false,\"retweeted\":false,\"possibly_sensitive\":false,\"filter_level\":\"medium\",\"lang\":\"en\"}\n";
+ private String retweet = "{\"created_at\":\"Wed Dec 11 22:27:34 +0000 2013\",\"id\":23456,\"id_str\":\"23456\",\"text\":\"text\",\"source\":\"web\",\"truncated\":false,\"in_reply_to_status_id\":null,\"in_reply_to_status_id_str\":null,\"in_reply_to_user_id\":null,\"in_reply_to_user_id_str\":null,\"in_reply_to_screen_name\":null,\"user\":{\"id\":163149656,\"id_str\":\"34567\",\"name\":\"name\",\"screen_name\":\"screen_name\",\"location\":\"location\",\"url\":\"http:\\/\\/www.youtube.com\\/watch?v=url\",\"description\":\"description\\u00ed\",\"protected\":false,\"followers_count\":41,\"friends_count\":75,\"listed_count\":2,\"created_at\":\"Mon Jul 05 17:35:49 +0000 2010\",\"favourites_count\":4697,\"utc_offset\":-10800,\"time_zone\":\"Buenos Aires\",\"geo_enabled\":false,\"verified\":false,\"statuses_count\":5257,\"lang\":\"es\",\"contributors_enabled\":false,\"is_translator\":false,\"profile_background_color\":\"C4A64B\",\"profile_background_image_url\":\"http:\\/\\/a0.twimg.com\\
/profile_background_images\\/12345\\/12345.jpeg\",\"profile_background_image_url_https\":\"https:\\/\\/si0.twimg.com\\/profile_background_images\\/12345\\/12345.jpeg\",\"profile_background_tile\":true,\"profile_image_url\":\"http:\\/\\/pbs.twimg.com\\/profile_images\\/12345\\/12345.jpeg\",\"profile_image_url_https\":\"https:\\/\\/pbs.twimg.com\\/profile_images\\/12345\\/12345.jpeg\",\"profile_banner_url\":\"https:\\/\\/pbs.twimg.com\\/profile_banners\\/12345\\/12345\",\"profile_link_color\":\"BF415A\",\"profile_sidebar_border_color\":\"000000\",\"profile_sidebar_fill_color\":\"B17CED\",\"profile_text_color\":\"3D1957\",\"profile_use_background_image\":true,\"default_profile\":false,\"default_profile_image\":false,\"following\":null,\"follow_request_sent\":null,\"notifications\":null},\"geo\":null,\"coordinates\":null,\"place\":null,\"contributors\":null,\"retweeted_status\":{\"created_at\":\"Wed Dec 11 22:25:06 +0000 2013\",\"id\":34567,\"id_str\":\"34567\",\"text\":\"text\",\"sourc
e\":\"source\",\"truncated\":false,\"in_reply_to_status_id\":null,\"in_reply_to_status_id_str\":null,\"in_reply_to_user_id\":null,\"in_reply_to_user_id_str\":null,\"in_reply_to_screen_name\":null,\"user\":{\"id\":34567,\"id_str\":\"34567\",\"name\":\"name\",\"screen_name\":\"screen_name\",\"location\":\"\",\"url\":\"http:\\/\\/www.web.com\",\"description\":\"description\",\"protected\":false,\"followers_count\":34307,\"friends_count\":325,\"listed_count\":361,\"created_at\":\"Fri Apr 13 19:00:11 +0000 2012\",\"favourites_count\":44956,\"utc_offset\":3600,\"time_zone\":\"Madrid\",\"geo_enabled\":false,\"verified\":false,\"statuses_count\":24011,\"lang\":\"es\",\"contributors_enabled\":false,\"is_translator\":false,\"profile_background_color\":\"000000\",\"profile_background_image_url\":\"http:\\/\\/profile_background_image_url.jpeg\",\"profile_background_image_url_https\":\"https:\\/\\/si0.twimg.com\\/profile_background_images\\/34567\\/34567.jpeg\",\"profile_background_tile\":false,
\"profile_image_url\":\"http:\\/\\/pbs.twimg.com\\/profile_images\\/34567\\/34567.gif\",\"profile_image_url_https\":\"https:\\/\\/pbs.twimg.com\\/profile_images\\/34567\\/34567.gif\",\"profile_banner_url\":\"https:\\/\\/pbs.twimg.com\\/profile_banners\\/34567\\/34567\",\"profile_link_color\":\"FF00E1\",\"profile_sidebar_border_color\":\"FFFFFF\",\"profile_sidebar_fill_color\":\"F3F3F3\",\"profile_text_color\":\"333333\",\"profile_use_background_image\":true,\"default_profile\":false,\"default_profile_image\":false,\"following\":null,\"follow_request_sent\":null,\"notifications\":null},\"geo\":null,\"coordinates\":null,\"place\":null,\"contributors\":null,\"retweet_count\":9,\"favorite_count\":6,\"entities\":{\"hashtags\":[],\"symbols\":[],\"urls\":[],\"user_mentions\":[]},\"favorited\":false,\"retweeted\":false,\"lang\":\"es\"},\"retweet_count\":0,\"favorite_count\":0,\"entities\":{\"hashtags\":[],\"symbols\":[],\"urls\":[],\"user_mentions\":[{\"screen_name\":\"screen_name\",\"name\
":\"name emocional\",\"id\":45678,\"id_str\":\"45678\",\"indices\":[3,14]}]},\"favorited\":false,\"retweeted\":false,\"filter_level\":\"medium\",\"lang\":\"es\"}\n";
+ private String delete = "{\"delete\":{\"status\":{\"id\":56789,\"user_id\":67890,\"id_str\":\"56789\",\"user_id_str\":\"67890\"}}}\n";
+ private String follow = "{\"follower\":{\"id\":12345},\"followee\":{\"id\":56789}}\n";
+
+ @Test
+ public void testConvertTweet() {
+ List<Activity> activityList = activityConverterUtil.convert(tweet);
+ Assert.assertTrue(activityList.size() == 1);
+ Activity activity = activityList.get(0);
+ if( !ActivityUtil.isValid(activity) )
+ Assert.fail();
+ }
+
+ @Test
+ public void testConvertRetweet() {
+ List<Activity> activityList = activityConverterUtil.convert(retweet);
+ Assert.assertTrue(activityList.size() == 1);
+ Activity activity = activityList.get(0);
+ if( !ActivityUtil.isValid(activity) )
+ Assert.fail();
+ }
+
+ @Test
+ public void testConvertDelete() {
+ List<Activity> activityList = activityConverterUtil.convert(delete);
+ Assert.assertTrue(activityList.size() == 1);
+ Activity activity = activityList.get(0);
+ if( !ActivityUtil.isValid(activity) )
+ Assert.fail();
+ }
+
+ @Test
+ public void testConvertFollow() {
+ List<Activity> activityList = activityConverterUtil.convert(follow);
+ Assert.assertTrue(activityList.size() == 1);
+ Activity activity = activityList.get(0);
+ if( !ActivityUtil.isValid(activity) )
+ Assert.fail();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4febde27/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/utils/TwitterActivityObjectsConvertersTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/utils/TwitterActivityObjectsConvertersTest.java b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/utils/TwitterActivityObjectsConvertersTest.java
new file mode 100644
index 0000000..11cd1e0
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/utils/TwitterActivityObjectsConvertersTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.twitter.test.utils;
+
+import org.apache.streams.converter.ActivityObjectConverterUtil;
+import org.apache.streams.data.util.ActivityUtil;
+import org.apache.streams.pojo.json.ActivityObject;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tests {@link: org.apache.streams.twitter.converter.*}
+ */
+public class TwitterActivityObjectsConvertersTest {
+
+ private final static Logger LOGGER = LoggerFactory.getLogger(TwitterActivityObjectsConvertersTest.class);
+
+ private ActivityObjectConverterUtil activityObjectConverterUtil = ActivityObjectConverterUtil.getInstance();
+
+ private String user = "{\"id\":1663018644,\"id_str\":\"1663018644\",\"name\":\"M.R. Clark\",\"screen_name\":\"cantennisfan\",\"location\":\"\",\"url\":null,\"description\":null,\"protected\":false,\"verified\":false,\"followers_count\":0,\"friends_count\":5,\"listed_count\":0,\"favourites_count\":2,\"statuses_count\":72,\"created_at\":\"Sun Aug 11 17:23:47 +0000 2013\",\"utc_offset\":-18000,\"time_zone\":\"Eastern Time (US & Canada)\",\"geo_enabled\":false,\"lang\":\"en\",\"contributors_enabled\":false,\"is_translator\":false,\"profile_background_color\":\"C0DEED\",\"profile_background_image_url\":\"http://abs.twimg.com/images/themes/theme1/bg.png\",\"profile_background_image_url_https\":\"https://abs.twimg.com/images/themes/theme1/bg.png\",\"profile_background_tile\":false,\"profile_link_color\":\"0084B4\",\"profile_sidebar_border_color\":\"C0DEED\",\"profile_sidebar_fill_color\":\"DDEEF6\",\"profile_text_color\":\"333333\",\"profile_use_background_image\":true,\"profile_image_
url\":\"http://abs.twimg.com/sticky/default_profile_images/default_profile_0_normal.png\",\"profile_image_url_https\":\"https://abs.twimg.com/sticky/default_profile_images/default_profile_0_normal.png\",\"default_profile\":true,\"default_profile_image\":true,\"following\":null,\"follow_request_sent\":null,\"notifications\":null,\"status\":{\"created_at\":\"Thu Jan 01 14:11:48 +0000 2015\",\"id\":550655634706669568,\"id_str\":\"550655634706669568\",\"text\":\"CBC Media Centre - CBC - Air Farce New Year's Eve 2014/2015: http://t.co/lMlL9VbC5e\",\"source\":\"<a href=\\\"https://dev.twitter.com/docs/tfw\\\" rel=\\\"nofollow\\\">Twitter for Websites</a>\",\"truncated\":false,\"in_reply_to_status_id\":null,\"in_reply_to_status_id_str\":null,\"in_reply_to_user_id\":null,\"in_reply_to_user_id_str\":null,\"in_reply_to_screen_name\":null,\"geo\":null,\"coordinates\":null,\"place\":null,\"contributors\":null,\"retweet_count\":0,\"favorite_count\":0,\"entities\":{\"hashtags\":[],\"trends\":[],\
"urls\":[{\"url\":\"http://t.co/lMlL9VbC5e\",\"expanded_url\":\"http://www.cbc.ca/mediacentre/air-farce-new-years-eve-20142015.html#.VKVVarDhVxR.twitter\",\"display_url\":\"cbc.ca/mediacentre/ai\u2026\",\"indices\":[61,83]}],\"user_mentions\":[],\"symbols\":[]},\"favorited\":false,\"retweeted\":false,\"possibly_sensitive\":false,\"filter_level\":\"medium\",\"lang\":\"en\",\"timestamp_ms\":\"1420121508658\"}}\n";
+
+ @Test
+ public void testConvertUser() {
+ ActivityObject activityObject = activityObjectConverterUtil.convert(user);
+ assert( activityObject != null );
+ if( !ActivityUtil.isValid(activityObject) )
+ Assert.fail();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4febde27/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/utils/TwitterDocumentClassifierTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/utils/TwitterDocumentClassifierTest.java b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/utils/TwitterDocumentClassifierTest.java
new file mode 100644
index 0000000..a1ca7c5
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/utils/TwitterDocumentClassifierTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.twitter.test.utils;
+
+import org.apache.streams.twitter.converter.TwitterDocumentClassifier;
+import org.apache.streams.twitter.pojo.Delete;
+import org.apache.streams.twitter.pojo.Follow;
+import org.apache.streams.twitter.pojo.Retweet;
+import org.apache.streams.twitter.pojo.Tweet;
+import org.apache.streams.twitter.pojo.User;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.List;
+
+/**
+ * Tests {@link: org.apache.streams.twitter.processor.TwitterEventClassifier}
+ */
+public class TwitterDocumentClassifierTest {
+
+ private String tweet = "{\"created_at\":\"Wed Dec 11 22:27:34 +0000 2013\",\"id\":12345,\"id_str\":\"12345\",\"text\":\"text\",\"source\":\"source\",\"truncated\":false,\"in_reply_to_status_id\":null,\"in_reply_to_status_id_str\":null,\"in_reply_to_user_id\":null,\"in_reply_to_user_id_str\":null,\"in_reply_to_screen_name\":null,\"user\":{\"id\":91407775,\"id_str\":\"12345\",\"name\":\"name\",\"screen_name\":\"screen_name\",\"location\":\"\",\"url\":null,\"description\":null,\"protected\":false,\"followers_count\":136,\"friends_count\":0,\"listed_count\":1,\"created_at\":\"Fri Nov 20 19:29:02 +0000 2009\",\"favourites_count\":0,\"utc_offset\":null,\"time_zone\":null,\"geo_enabled\":false,\"verified\":false,\"statuses_count\":1793,\"lang\":\"en\",\"contributors_enabled\":false,\"is_translator\":false,\"profile_background_color\":\"C0DEED\",\"profile_background_image_url\":\"http:\\/\\/profile_background_image_url.png\",\"profile_background_image_url_https\":\"https:\\/\\/profile_b
ackground_image_url_https.png\",\"profile_background_tile\":false,\"profile_image_url\":\"http:\\/\\/profile_image_url.jpg\",\"profile_image_url_https\":\"https:\\/\\/profile_image_url_https.jpg\",\"profile_link_color\":\"0084B4\",\"profile_sidebar_border_color\":\"C0DEED\",\"profile_sidebar_fill_color\":\"DDEEF6\",\"profile_text_color\":\"333333\",\"profile_use_background_image\":true,\"default_profile\":true,\"default_profile_image\":false,\"following\":null,\"follow_request_sent\":null,\"notifications\":null},\"geo\":null,\"coordinates\":null,\"place\":null,\"contributors\":null,\"retweet_count\":0,\"favorite_count\":0,\"entities\":{\"hashtags\":[],\"symbols\":[],\"urls\":[{\"url\":\"http:\\/\\/url\",\"expanded_url\":\"http:\\/\\/expanded_url\",\"display_url\":\"display_url\",\"indices\":[118,140]}],\"user_mentions\":[]},\"favorited\":false,\"retweeted\":false,\"possibly_sensitive\":false,\"filter_level\":\"medium\",\"lang\":\"en\"}\n";
+ private String retweet = "{\"created_at\":\"Wed Dec 11 22:27:34 +0000 2013\",\"id\":23456,\"id_str\":\"23456\",\"text\":\"text\",\"source\":\"web\",\"truncated\":false,\"in_reply_to_status_id\":null,\"in_reply_to_status_id_str\":null,\"in_reply_to_user_id\":null,\"in_reply_to_user_id_str\":null,\"in_reply_to_screen_name\":null,\"user\":{\"id\":163149656,\"id_str\":\"34567\",\"name\":\"name\",\"screen_name\":\"screen_name\",\"location\":\"location\",\"url\":\"http:\\/\\/www.youtube.com\\/watch?v=url\",\"description\":\"description\\u00ed\",\"protected\":false,\"followers_count\":41,\"friends_count\":75,\"listed_count\":2,\"created_at\":\"Mon Jul 05 17:35:49 +0000 2010\",\"favourites_count\":4697,\"utc_offset\":-10800,\"time_zone\":\"Buenos Aires\",\"geo_enabled\":false,\"verified\":false,\"statuses_count\":5257,\"lang\":\"es\",\"contributors_enabled\":false,\"is_translator\":false,\"profile_background_color\":\"C4A64B\",\"profile_background_image_url\":\"http:\\/\\/a0.twimg.com\\
/profile_background_images\\/12345\\/12345.jpeg\",\"profile_background_image_url_https\":\"https:\\/\\/si0.twimg.com\\/profile_background_images\\/12345\\/12345.jpeg\",\"profile_background_tile\":true,\"profile_image_url\":\"http:\\/\\/pbs.twimg.com\\/profile_images\\/12345\\/12345.jpeg\",\"profile_image_url_https\":\"https:\\/\\/pbs.twimg.com\\/profile_images\\/12345\\/12345.jpeg\",\"profile_banner_url\":\"https:\\/\\/pbs.twimg.com\\/profile_banners\\/12345\\/12345\",\"profile_link_color\":\"BF415A\",\"profile_sidebar_border_color\":\"000000\",\"profile_sidebar_fill_color\":\"B17CED\",\"profile_text_color\":\"3D1957\",\"profile_use_background_image\":true,\"default_profile\":false,\"default_profile_image\":false,\"following\":null,\"follow_request_sent\":null,\"notifications\":null},\"geo\":null,\"coordinates\":null,\"place\":null,\"contributors\":null,\"retweeted_status\":{\"created_at\":\"Wed Dec 11 22:25:06 +0000 2013\",\"id\":34567,\"id_str\":\"34567\",\"text\":\"text\",\"sourc
e\":\"source\",\"truncated\":false,\"in_reply_to_status_id\":null,\"in_reply_to_status_id_str\":null,\"in_reply_to_user_id\":null,\"in_reply_to_user_id_str\":null,\"in_reply_to_screen_name\":null,\"user\":{\"id\":34567,\"id_str\":\"34567\",\"name\":\"name\",\"screen_name\":\"screen_name\",\"location\":\"\",\"url\":\"http:\\/\\/www.web.com\",\"description\":\"description\",\"protected\":false,\"followers_count\":34307,\"friends_count\":325,\"listed_count\":361,\"created_at\":\"Fri Apr 13 19:00:11 +0000 2012\",\"favourites_count\":44956,\"utc_offset\":3600,\"time_zone\":\"Madrid\",\"geo_enabled\":false,\"verified\":false,\"statuses_count\":24011,\"lang\":\"es\",\"contributors_enabled\":false,\"is_translator\":false,\"profile_background_color\":\"000000\",\"profile_background_image_url\":\"http:\\/\\/profile_background_image_url.jpeg\",\"profile_background_image_url_https\":\"https:\\/\\/si0.twimg.com\\/profile_background_images\\/34567\\/34567.jpeg\",\"profile_background_tile\":false,
\"profile_image_url\":\"http:\\/\\/pbs.twimg.com\\/profile_images\\/34567\\/34567.gif\",\"profile_image_url_https\":\"https:\\/\\/pbs.twimg.com\\/profile_images\\/34567\\/34567.gif\",\"profile_banner_url\":\"https:\\/\\/pbs.twimg.com\\/profile_banners\\/34567\\/34567\",\"profile_link_color\":\"FF00E1\",\"profile_sidebar_border_color\":\"FFFFFF\",\"profile_sidebar_fill_color\":\"F3F3F3\",\"profile_text_color\":\"333333\",\"profile_use_background_image\":true,\"default_profile\":false,\"default_profile_image\":false,\"following\":null,\"follow_request_sent\":null,\"notifications\":null},\"geo\":null,\"coordinates\":null,\"place\":null,\"contributors\":null,\"retweet_count\":9,\"favorite_count\":6,\"entities\":{\"hashtags\":[],\"symbols\":[],\"urls\":[],\"user_mentions\":[]},\"favorited\":false,\"retweeted\":false,\"lang\":\"es\"},\"retweet_count\":0,\"favorite_count\":0,\"entities\":{\"hashtags\":[],\"symbols\":[],\"urls\":[],\"user_mentions\":[{\"screen_name\":\"screen_name\",\"name\
":\"name emocional\",\"id\":45678,\"id_str\":\"45678\",\"indices\":[3,14]}]},\"favorited\":false,\"retweeted\":false,\"filter_level\":\"medium\",\"lang\":\"es\"}\n";
+ private String delete = "{\"delete\":{\"status\":{\"id\":56789,\"user_id\":67890,\"id_str\":\"56789\",\"user_id_str\":\"67890\"}}}\n";
+ private String follow = "{\"follower\":{\"id\":12345},\"followee\":{\"id\":56789}}\n";
+ private String user = "{\"location\":\"\",\"default_profile\":true,\"profile_background_tile\":false,\"statuses_count\":1,\"lang\":\"en\",\"profile_link_color\":\"0084B4\",\"id\":67890,\"following\":false,\"protected\":false,\"favourites_count\":0,\"profile_text_color\":\"333333\",\"description\":\"\",\"verified\":false,\"contributors_enabled\":false,\"profile_sidebar_border_color\":\"C0DEED\",\"name\":\"name\",\"profile_background_color\":\"C0DEED\",\"created_at\":\"Fri Apr 17 12:35:56 +0000 2009\",\"is_translation_enabled\":false,\"default_profile_image\":true,\"followers_count\":2,\"profile_image_url_https\":\"https://profile_image_url_https.png\",\"geo_enabled\":false,\"status\":{\"contributors\":null,\"text\":\"Working\",\"geo\":null,\"retweeted\":false,\"in_reply_to_screen_name\":null,\"truncated\":false,\"lang\":\"en\",\"entities\":{\"symbols\":[],\"urls\":[],\"hashtags\":[],\"user_mentions\":[]},\"in_reply_to_status_id_str\":null,\"id\":67890,\"source\":\"web\",\"in_repl
y_to_user_id_str\":null,\"favorited\":false,\"in_reply_to_status_id\":null,\"retweet_count\":0,\"created_at\":\"Fri Apr 17 12:37:54 +0000 2009\",\"in_reply_to_user_id\":null,\"favorite_count\":0,\"id_str\":\"67890\",\"place\":null,\"coordinates\":null},\"profile_background_image_url\":\"http://abs.twimg.com/profile_background_image_url.png\",\"profile_background_image_url_https\":\"https://abs.twimg.com/images/profile_background_image_url_https.png\",\"follow_request_sent\":false,\"entities\":{\"description\":{\"urls\":[]}},\"url\":null,\"utc_offset\":null,\"time_zone\":null,\"notifications\":false,\"profile_use_background_image\":true,\"friends_count\":1,\"profile_sidebar_fill_color\":\"DDEEF6\",\"screen_name\":\"screen_name\",\"id_str\":\"67890\",\"profile_image_url\":\"http://abs.twimg.com/sticky/default_profile_images/default_profile_1_normal.png\",\"listed_count\":0,\"is_translator\":false}";
+
+ @Test
+ public void testDetectTweet() {
+ List<Class> detected = new TwitterDocumentClassifier().detectClasses(tweet);
+ Assert.assertTrue(detected.size() == 1);
+ Class result = detected.get(0);
+ if( !result.equals(Tweet.class) )
+ Assert.fail();
+ }
+
+ @Test
+ public void testDetectRetweet() {
+ List<Class> detected = new TwitterDocumentClassifier().detectClasses(retweet);
+ Assert.assertTrue(detected.size() == 1);
+ Class result = detected.get(0);
+ if( !result.equals(Retweet.class) )
+ Assert.fail();
+ }
+
+ @Test
+ public void testDetectDelete() {
+ List<Class> detected = new TwitterDocumentClassifier().detectClasses(delete);
+ Assert.assertTrue(detected.size() == 1);
+ Class result = detected.get(0);
+ if( !result.equals(Delete.class) )
+ Assert.fail();
+ }
+
+ @Test
+ public void testDetectFollow() {
+ List<Class> detected = new TwitterDocumentClassifier().detectClasses(follow);
+ Assert.assertTrue(detected.size() == 1);
+ Class result = detected.get(0);
+ if( !result.equals(Follow.class) )
+ Assert.fail();
+ }
+
+ @Test
+ public void testDetectUser() {
+ List<Class> detected = new TwitterDocumentClassifier().detectClasses(user);
+ Assert.assertTrue(detected.size() == 1);
+ Class result = detected.get(0);
+ if (!result.equals(User.class))
+ Assert.fail();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4febde27/streams-contrib/streams-provider-twitter/src/test/resources/TwitterFollowingProviderIT.conf
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/test/resources/TwitterFollowingProviderIT.conf b/streams-contrib/streams-provider-twitter/src/test/resources/TwitterFollowingProviderIT.conf
new file mode 100644
index 0000000..378978a
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/test/resources/TwitterFollowingProviderIT.conf
@@ -0,0 +1,8 @@
+twitter {
+ info = [
+ 18055613
+ ]
+ endpoint = followers
+ ids_only = true
+ max_items = 10000
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4febde27/streams-contrib/streams-provider-twitter/src/test/resources/TwitterStreamProviderIT.conf
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/test/resources/TwitterStreamProviderIT.conf b/streams-contrib/streams-provider-twitter/src/test/resources/TwitterStreamProviderIT.conf
new file mode 100644
index 0000000..291a17c
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/test/resources/TwitterStreamProviderIT.conf
@@ -0,0 +1,6 @@
+twitter {
+ endpoint = sample
+ track = [
+ "data"
+ ]
+}
[4/8] incubator-streams git commit: fixes while testing flink examples
Posted by sb...@apache.org.
fixes while testing flink examples
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/9495cf52
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/9495cf52
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/9495cf52
Branch: refs/heads/master
Commit: 9495cf52e3d1c5d5100566364bfa30447555682a
Parents: d9e58cd
Author: Steve Blackmon @steveblackmon <sb...@apache.org>
Authored: Wed Oct 5 16:41:48 2016 -0500
Committer: Steve Blackmon @steveblackmon <sb...@apache.org>
Committed: Wed Oct 5 16:41:48 2016 -0500
----------------------------------------------------------------------
.../provider/TwitterFollowingProvider.java | 9 ++--
.../provider/TwitterFollowingProviderTask.java | 52 +++++++++-----------
.../provider/TwitterStreamProcessor.java | 5 +-
.../TwitterUserInformationProvider.java | 6 ++-
4 files changed, 34 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9495cf52/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
index 27c8526..4c3a828 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
@@ -88,7 +88,7 @@ public class TwitterFollowingProvider extends TwitterUserInformationProvider {
Twitter client = getTwitterClient();
for (int i = 0; i < ids.length; i++) {
- TwitterFollowingProviderTask providerTask = new TwitterFollowingProviderTask(this, client, ids[i], getConfig().getEndpoint(), getConfig().getIdsOnly());
+ TwitterFollowingProviderTask providerTask = new TwitterFollowingProviderTask(this, client, ids[i]);
executor.submit(providerTask);
}
}
@@ -97,7 +97,7 @@ public class TwitterFollowingProvider extends TwitterUserInformationProvider {
Twitter client = getTwitterClient();
for (int i = 0; i < screenNames.length; i++) {
- TwitterFollowingProviderTask providerTask = new TwitterFollowingProviderTask(this, client, screenNames[i], getConfig().getEndpoint(), getConfig().getIdsOnly());
+ TwitterFollowingProviderTask providerTask = new TwitterFollowingProviderTask(this, client, screenNames[i]);
executor.submit(providerTask);
}
@@ -106,7 +106,7 @@ public class TwitterFollowingProvider extends TwitterUserInformationProvider {
@Override
public StreamsResultSet readCurrent() {
- LOGGER.debug("Providing {} docs", providerQueue.size());
+ LOGGER.info("{}{} - readCurrent", idsBatches, screenNameBatches);
StreamsResultSet result;
@@ -115,12 +115,13 @@ public class TwitterFollowingProvider extends TwitterUserInformationProvider {
result = new StreamsResultSet(providerQueue);
result.setCounter(new DatumStatusCounter());
providerQueue = constructQueue();
+ LOGGER.debug("{}{} - providing {} docs", idsBatches, screenNameBatches, result.size());
} finally {
lock.writeLock().unlock();
}
if (providerQueue.isEmpty() && executor.isTerminated()) {
- LOGGER.info("Finished. Cleaning up...");
+ LOGGER.info("{}{} - completed", idsBatches, screenNameBatches);
running.set(false);
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9495cf52/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProviderTask.java
index cc71d48..f2346fb 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProviderTask.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProviderTask.java
@@ -44,27 +44,20 @@ public class TwitterFollowingProviderTask implements Runnable {
protected TwitterFollowingProvider provider;
protected Twitter client;
protected Long id;
- protected Boolean idsOnly;
protected String screenName;
- protected String endpoint;
- private int max_per_page = 200;
int count = 0;
- public TwitterFollowingProviderTask(TwitterFollowingProvider provider, Twitter twitter, Long id, String endpoint, Boolean idsOnly) {
+ public TwitterFollowingProviderTask(TwitterFollowingProvider provider, Twitter twitter, Long id) {
this.provider = provider;
this.client = twitter;
this.id = id;
- this.endpoint = endpoint;
- this.idsOnly = idsOnly;
}
- public TwitterFollowingProviderTask(TwitterFollowingProvider provider, Twitter twitter, String screenName, String endpoint, Boolean idsOnly) {
+ public TwitterFollowingProviderTask(TwitterFollowingProvider provider, Twitter twitter, String screenName) {
this.provider = provider;
this.client = twitter;
this.screenName = screenName;
- this.endpoint = endpoint;
- this.idsOnly = idsOnly;
}
@@ -84,9 +77,9 @@ public class TwitterFollowingProviderTask implements Runnable {
protected void getFollowing(Long id) {
- Preconditions.checkArgument(endpoint.equals("friends") || endpoint.equals("followers"));
+ Preconditions.checkArgument(provider.getConfig().getEndpoint().equals("friends") || provider.getConfig().getEndpoint().equals("followers"));
- if( idsOnly )
+ if( provider.getConfig().getIdsOnly() )
collectIds(id);
else
collectUsers(id);
@@ -112,10 +105,10 @@ public class TwitterFollowingProviderTask implements Runnable {
}
PagableResponseList<twitter4j.User> list = null;
- if( endpoint.equals("followers") )
- list = client.friendsFollowers().getFollowersList(id.longValue(), curser, max_per_page);
- else if( endpoint.equals("friends") )
- list = client.friendsFollowers().getFriendsList(id.longValue(), curser, max_per_page);
+ if( provider.getConfig().getEndpoint().equals("followers") )
+ list = client.friendsFollowers().getFollowersList(id.longValue(), curser, provider.getConfig().getMaxItems().intValue());
+ else if( provider.getConfig().getEndpoint().equals("friends") )
+ list = client.friendsFollowers().getFriendsList(id.longValue(), curser, provider.getConfig().getMaxItems().intValue());
Preconditions.checkNotNull(list);
Preconditions.checkArgument(list.size() > 0);
@@ -126,11 +119,11 @@ public class TwitterFollowingProviderTask implements Runnable {
try {
Follow follow = null;
- if( endpoint.equals("followers") ) {
+ if( provider.getConfig().getEndpoint().equals("followers") ) {
follow = new Follow()
.withFollowee(mapper.readValue(userJson, User.class))
.withFollower(mapper.readValue(otherJson, User.class));
- } else if( endpoint.equals("friends") ) {
+ } else if( provider.getConfig().getEndpoint().equals("friends") ) {
follow = new Follow()
.withFollowee(mapper.readValue(otherJson, User.class))
.withFollower(mapper.readValue(userJson, User.class));
@@ -147,9 +140,9 @@ public class TwitterFollowingProviderTask implements Runnable {
LOGGER.warn("Exception: {}", e);
}
}
- if( list.size() == max_per_page )
- curser = list.getNextCursor();
- else break;
+ if( !list.hasNext() ) break;
+ if( list.getNextCursor() == 0 ) break;
+ curser = list.getNextCursor();
}
catch(TwitterException twitterException) {
keepTrying += TwitterErrorHandler.handleTwitterError(client, twitterException);
@@ -170,10 +163,10 @@ public class TwitterFollowingProviderTask implements Runnable {
try
{
twitter4j.IDs ids = null;
- if( endpoint.equals("followers") )
- ids = client.friendsFollowers().getFollowersIDs(id.longValue(), curser, max_per_page);
- else if( endpoint.equals("friends") )
- ids = client.friendsFollowers().getFriendsIDs(id.longValue(), curser, max_per_page);
+ if( provider.getConfig().getEndpoint().equals("followers") )
+ ids = client.friendsFollowers().getFollowersIDs(id.longValue(), curser, provider.getConfig().getMaxItems().intValue());
+ else if( provider.getConfig().getEndpoint().equals("friends") )
+ ids = client.friendsFollowers().getFriendsIDs(id.longValue(), curser, provider.getConfig().getMaxItems().intValue());
Preconditions.checkNotNull(ids);
Preconditions.checkArgument(ids.getIDs().length > 0);
@@ -182,16 +175,15 @@ public class TwitterFollowingProviderTask implements Runnable {
try {
Follow follow = null;
- if( endpoint.equals("followers") ) {
+ if( provider.getConfig().getEndpoint().equals("followers") ) {
follow = new Follow()
.withFollowee(new User().withId(id))
.withFollower(new User().withId(otherId));
- } else if( endpoint.equals("friends") ) {
+ } else if( provider.getConfig().getEndpoint().equals("friends") ) {
follow = new Follow()
.withFollowee(new User().withId(otherId))
.withFollower(new User().withId(id));
}
- ComponentUtils.offerUntilSuccess(new StreamsDatum(follow), provider.providerQueue);
Preconditions.checkNotNull(follow);
@@ -203,9 +195,9 @@ public class TwitterFollowingProviderTask implements Runnable {
LOGGER.warn("Exception: {}", e);
}
}
- if( ids.hasNext() )
- curser = ids.getNextCursor();
- else break;
+ if( !ids.hasNext() ) break;
+ if( ids.getNextCursor() == 0 ) break;
+ curser = ids.getNextCursor();
}
catch(TwitterException twitterException) {
keepTrying += TwitterErrorHandler.handleTwitterError(client, id, twitterException);
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9495cf52/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProcessor.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProcessor.java
index f0690f8..8ea65eb 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProcessor.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProcessor.java
@@ -88,8 +88,9 @@ public class TwitterStreamProcessor extends StringDelimitedProcessor {
@Override
public List<StreamsDatum> call() throws Exception {
if(item != null) {
- ObjectNode objectNode = (ObjectNode) mapper.readTree(item);
- StreamsDatum rawDatum = new StreamsDatum(objectNode);
+ Class itemClass = TwitterEventClassifier.detectClass(item);
+ Object document = mapper.readValue(item, itemClass);
+ StreamsDatum rawDatum = new StreamsDatum(document);
return Lists.newArrayList(rawDatum);
}
return Lists.newArrayList();
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9495cf52/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
index 78eb3e6..4231f56 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
@@ -197,7 +197,7 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
public StreamsResultSet readCurrent() {
- LOGGER.info("{}{} - readCurrent", idsBatches, screenNameBatches);
+ LOGGER.debug("{}{} - readCurrent", idsBatches, screenNameBatches);
StreamsResultSet result;
@@ -206,7 +206,7 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
result = new StreamsResultSet(providerQueue);
result.setCounter(new DatumStatusCounter());
providerQueue = constructQueue();
- LOGGER.info("{}{} - providing {} docs", idsBatches, screenNameBatches, result.size());
+ LOGGER.debug("{}{} - providing {} docs", idsBatches, screenNameBatches, result.size());
} finally {
lock.writeLock().unlock();
}
@@ -215,6 +215,8 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
LOGGER.info("{}{} - completed", idsBatches, screenNameBatches);
running.set(false);
+
+ LOGGER.info("Exiting");
}
return result;
[5/8] incubator-streams git commit: per PR feedback, don’t use import .*;
Posted by sb...@apache.org.
per PR feedback, don\u2019t use import .*;
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/f1540b15
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/f1540b15
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/f1540b15
Branch: refs/heads/master
Commit: f1540b15c2dcbb6c99cd9be90cbe2626b8ef49fc
Parents: 9495cf5
Author: Steve Blackmon @steveblackmon <sb...@apache.org>
Authored: Wed Oct 5 18:06:00 2016 -0500
Committer: Steve Blackmon @steveblackmon <sb...@apache.org>
Committed: Wed Oct 5 18:06:00 2016 -0500
----------------------------------------------------------------------
.../TwitterJsonDeleteActivityConverter.java | 2 +-
.../TwitterJsonRetweetActivityConverter.java | 2 +-
.../TwitterJsonTweetActivityConverter.java | 2 +-
...terJsonUserstreameventActivityConverter.java | 3 ++-
.../FetchAndReplaceTwitterProcessor.java | 11 ++++++---
.../processor/TwitterEventProcessor.java | 13 +----------
.../twitter/provider/TwitterConfigurator.java | 8 +++----
.../provider/TwitterEventClassifier.java | 7 +++++-
.../twitter/provider/TwitterStreamProvider.java | 24 +++++++++++++++-----
.../provider/TwitterTimelineProvider.java | 16 +++++++++----
.../provider/TwitterTimelineProviderTask.java | 6 +++--
.../TwitterUserInformationProvider.java | 7 ++++--
.../test/TwitterDocumentClassifierTest.java | 6 ++++-
.../twitter/test/TwitterObjectMapperIT.java | 5 ++--
14 files changed, 69 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f1540b15/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonDeleteActivityConverter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonDeleteActivityConverter.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonDeleteActivityConverter.java
index eb35b71..3e61ef9 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonDeleteActivityConverter.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonDeleteActivityConverter.java
@@ -30,7 +30,7 @@ import org.apache.streams.twitter.pojo.Tweet;
import java.io.Serializable;
import java.util.List;
-import static org.apache.streams.twitter.converter.util.TwitterActivityUtil.*;
+import static org.apache.streams.twitter.converter.util.TwitterActivityUtil.updateActivity;
/**
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f1540b15/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonRetweetActivityConverter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonRetweetActivityConverter.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonRetweetActivityConverter.java
index 22a4a58..30a1916 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonRetweetActivityConverter.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonRetweetActivityConverter.java
@@ -28,7 +28,7 @@ import org.apache.streams.twitter.pojo.Retweet;
import java.io.Serializable;
import java.util.List;
-import static org.apache.streams.twitter.converter.util.TwitterActivityUtil.*;
+import static org.apache.streams.twitter.converter.util.TwitterActivityUtil.updateActivity;
public class TwitterJsonRetweetActivityConverter implements ActivityConverter<Retweet>, Serializable {
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f1540b15/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonTweetActivityConverter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonTweetActivityConverter.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonTweetActivityConverter.java
index 0b9f0ec..0997a7f 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonTweetActivityConverter.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonTweetActivityConverter.java
@@ -28,7 +28,7 @@ import org.apache.streams.twitter.pojo.Tweet;
import java.io.Serializable;
import java.util.List;
-import static org.apache.streams.twitter.converter.util.TwitterActivityUtil.*;
+import static org.apache.streams.twitter.converter.util.TwitterActivityUtil.updateActivity;
public class TwitterJsonTweetActivityConverter implements ActivityConverter<Tweet>, Serializable {
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f1540b15/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonUserstreameventActivityConverter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonUserstreameventActivityConverter.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonUserstreameventActivityConverter.java
index 357c41c..b3647fa 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonUserstreameventActivityConverter.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonUserstreameventActivityConverter.java
@@ -30,7 +30,8 @@ import org.apache.streams.twitter.pojo.UserstreamEvent;
import java.util.List;
-import static org.apache.streams.twitter.converter.util.TwitterActivityUtil.*;
+import static org.apache.streams.twitter.converter.util.TwitterActivityUtil.formatId;
+import static org.apache.streams.twitter.converter.util.TwitterActivityUtil.getProvider;
/**
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f1540b15/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/FetchAndReplaceTwitterProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/FetchAndReplaceTwitterProcessor.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/FetchAndReplaceTwitterProcessor.java
index 8c5f55b..8330167 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/FetchAndReplaceTwitterProcessor.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/FetchAndReplaceTwitterProcessor.java
@@ -28,21 +28,26 @@ import org.apache.streams.exceptions.ActivityConversionException;
import org.apache.streams.pojo.json.Activity;
import org.apache.streams.twitter.TwitterConfiguration;
import org.apache.streams.twitter.TwitterStreamConfiguration;
+import org.apache.streams.twitter.converter.StreamsTwitterMapper;
import org.apache.streams.twitter.pojo.Delete;
import org.apache.streams.twitter.pojo.Retweet;
import org.apache.streams.twitter.pojo.Tweet;
import org.apache.streams.twitter.provider.TwitterConfigurator;
import org.apache.streams.twitter.provider.TwitterEventClassifier;
-import org.apache.streams.twitter.converter.StreamsTwitterMapper;
import org.apache.streams.twitter.provider.TwitterProviderUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import twitter4j.*;
+import twitter4j.Status;
+import twitter4j.Twitter;
+import twitter4j.TwitterException;
+import twitter4j.TwitterFactory;
+import twitter4j.TwitterObjectFactory;
import twitter4j.conf.ConfigurationBuilder;
import java.util.List;
-import static org.apache.streams.twitter.converter.util.TwitterActivityUtil.*;
+import static org.apache.streams.twitter.converter.util.TwitterActivityUtil.getProvider;
+import static org.apache.streams.twitter.converter.util.TwitterActivityUtil.updateActivity;
/**
* Given an Activity, fetches the tweet by the activity object id and replaces the existing activity with the converted activity
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f1540b15/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java
index 45f84ff..ed6b90a 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java
@@ -18,27 +18,16 @@
package org.apache.streams.twitter.processor;
-import com.fasterxml.jackson.core.JsonParseException;
-import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
-import org.apache.commons.lang3.StringUtils;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsProcessor;
-import org.apache.streams.exceptions.ActivityConversionException;
import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.twitter.converter.*;
+import org.apache.streams.twitter.converter.StreamsTwitterMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
import java.util.List;
-import java.util.Queue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Callable;
/**
* This class performs conversion of a twitter event to a specified outClass
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f1540b15/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterConfigurator.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterConfigurator.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterConfigurator.java
index 370960b..4b7b3ef 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterConfigurator.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterConfigurator.java
@@ -19,17 +19,15 @@
package org.apache.streams.twitter.provider;
import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.Lists;
import com.typesafe.config.Config;
-import com.typesafe.config.ConfigException;
import com.typesafe.config.ConfigRenderOptions;
-import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.twitter.*;
+import org.apache.streams.twitter.TwitterConfiguration;
+import org.apache.streams.twitter.TwitterStreamConfiguration;
+import org.apache.streams.twitter.TwitterUserInformationConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.util.List;
/**
* This class resolves TwitterConfiguration from typesafe config
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f1540b15/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventClassifier.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventClassifier.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventClassifier.java
index f54dd0a..9466c2e 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventClassifier.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventClassifier.java
@@ -24,8 +24,13 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.commons.lang.StringUtils;
import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.twitter.pojo.*;
import org.apache.streams.twitter.converter.TwitterDateTimeFormat;
+import org.apache.streams.twitter.pojo.Delete;
+import org.apache.streams.twitter.pojo.FriendList;
+import org.apache.streams.twitter.pojo.Retweet;
+import org.apache.streams.twitter.pojo.Tweet;
+import org.apache.streams.twitter.pojo.User;
+import org.apache.streams.twitter.pojo.UserstreamEvent;
import java.io.IOException;
import java.io.Serializable;
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f1540b15/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
index 30e2b56..f584950 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
@@ -18,7 +18,6 @@
package org.apache.streams.twitter.provider;
-import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Queues;
@@ -26,7 +25,11 @@ import com.twitter.hbc.ClientBuilder;
import com.twitter.hbc.core.Constants;
import com.twitter.hbc.core.Hosts;
import com.twitter.hbc.core.HttpHosts;
-import com.twitter.hbc.core.endpoint.*;
+import com.twitter.hbc.core.endpoint.StatusesFilterEndpoint;
+import com.twitter.hbc.core.endpoint.StatusesFirehoseEndpoint;
+import com.twitter.hbc.core.endpoint.StatusesSampleEndpoint;
+import com.twitter.hbc.core.endpoint.StreamingEndpoint;
+import com.twitter.hbc.core.endpoint.UserstreamEndpoint;
import com.twitter.hbc.httpclient.BasicClient;
import com.twitter.hbc.httpclient.auth.Authentication;
import com.twitter.hbc.httpclient.auth.BasicAuth;
@@ -34,7 +37,12 @@ import com.twitter.hbc.httpclient.auth.OAuth1;
import com.typesafe.config.Config;
import org.apache.commons.lang.NotImplementedException;
import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.core.*;
+import org.apache.streams.core.DatumStatus;
+import org.apache.streams.core.DatumStatusCountable;
+import org.apache.streams.core.DatumStatusCounter;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProvider;
+import org.apache.streams.core.StreamsResultSet;
import org.apache.streams.twitter.TwitterStreamConfiguration;
import org.apache.streams.util.ComponentUtils;
import org.joda.time.DateTime;
@@ -45,10 +53,14 @@ import java.io.Serializable;
import java.math.BigInteger;
import java.util.List;
import java.util.Queue;
-import java.util.concurrent.*;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* TwitterStreamProvider wraps a hosebird client and passes recieved documents
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f1540b15/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
index 61cddaf..2924623 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
@@ -22,7 +22,6 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
-import com.google.common.collect.Queues;
import com.google.common.util.concurrent.Uninterruptibles;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
@@ -38,22 +37,29 @@ import org.apache.streams.core.StreamsResultSet;
import org.apache.streams.jackson.StreamsJacksonMapper;
import org.apache.streams.twitter.TwitterUserInformationConfiguration;
import org.apache.streams.twitter.converter.TwitterDateTimeFormat;
-import org.apache.streams.twitter.pojo.Tweet;
import org.apache.streams.util.ComponentUtils;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import twitter4j.*;
+import twitter4j.Status;
+import twitter4j.Twitter;
+import twitter4j.TwitterException;
+import twitter4j.TwitterFactory;
+import twitter4j.User;
import twitter4j.conf.ConfigurationBuilder;
import java.io.BufferedOutputStream;
import java.io.File;
-import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.PrintStream;
import java.io.Serializable;
import java.math.BigInteger;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f1540b15/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java
index b8d5e1d..111d213 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java
@@ -23,12 +23,14 @@ import com.google.common.collect.Lists;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.jackson.StreamsJacksonMapper;
import org.apache.streams.twitter.converter.TwitterDateTimeFormat;
-import org.apache.streams.twitter.pojo.*;
import org.apache.streams.util.ComponentUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import twitter4j.*;
+import twitter4j.Paging;
import twitter4j.Status;
+import twitter4j.Twitter;
+import twitter4j.TwitterException;
+import twitter4j.TwitterObjectFactory;
import java.util.List;
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f1540b15/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
index 4231f56..44f8a24 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
@@ -21,7 +21,6 @@ package org.apache.streams.twitter.provider;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
-import com.google.common.collect.Queues;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.commons.lang.NotImplementedException;
@@ -52,7 +51,11 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
-import java.util.concurrent.*;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f1540b15/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TwitterDocumentClassifierTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TwitterDocumentClassifierTest.java b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TwitterDocumentClassifierTest.java
index 1bf3691..044fe3c 100644
--- a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TwitterDocumentClassifierTest.java
+++ b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TwitterDocumentClassifierTest.java
@@ -19,7 +19,11 @@
package org.apache.streams.twitter.test;
import org.apache.streams.twitter.converter.TwitterDocumentClassifier;
-import org.apache.streams.twitter.pojo.*;
+import org.apache.streams.twitter.pojo.Delete;
+import org.apache.streams.twitter.pojo.Follow;
+import org.apache.streams.twitter.pojo.Retweet;
+import org.apache.streams.twitter.pojo.Tweet;
+import org.apache.streams.twitter.pojo.User;
import org.junit.Assert;
import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f1540b15/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TwitterObjectMapperIT.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TwitterObjectMapperIT.java b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TwitterObjectMapperIT.java
index 4da2af2..e8bbf49 100644
--- a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TwitterObjectMapperIT.java
+++ b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TwitterObjectMapperIT.java
@@ -31,7 +31,6 @@ import org.apache.streams.twitter.pojo.Delete;
import org.apache.streams.twitter.pojo.Retweet;
import org.apache.streams.twitter.pojo.Tweet;
import org.junit.Assert;
-import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,7 +39,9 @@ import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
-import static org.hamcrest.CoreMatchers.*;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.Matchers.greaterThan;
import static org.junit.Assert.assertThat;