You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/04/03 22:52:27 UTC

git commit: Simplified twitter provider options Userstreams are connecting need documentation on creating oauth credentials authorized for specific user stream

Repository: incubator-streams
Updated Branches:
  refs/heads/springcleaning cc9bc0468 -> 4a2ca2d7f


Simplified twitter provider options
Userstreams are connecting
need documentation on creating oauth credentials authorized for specific user stream


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/4a2ca2d7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/4a2ca2d7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/4a2ca2d7

Branch: refs/heads/springcleaning
Commit: 4a2ca2d7f7ee2110d546b95104ad4b31416d49dd
Parents: cc9bc04
Author: sblackmon <sb...@w2odigital.com>
Authored: Thu Apr 3 14:03:34 2014 -0500
Committer: sblackmon <sb...@w2odigital.com>
Committed: Thu Apr 3 14:03:34 2014 -0500

----------------------------------------------------------------------
 .../provider/TwitterStreamConfigurator.java     |  7 +--
 .../twitter/provider/TwitterStreamProvider.java | 65 ++++++++++++--------
 .../provider/TwitterTimelineProvider.java       |  5 +-
 .../src/main/resources/reference.conf           |  6 +-
 streams-runtimes/streams-runtime-local/pom.xml  |  7 ++-
 5 files changed, 49 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4a2ca2d7/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamConfigurator.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamConfigurator.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamConfigurator.java
