You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/05/05 20:54:18 UTC

[31/52] [abbrv] git commit: Simplified Twitter provider options. Userstreams are connecting. Need documentation on creating OAuth credentials authorized for a specific user stream.

Simplified Twitter provider options.
Userstreams are connecting.
Need documentation on creating OAuth credentials authorized for a specific user stream.


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/4a2ca2d7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/4a2ca2d7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/4a2ca2d7

Branch: refs/heads/sblackmon
Commit: 4a2ca2d7f7ee2110d546b95104ad4b31416d49dd
Parents: cc9bc04
Author: sblackmon <sb...@w2odigital.com>
Authored: Thu Apr 3 14:03:34 2014 -0500
Committer: sblackmon <sb...@w2odigital.com>
Committed: Thu Apr 3 14:03:34 2014 -0500

----------------------------------------------------------------------
 .../provider/TwitterStreamConfigurator.java     |  7 +--
 .../twitter/provider/TwitterStreamProvider.java | 65 ++++++++++++--------
 .../provider/TwitterTimelineProvider.java       |  5 +-
 .../src/main/resources/reference.conf           |  6 +-
 streams-runtimes/streams-runtime-local/pom.xml  |  7 ++-
 5 files changed, 49 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4a2ca2d7/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamConfigurator.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamConfigurator.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamConfigurator.java
