You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/04/03 22:52:27 UTC
git commit: Simplified twitter provider options Userstreams are
connecting need documentation on creating oauth credentials authorized for
specific user stream
Repository: incubator-streams
Updated Branches:
refs/heads/springcleaning cc9bc0468 -> 4a2ca2d7f
Simplified twitter provider options
Userstreams are connecting
need documentation on creating oauth credentials authorized for specific user stream
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/4a2ca2d7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/4a2ca2d7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/4a2ca2d7
Branch: refs/heads/springcleaning
Commit: 4a2ca2d7f7ee2110d546b95104ad4b31416d49dd
Parents: cc9bc04
Author: sblackmon <sb...@w2odigital.com>
Authored: Thu Apr 3 14:03:34 2014 -0500
Committer: sblackmon <sb...@w2odigital.com>
Committed: Thu Apr 3 14:03:34 2014 -0500
----------------------------------------------------------------------
.../provider/TwitterStreamConfigurator.java | 7 +--
.../twitter/provider/TwitterStreamProvider.java | 65 ++++++++++++--------
.../provider/TwitterTimelineProvider.java | 5 +-
.../src/main/resources/reference.conf | 6 +-
streams-runtimes/streams-runtime-local/pom.xml | 7 ++-
5 files changed, 49 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4a2ca2d7/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamConfigurator.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamConfigurator.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamConfigurator.java
index b1a1a07..9bf2d9a 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamConfigurator.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamConfigurator.java
@@ -22,10 +22,8 @@ public class TwitterStreamConfigurator {
public static TwitterStreamConfiguration detectConfiguration(Config twitter) {
TwitterStreamConfiguration twitterStreamConfiguration = new TwitterStreamConfiguration();
- twitterStreamConfiguration.setProtocol(twitter.getString("protocol"));
- twitterStreamConfiguration.setHost(twitter.getString("host"));
- twitterStreamConfiguration.setPort(twitter.getLong("port"));
- twitterStreamConfiguration.setVersion(twitter.getString("version"));
+
+ twitterStreamConfiguration.setEndpoint(twitter.getString("endpoint"));
try {
Config basicauth = StreamsConfigurator.config.getConfig("twitter.basicauth");
@@ -57,7 +55,6 @@ public class TwitterStreamConfigurator {
} catch( ConfigException ce ) {}
twitterStreamConfiguration.setFilterLevel(twitter.getString("filter-level"));
- twitterStreamConfiguration.setEndpoint(twitter.getString("endpoint"));
twitterStreamConfiguration.setWith(twitter.getString("with"));
twitterStreamConfiguration.setReplies(twitter.getString("replies"));
twitterStreamConfiguration.setJsonStoreEnabled("true");
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4a2ca2d7/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
index 2b8a2f1..39e1ad5 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
@@ -10,10 +10,9 @@ import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.twitter.hbc.ClientBuilder;
import com.twitter.hbc.core.Constants;
-import com.twitter.hbc.core.endpoint.StatusesFirehoseEndpoint;
-import com.twitter.hbc.core.endpoint.StatusesSampleEndpoint;
-import com.twitter.hbc.core.endpoint.StreamingEndpoint;
-import com.twitter.hbc.core.endpoint.UserstreamEndpoint;
+import com.twitter.hbc.core.Hosts;
+import com.twitter.hbc.core.HttpHosts;
+import com.twitter.hbc.core.endpoint.*;
import com.twitter.hbc.core.processor.StringDelimitedProcessor;
import com.twitter.hbc.httpclient.BasicClient;
import com.twitter.hbc.httpclient.auth.Authentication;
@@ -62,6 +61,8 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable {
protected volatile Queue<StreamsDatum> providerQueue;
+ protected Hosts hosebirdHosts;
+ protected Authentication auth;
protected StreamingEndpoint endpoint;
protected BasicClient client;
@@ -137,32 +138,42 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable {
Preconditions.checkNotNull(config.getEndpoint());
- if(config.getEndpoint().endsWith("user.json") ) {
- endpoint = new UserstreamEndpoint();
+ if(config.getEndpoint().equals("userstream") ) {
- Optional<String> with = Optional.fromNullable(config.getWith());
- Optional<String> replies = Optional.fromNullable(config.getReplies());
-
- if( with.isPresent() ) endpoint.addPostParameter("with", with.get());
- if( replies.isPresent() ) endpoint.addPostParameter("replies", replies.get());
+ hosebirdHosts = new HttpHosts(Constants.USERSTREAM_HOST);
+ UserstreamEndpoint userstreamEndpoint = new UserstreamEndpoint();
+ userstreamEndpoint.withFollowings(true);
+ userstreamEndpoint.withUser(false);
+ userstreamEndpoint.allReplies(false);
+ endpoint = userstreamEndpoint;
}
- else if(config.getEndpoint().endsWith("sample.json") ) {
- endpoint = new StatusesSampleEndpoint();
+ else if(config.getEndpoint().equals("sample") ) {
+
+ hosebirdHosts = new HttpHosts(Constants.STREAM_HOST);
Optional<List<String>> track = Optional.fromNullable(config.getTrack());
Optional<List<Long>> follow = Optional.fromNullable(config.getFollow());
- if( track.isPresent() ) endpoint.addPostParameter("track", Joiner.on(",").join(track.get()));
- if( follow.isPresent() ) endpoint.addPostParameter("follow", Joiner.on(",").join(follow.get()));
+ if( track.isPresent() || follow.isPresent() ) {
+ StatusesFilterEndpoint statusesFilterEndpoint = new StatusesFilterEndpoint();
+ if( track.isPresent() )
+ statusesFilterEndpoint.trackTerms(track.get());
+ if( follow.isPresent() )
+ statusesFilterEndpoint.followings(follow.get());
+ } else {
+ endpoint = new StatusesSampleEndpoint();
+ }
}
- else if( config.getEndpoint().endsWith("firehose.json"))
+ else if( config.getEndpoint().endsWith("firehose")) {
+ hosebirdHosts = new HttpHosts(Constants.STREAM_HOST);
endpoint = new StatusesFirehoseEndpoint();
- else
+ } else {
+ LOGGER.error("NO ENDPOINT RESOLVED");
return;
+ }
- Authentication auth;
if( config.getBasicauth() != null ) {
Preconditions.checkNotNull(config.getBasicauth().getUsername());
@@ -172,6 +183,7 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable {
config.getBasicauth().getUsername(),
config.getBasicauth().getPassword()
);
+
} else if( config.getOauth() != null ) {
Preconditions.checkNotNull(config.getOauth().getConsumerKey());
@@ -183,23 +195,24 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable {
config.getOauth().getConsumerSecret(),
config.getOauth().getAccessToken(),
config.getOauth().getAccessTokenSecret());
+
} else {
+ LOGGER.error("NO AUTH RESOLVED");
return;
}
-
client = new ClientBuilder()
- .name("apache/streams/streams-contrib/streams-provider-twitter")
- .hosts(config.getProtocol() + "://" + config.getHost())
- .endpoint(endpoint)
- .authentication(auth)
- .processor(new StringDelimitedProcessor(inQueue))
- .build();
+ .name("apache/streams/streams-contrib/streams-provider-twitter")
+ .hosts(hosebirdHosts)
+ .endpoint(endpoint)
+ .authentication(auth)
+ .processor(new StringDelimitedProcessor(inQueue))
+ .build();
}
@Override
public void cleanUp() {
- for (int i = 0; i < 10; i++) {
+ for (int i = 0; i < 5; i++) {
inQueue.add(TwitterEventProcessor.TERMINATE);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4a2ca2d7/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
index 4133d13..b9551ad 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
@@ -254,15 +254,12 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
Preconditions.checkNotNull(config.getFollow());
- Preconditions.checkArgument(config.getHost().equals("api.twitter.com"));
- Preconditions.checkArgument(config.getEndpoint().equals("statuses/user_timeline"));
-
Boolean jsonStoreEnabled = Optional.fromNullable(new Boolean(Boolean.parseBoolean(config.getJsonStoreEnabled()))).or(true);
Boolean includeEntitiesEnabled = Optional.fromNullable(new Boolean(Boolean.parseBoolean(config.getIncludeEntities()))).or(true);
ids = config.getFollow().iterator();
- String baseUrl = config.getProtocol() + "://" + config.getHost() + ":" + config.getPort() + "/" + config.getVersion() + "/";
+ String baseUrl = "https://api.twitter.com:443/1.1/";
ConfigurationBuilder builder = new ConfigurationBuilder()
.setOAuthConsumerKey(config.getOauth().getConsumerKey())
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4a2ca2d7/streams-contrib/streams-provider-twitter/src/main/resources/reference.conf
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/resources/reference.conf b/streams-contrib/streams-provider-twitter/src/main/resources/reference.conf
index d437db8..680f313 100644
--- a/streams-contrib/streams-provider-twitter/src/main/resources/reference.conf
+++ b/streams-contrib/streams-provider-twitter/src/main/resources/reference.conf
@@ -1,9 +1,5 @@
twitter {
- protocol = "https"
- host = "stream.twitter.com"
- port = 443
- version = "1.1"
- endpoint = "statuses/sample.json"
+ endpoint = "sample"
filter-level = "none"
oauth {
appName = "Apache Streams"
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4a2ca2d7/streams-runtimes/streams-runtime-local/pom.xml
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/pom.xml b/streams-runtimes/streams-runtime-local/pom.xml
index ee76b6b..b7ddb9a 100644
--- a/streams-runtimes/streams-runtime-local/pom.xml
+++ b/streams-runtimes/streams-runtime-local/pom.xml
@@ -71,6 +71,12 @@
<groupId>org.slf4j</groupId>
<artifactId>log4j-over-slf4j</artifactId>
</dependency>
+ <!-- This ensures slf4j-log4j12 is not packaged in implementations -->
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <scope>provided</scope>
+ </dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-all</artifactId>
@@ -97,7 +103,6 @@
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
- <version>1.8</version>
<executions>
<execution>
<id>add-source</id>