index b1a1a07..9bf2d9a 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamConfigurator.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamConfigurator.java
@@ -22,10 +22,8 @@ public class TwitterStreamConfigurator {
     public static TwitterStreamConfiguration detectConfiguration(Config twitter) {
 
         TwitterStreamConfiguration twitterStreamConfiguration = new TwitterStreamConfiguration();
-        twitterStreamConfiguration.setProtocol(twitter.getString("protocol"));
-        twitterStreamConfiguration.setHost(twitter.getString("host"));
-        twitterStreamConfiguration.setPort(twitter.getLong("port"));
-        twitterStreamConfiguration.setVersion(twitter.getString("version"));
+
+        twitterStreamConfiguration.setEndpoint(twitter.getString("endpoint"));
 
         try {
             Config basicauth = StreamsConfigurator.config.getConfig("twitter.basicauth");
@@ -57,7 +55,6 @@ public class TwitterStreamConfigurator {
         } catch( ConfigException ce ) {}
 
         twitterStreamConfiguration.setFilterLevel(twitter.getString("filter-level"));
-        twitterStreamConfiguration.setEndpoint(twitter.getString("endpoint"));
         twitterStreamConfiguration.setWith(twitter.getString("with"));
         twitterStreamConfiguration.setReplies(twitter.getString("replies"));
         twitterStreamConfiguration.setJsonStoreEnabled("true");

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4a2ca2d7/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
index 2b8a2f1..39e1ad5 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
@@ -10,10 +10,9 @@ import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.twitter.hbc.ClientBuilder;
 import com.twitter.hbc.core.Constants;
-import com.twitter.hbc.core.endpoint.StatusesFirehoseEndpoint;
-import com.twitter.hbc.core.endpoint.StatusesSampleEndpoint;
-import com.twitter.hbc.core.endpoint.StreamingEndpoint;
-import com.twitter.hbc.core.endpoint.UserstreamEndpoint;
+import com.twitter.hbc.core.Hosts;
+import com.twitter.hbc.core.HttpHosts;
+import com.twitter.hbc.core.endpoint.*;
 import com.twitter.hbc.core.processor.StringDelimitedProcessor;
 import com.twitter.hbc.httpclient.BasicClient;
 import com.twitter.hbc.httpclient.auth.Authentication;
@@ -62,6 +61,8 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable {
 
     protected volatile Queue<StreamsDatum> providerQueue;
 
+    protected Hosts hosebirdHosts;
+    protected Authentication auth;
     protected StreamingEndpoint endpoint;
     protected BasicClient client;
 
@@ -137,32 +138,42 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable {
 
         Preconditions.checkNotNull(config.getEndpoint());
 
-        if(config.getEndpoint().endsWith("user.json") ) {
-            endpoint = new UserstreamEndpoint();
+        if(config.getEndpoint().equals("userstream") ) {
 
-            Optional<String> with = Optional.fromNullable(config.getWith());
-            Optional<String> replies = Optional.fromNullable(config.getReplies());
-
-            if( with.isPresent() ) endpoint.addPostParameter("with", with.get());
-            if( replies.isPresent() ) endpoint.addPostParameter("replies", replies.get());
+            hosebirdHosts = new HttpHosts(Constants.USERSTREAM_HOST);
 
+            UserstreamEndpoint userstreamEndpoint = new UserstreamEndpoint();
+            userstreamEndpoint.withFollowings(true);
+            userstreamEndpoint.withUser(false);
+            userstreamEndpoint.allReplies(false);
+            endpoint = userstreamEndpoint;
         }
-        else if(config.getEndpoint().endsWith("sample.json") ) {
-            endpoint = new StatusesSampleEndpoint();
+        else if(config.getEndpoint().equals("sample") ) {
+
+            hosebirdHosts = new HttpHosts(Constants.STREAM_HOST);
 
             Optional<List<String>> track = Optional.fromNullable(config.getTrack());
             Optional<List<Long>> follow = Optional.fromNullable(config.getFollow());
 
-            if( track.isPresent() ) endpoint.addPostParameter("track", Joiner.on(",").join(track.get()));
-            if( follow.isPresent() ) endpoint.addPostParameter("follow", Joiner.on(",").join(follow.get()));
+            if( track.isPresent() || follow.isPresent() ) {
+                StatusesFilterEndpoint statusesFilterEndpoint = new StatusesFilterEndpoint();
+                if( track.isPresent() )
+                    statusesFilterEndpoint.trackTerms(track.get());
+                if( follow.isPresent() )
+                    statusesFilterEndpoint.followings(follow.get());
+            } else {
+                endpoint = new StatusesSampleEndpoint();
+            }
 
         }
-        else if( config.getEndpoint().endsWith("firehose.json"))
+        else if( config.getEndpoint().endsWith("firehose")) {
+            hosebirdHosts = new HttpHosts(Constants.STREAM_HOST);
             endpoint = new StatusesFirehoseEndpoint();
-        else
+        } else {
+            LOGGER.error("NO ENDPOINT RESOLVED");
             return;
+        }
 
-        Authentication auth;
         if( config.getBasicauth() != null ) {
 
             Preconditions.checkNotNull(config.getBasicauth().getUsername());
@@ -172,6 +183,7 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable {
                     config.getBasicauth().getUsername(),
                     config.getBasicauth().getPassword()
             );
+
         } else if( config.getOauth() != null ) {
 
             Preconditions.checkNotNull(config.getOauth().getConsumerKey());
@@ -183,23 +195,24 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable {
                     config.getOauth().getConsumerSecret(),
                     config.getOauth().getAccessToken(),
                     config.getOauth().getAccessTokenSecret());
+
         } else {
+            LOGGER.error("NO AUTH RESOLVED");
             return;
         }
 
-
         client = new ClientBuilder()
-                .name("apache/streams/streams-contrib/streams-provider-twitter")
-                .hosts(config.getProtocol() + "://" + config.getHost())
-                .endpoint(endpoint)
-                .authentication(auth)
-                .processor(new StringDelimitedProcessor(inQueue))
-                .build();
+            .name("apache/streams/streams-contrib/streams-provider-twitter")
+            .hosts(hosebirdHosts)
+            .endpoint(endpoint)
+            .authentication(auth)
+            .processor(new StringDelimitedProcessor(inQueue))
+            .build();
     }
 
     @Override
     public void cleanUp() {
-        for (int i = 0; i < 10; i++) {
+        for (int i = 0; i < 5; i++) {
             inQueue.add(TwitterEventProcessor.TERMINATE);
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4a2ca2d7/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
index 4133d13..b9551ad 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
@@ -254,15 +254,12 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
 
         Preconditions.checkNotNull(config.getFollow());
 
-        Preconditions.checkArgument(config.getHost().equals("api.twitter.com"));
-        Preconditions.checkArgument(config.getEndpoint().equals("statuses/user_timeline"));
-
         Boolean jsonStoreEnabled = Optional.fromNullable(new Boolean(Boolean.parseBoolean(config.getJsonStoreEnabled()))).or(true);
         Boolean includeEntitiesEnabled = Optional.fromNullable(new Boolean(Boolean.parseBoolean(config.getIncludeEntities()))).or(true);
 
         ids = config.getFollow().iterator();
 
-        String baseUrl = config.getProtocol() + "://" + config.getHost() + ":" + config.getPort() + "/" + config.getVersion() + "/";
+        String baseUrl = "https://api.twitter.com:443/1.1/";
 
         ConfigurationBuilder builder = new ConfigurationBuilder()
                 .setOAuthConsumerKey(config.getOauth().getConsumerKey())

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4a2ca2d7/streams-contrib/streams-provider-twitter/src/main/resources/reference.conf
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/resources/reference.conf b/streams-contrib/streams-provider-twitter/src/main/resources/reference.conf
index d437db8..680f313 100644
--- a/streams-contrib/streams-provider-twitter/src/main/resources/reference.conf
+++ b/streams-contrib/streams-provider-twitter/src/main/resources/reference.conf
@@ -1,9 +1,5 @@
 twitter {
-    protocol = "https"
-    host = "stream.twitter.com"
-    port = 443
-    version = "1.1"
-    endpoint = "statuses/sample.json"
+    endpoint = "sample"
     filter-level = "none"
     oauth {
         appName = "Apache Streams"

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4a2ca2d7/streams-runtimes/streams-runtime-local/pom.xml
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/pom.xml b/streams-runtimes/streams-runtime-local/pom.xml
index ee76b6b..b7ddb9a 100644
--- a/streams-runtimes/streams-runtime-local/pom.xml
+++ b/streams-runtimes/streams-runtime-local/pom.xml
@@ -71,6 +71,12 @@
             <groupId>org.slf4j</groupId>
             <artifactId>log4j-over-slf4j</artifactId>
         </dependency>
+        <!-- This ensures slf4j-log4j12 is not packaged in implementations -->
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-log4j12</artifactId>
+            <scope>provided</scope>
+        </dependency>
         <dependency>
             <groupId>org.hamcrest</groupId>
             <artifactId>hamcrest-all</artifactId>
@@ -97,7 +103,6 @@
             <plugin>
                 <groupId>org.codehaus.mojo</groupId>
                 <artifactId>build-helper-maven-plugin</artifactId>
-                <version>1.8</version>
                 <executions>
                     <execution>
                         <id>add-source</id>