index b1a1a07..9bf2d9a 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamConfigurator.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamConfigurator.java
@@ -22,10 +22,8 @@ public class TwitterStreamConfigurator {
     public static TwitterStreamConfiguration detectConfiguration(Config twitter) {
 
         TwitterStreamConfiguration twitterStreamConfiguration = new TwitterStreamConfiguration();
-        twitterStreamConfiguration.setProtocol(twitter.getString("protocol"));
-        twitterStreamConfiguration.setHost(twitter.getString("host"));
-        twitterStreamConfiguration.setPort(twitter.getLong("port"));
-        twitterStreamConfiguration.setVersion(twitter.getString("version"));
+
+        twitterStreamConfiguration.setEndpoint(twitter.getString("endpoint"));
 
         try {
             Config basicauth = StreamsConfigurator.config.getConfig("twitter.basicauth");
@@ -57,7 +55,6 @@ public class TwitterStreamConfigurator {
         } catch( ConfigException ce ) {}
 
         twitterStreamConfiguration.setFilterLevel(twitter.getString("filter-level"));
-        twitterStreamConfiguration.setEndpoint(twitter.getString("endpoint"));
         twitterStreamConfiguration.setWith(twitter.getString("with"));
         twitterStreamConfiguration.setReplies(twitter.getString("replies"));
         twitterStreamConfiguration.setJsonStoreEnabled("true");

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4a2ca2d7/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
index 2b8a2f1..39e1ad5 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
@@ -10,10 +10,9 @@ import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.twitter.hbc.ClientBuilder;
 import com.twitter.hbc.core.Constants;
-import com.twitter.hbc.core.endpoint.StatusesFirehoseEndpoint;
-import com.twitter.hbc.core.endpoint.StatusesSampleEndpoint;
-import com.twitter.hbc.core.endpoint.StreamingEndpoint;
-import com.twitter.hbc.core.endpoint.UserstreamEndpoint;
+import com.twitter.hbc.core.Hosts;
+import com.twitter.hbc.core.HttpHosts;
+import com.twitter.hbc.core.endpoint.*;
 import com.twitter.hbc.core.processor.StringDelimitedProcessor;
 import com.twitter.hbc.httpclient.BasicClient;
 import com.twitter.hbc.httpclient.auth.Authentication;
@@ -62,6 +61,8 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable {
 
     protected volatile Queue<StreamsDatum> providerQueue;
 
+    protected Hosts hosebirdHosts;
+    protected Authentication auth;
     protected StreamingEndpoint endpoint;
     protected BasicClient client;
 
@@ -137,32 +138,42 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable {
 
         Preconditions.checkNotNull(config.getEndpoint());
 
-        if(config.getEndpoint().endsWith("user.json") ) {
-            endpoint = new UserstreamEndpoint();
+        if(config.getEndpoint().equals("userstream") ) {
 
-            Optional<String> with = Optional.fromNullable(config.getWith());
-            Optional<String> replies = Optional.fromNullable(config.getReplies());
-
-            if( with.isPresent() ) endpoint.addPostParameter("with", with.get());
-            if( replies.isPresent() ) endpoint.addPostParameter("replies", replies.get());
+            hosebirdHosts = new HttpHosts(Constants.USERSTREAM_HOST);
 
+            UserstreamEndpoint userstreamEndpoint = new UserstreamEndpoint();
+            userstreamEndpoint.withFollowings(true);
+            userstreamEndpoint.withUser(false);
+            userstreamEndpoint.allReplies(false);
+            endpoint = userstreamEndpoint;
         }
-        else if(config.getEndpoint().endsWith("sample.json") ) {
-            endpoint = new StatusesSampleEndpoint();
+        else if(config.getEndpoint().equals("sample") ) {
+
+            hosebirdHosts = new HttpHosts(Constants.STREAM_HOST);
 
             Optional<List<String>> track = Optional.fromNullable(config.getTrack());
             Optional<List<Long>> follow = Optional.fromNullable(config.getFollow());
 
-            if( track.isPresent() ) endpoint.addPostParameter("track", Joiner.on(",").join(track.get()));
-            if( follow.isPresent() ) endpoint.addPostParameter("follow", Joiner.on(",").join(follow.get()));
+            if( track.isPresent() || follow.isPresent() ) {
+                StatusesFilterEndpoint statusesFilterEndpoint = new StatusesFilterEndpoint();
+                if( track.isPresent() )
+                    statusesFilterEndpoint.trackTerms(track.get());
+                if( follow.isPresent() )
+                    statusesFilterEndpoint.followings(follow.get());
+            } else {
+                endpoint = new StatusesSampleEndpoint();
+            }
 
         }
-        else if( config.getEndpoint().endsWith("firehose.json"))
+        else if( config.getEndpoint().endsWith("firehose")) {
+            hosebirdHosts = new HttpHosts(Constants.STREAM_HOST);
             endpoint = new StatusesFirehoseEndpoint();
-        else
+        } else {
+            LOGGER.error("NO ENDPOINT RESOLVED");
             return;
+        }
 
-        Authentication auth;
         if( config.getBasicauth() != null ) {
 
             Preconditions.checkNotNull(config.getBasicauth().getUsername());
@@ -172,6 +183,7 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable {
                     config.getBasicauth().getUsername(),
                     config.getBasicauth().getPassword()
             );
+
         } else if( config.getOauth() != null ) {
 
             Preconditions.checkNotNull(config.getOauth().getConsumerKey());
@@ -183,23 +195,24 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable {
                     config.getOauth().getConsumerSecret(),
                     config.getOauth().getAccessToken(),
                     config.getOauth().getAccessTokenSecret());
+
         } else {
+            LOGGER.error("NO AUTH RESOLVED");
             return;
         }
 
-
         client = new ClientBuilder()
-                .name("apache/streams/streams-contrib/streams-provider-twitter")
-                .hosts(config.getProtocol() + "://" + config.getHost())
-                .endpoint(endpoint)
-                .authentication(auth)
-                .processor(new StringDelimitedProcessor(inQueue))
-                .build();
+            .name("apache/streams/streams-contrib/streams-provider-twitter")
+            .hosts(hosebirdHosts)
+            .endpoint(endpoint)
+            .authentication(auth)
+            .processor(new StringDelimitedProcessor(inQueue))
+            .build();
     }
 
     @Override
     public void cleanUp() {
-        for (int i = 0; i < 10; i++) {
+        for (int i = 0; i < 5; i++) {
             inQueue.add(TwitterEventProcessor.TERMINATE);
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4a2ca2d7/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
index 4133d13..b9551ad 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
@@ -254,15 +254,12 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
 
         Preconditions.checkNotNull(config.getFollow());
 
-        Preconditions.checkArgument(config.getHost().equals("api.twitter.com"));
-        Preconditions.checkArgument(config.getEndpoint().equals("statuses/user_timeline"));
-
         Boolean jsonStoreEnabled = Optional.fromNullable(new Boolean(Boolean.parseBoolean(config.getJsonStoreEnabled()))).or(true);
         Boolean includeEntitiesEnabled = Optional.fromNullable(new Boolean(Boolean.parseBoolean(config.getIncludeEntities()))).or(true);
 
         ids = config.getFollow().iterator();
 
-        String baseUrl = config.getProtocol() + "://" + config.getHost() + ":" + config.getPort() + "/" + config.getVersion() + "/";
+        String baseUrl = "https://api.twitter.com:443/1.1/";
 
         ConfigurationBuilder builder = new ConfigurationBuilder()
                 .setOAuthConsumerKey(config.getOauth().getConsumerKey())

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4a2ca2d7/streams-contrib/streams-provider-twitter/src/main/resources/reference.conf
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/resources/reference.conf b/streams-contrib/streams-provider-twitter/src/main/resources/reference.conf
index d437db8..680f313 100644
--- a/streams-contrib/streams-provider-twitter/src/main/resources/reference.conf
+++ b/streams-contrib/streams-provider-twitter/src/main/resources/reference.conf
@@ -1,9 +1,5 @@
 twitter {
-    protocol = "https"
-    host = "stream.twitter.com"
-    port = 443
-    version = "1.1"
-    endpoint = "statuses/sample.json"
+    endpoint = "sample"
     filter-level = "none"
     oauth {
         appName = "Apache Streams"

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4a2ca2d7/streams-runtimes/streams-runtime-local/pom.xml
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/pom.xml b/streams-runtimes/streams-runtime-local/pom.xml
index ee76b6b..b7ddb9a 100644
--- a/streams-runtimes/streams-runtime-local/pom.xml
+++ b/streams-runtimes/streams-runtime-local/pom.xml
@@ -71,6 +71,12 @@
             <groupId>org.slf4j</groupId>
             <artifactId>log4j-over-slf4j</artifactId>
         </dependency>
+        <!-- This ensures slf4j-log4j12 is not packaged in implementations -->
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-log4j12</artifactId>
+            <scope>provided</scope>
+        </dependency>
         <dependency>
             <groupId>org.hamcrest</groupId>
             <artifactId>hamcrest-all</artifactId>
@@ -97,7 +103,6 @@
             <plugin>
                 <groupId>org.codehaus.mojo</groupId>
                 <artifactId>build-helper-maven-plugin</artifactId>
-                <version>1.8</version>
                 <executions>
                     <execution>
                         <id>add-source</id>