You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2015/04/02 23:21:52 UTC
[1/9] incubator-streams git commit: STREAMS-235
Repository: incubator-streams
Updated Branches:
refs/heads/master 2b994de99 -> a71653cff
STREAMS-235
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/43a3818f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/43a3818f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/43a3818f
Branch: refs/heads/master
Commit: 43a3818f59c6d0ca3c37e11b92160b52327cd903
Parents: 5082690
Author: sblackmon <sb...@apache.org>
Authored: Tue Nov 25 14:14:14 2014 -0600
Committer: Steve Blackmon (@steveblackmon) <sb...@apache.org>
Committed: Thu Mar 26 12:59:12 2015 -0500
----------------------------------------------------------------------
.../streams-provider-twitter/pom.xml | 1 +
.../provider/TwitterFollowersProviderTask.java | 120 ++++++++++++++++++
.../provider/TwitterFollowingProvider.java | 114 +++++++++++++++++
.../provider/TwitterFriendsProviderTask.java | 123 +++++++++++++++++++
.../TwitterUserInformationProvider.java | 7 +-
.../TwitterFollowActivitySerializer.java | 72 +++++++++++
.../src/main/jsonschema/com/twitter/Follow.json | 14 +++
7 files changed, 446 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/43a3818f/streams-contrib/streams-provider-twitter/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/pom.xml b/streams-contrib/streams-provider-twitter/pom.xml
index 0f9d361..26211d7 100644
--- a/streams-contrib/streams-provider-twitter/pom.xml
+++ b/streams-contrib/streams-provider-twitter/pom.xml
@@ -142,6 +142,7 @@
<sourcePath>src/main/jsonschema/com/twitter/TwitterUserInformationConfiguration.json</sourcePath>
<sourcePath>src/main/jsonschema/com/twitter/Delete.json</sourcePath>
<sourcePath>src/main/jsonschema/com/twitter/UserstreamEvent.json</sourcePath>
+ <sourcePath>src/main/jsonschema/com/twitter/Follow.json</sourcePath>
<sourcePath>src/main/jsonschema/com/twitter/FriendList.json</sourcePath>
<sourcePath>src/main/jsonschema/com/twitter/Retweet.json</sourcePath>
<sourcePath>src/main/jsonschema/com/twitter/tweet.json</sourcePath>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/43a3818f/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowersProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowersProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowersProviderTask.java
new file mode 100644
index 0000000..c0f8e8a
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowersProviderTask.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.twitter.provider;
+
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.databind.JsonMappingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.twitter.pojo.Follow;
+import org.apache.streams.twitter.pojo.User;
+import org.apache.streams.twitter.serializer.StreamsTwitterMapper;
+import org.apache.streams.util.ComponentUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import twitter4j.Twitter;
+import twitter4j.TwitterException;
+import twitter4j.TwitterObjectFactory;
+
+import java.io.IOException;
+
+/**
+ * Retrieve recent posts for a single user id.
+ */
+public class TwitterFollowersProviderTask implements Runnable {
+
+ private final static Logger LOGGER = LoggerFactory.getLogger(TwitterFollowersProviderTask.class);
+
+ private final static ObjectMapper mapper = StreamsJacksonMapper.getInstance(Lists.newArrayList(StreamsTwitterMapper.TWITTER_FORMAT));
+
+ protected TwitterFollowingProvider provider;
+ protected Twitter client;
+ protected Long id;
+
+ public TwitterFollowersProviderTask(TwitterFollowingProvider provider, Twitter twitter, Long id) {
+ this.provider = provider;
+ this.client = twitter;
+ this.id = id;
+ }
+
+ @Override
+ public void run() {
+
+ getFollowers(id);
+
+ LOGGER.info(id + " Thread Finished");
+
+ }
+
+ protected void getFollowers(Long id) {
+
+ int keepTrying = 0;
+
+ long paging = 1;
+
+
+ // keep trying to load, give it 5 attempts.
+ //while (keepTrying < 10)
+ while (keepTrying < 5)
+ {
+ try
+ {
+ twitter4j.User followee4j;
+ String followeeJson;
+ try {
+ followee4j = client.users().showUser(id);
+ followeeJson = TwitterObjectFactory.getRawJSON(followee4j);
+ } catch (TwitterException e) {
+ LOGGER.error("Failure looking up " + id);
+ break;
+ }
+
+ for (twitter4j.User follower4j : client.friendsFollowers().getFollowersList(id.longValue(), paging)) {
+
+ String followerJson = TwitterObjectFactory.getRawJSON(follower4j);
+
+ try {
+ Follow follow = new Follow()
+ .withFollowee(mapper.readValue(followeeJson, User.class))
+ .withFollower(mapper.readValue(followerJson, User.class));
+
+ ComponentUtils.offerUntilSuccess(new StreamsDatum(follow), provider.providerQueue);
+ } catch (JsonParseException e) {
+ LOGGER.warn(e.getMessage());
+ } catch (JsonMappingException e) {
+ LOGGER.warn(e.getMessage());
+ } catch (IOException e) {
+ LOGGER.warn(e.getMessage());
+ }
+ paging++;
+ keepTrying = 10;
+ }
+ }
+ catch(TwitterException twitterException) {
+ keepTrying += TwitterErrorHandler.handleTwitterError(client, twitterException);
+ }
+ catch(Exception e) {
+ keepTrying += TwitterErrorHandler.handleTwitterError(client, e);
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/43a3818f/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
new file mode 100644
index 0000000..a4df278
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
@@ -0,0 +1,114 @@
+package org.apache.streams.twitter.provider;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Queues;
+import com.typesafe.config.Config;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.DatumStatusCounter;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.twitter.TwitterUserInformationConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import twitter4j.Twitter;
+
+import java.util.Queue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * Created by sblackmon on 11/25/14.
+ */
+public class TwitterFollowingProvider extends TwitterUserInformationProvider {
+
+ public static final String STREAMS_ID = "TwitterFollowingProvider";
+ private static final Logger LOGGER = LoggerFactory.getLogger(TwitterFollowingProvider.class);
+
+ protected final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+ public static final int MAX_NUMBER_WAITING = 10000;
+
+ public TwitterFollowingProvider() {
+ super(TwitterConfigurator.detectTwitterUserInformationConfiguration(StreamsConfigurator.config.getConfig("twitter")));
+ }
+
+ public TwitterFollowingProvider(TwitterUserInformationConfiguration config) {
+ super(config);
+ }
+
+ @Override
+ public void startStream() {
+
+ running.set(true);
+
+ Preconditions.checkArgument(idsBatches.hasNext());
+
+ LOGGER.info("startStream");
+
+ while (idsBatches.hasNext()) {
+ submitFollowingThreads(idsBatches.next());
+ }
+
+ running.set(true);
+
+ executor.shutdown();
+
+ }
+
+ protected void submitFollowingThreads(Long[] ids) {
+ Twitter client = getTwitterClient();
+
+ if( getConfig().getEndpoint().equals("friends") ) {
+ for (int i = 0; i < ids.length; i++) {
+ TwitterFriendsProviderTask providerTask = new TwitterFriendsProviderTask(this, client, ids[i]);
+ executor.submit(providerTask);
+ }
+ } else if( getConfig().getEndpoint().equals("followers") ) {
+ for (int i = 0; i < ids.length; i++) {
+ TwitterFollowersProviderTask providerTask = new TwitterFollowersProviderTask(this, client, ids[i]);
+ executor.submit(providerTask);
+ }
+ }
+ }
+
+ @Override
+ public StreamsResultSet readCurrent() {
+
+ LOGGER.info("Providing {} docs", providerQueue.size());
+
+ StreamsResultSet result;
+
+ try {
+ lock.writeLock().lock();
+ result = new StreamsResultSet(providerQueue);
+ result.setCounter(new DatumStatusCounter());
+ providerQueue = constructQueue();
+ } finally {
+ lock.writeLock().unlock();
+ }
+
+ if (providerQueue.isEmpty() && executor.isTerminated()) {
+ LOGGER.info("Finished. Cleaning up...");
+
+ running.set(false);
+
+ LOGGER.info("Exiting");
+ }
+
+ return result;
+
+ }
+
+ protected Queue<StreamsDatum> constructQueue() {
+ return Queues.synchronizedQueue(new LinkedBlockingQueue<StreamsDatum>(MAX_NUMBER_WAITING));
+ }
+
+ @Override
+ public void prepare(Object o) {
+ super.prepare(o);
+ Preconditions.checkNotNull(getConfig().getEndpoint());
+ Preconditions.checkArgument(getConfig().getEndpoint().equals("friends") || getConfig().getEndpoint().equals("followers"));
+ return;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/43a3818f/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFriendsProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFriendsProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFriendsProviderTask.java
new file mode 100644
index 0000000..5d3a7d2
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFriendsProviderTask.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.twitter.provider;
+
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.databind.JsonMappingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.twitter.pojo.Follow;
+import org.apache.streams.twitter.serializer.StreamsTwitterMapper;
+import org.apache.streams.util.ComponentUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import twitter4j.PagableResponseList;
+import twitter4j.Twitter;
+import twitter4j.TwitterException;
+import twitter4j.TwitterObjectFactory;
+import twitter4j.User;
+
+import java.io.IOException;
+
+/**
+ * Retrieve recent posts for a single user id.
+ */
+public class TwitterFriendsProviderTask implements Runnable {
+
+ private final static Logger LOGGER = LoggerFactory.getLogger(TwitterFriendsProviderTask.class);
+
+ private final static ObjectMapper mapper = StreamsJacksonMapper.getInstance(Lists.newArrayList(StreamsTwitterMapper.TWITTER_FORMAT));
+
+ protected TwitterFollowingProvider provider;
+ protected Twitter client;
+ protected Long id;
+
+ public TwitterFriendsProviderTask(TwitterFollowingProvider provider, Twitter twitter, Long id) {
+ this.provider = provider;
+ this.client = twitter;
+ this.id = id;
+ }
+
+ @Override
+ public void run() {
+
+ getFriends(id);
+
+ LOGGER.info(id + " Thread Finished");
+
+ }
+
+ protected void getFriends(Long id) {
+
+ int keepTrying = 0;
+
+ long curser = -1;
+
+ // keep trying to load, give it 5 attempts.
+ //while (keepTrying < 10)
+ while (keepTrying < 1)
+ {
+ try
+ {
+ twitter4j.User follower4j;
+ String followerJson;
+ try {
+ follower4j = client.users().showUser(id);
+ followerJson = TwitterObjectFactory.getRawJSON(follower4j);
+ } catch (TwitterException e) {
+ LOGGER.error("Failure looking up " + id);
+ break;
+ }
+
+ PagableResponseList<User> followeeList = client.friendsFollowers().getFriendsList(id.longValue(), curser);
+
+ for (twitter4j.User followee4j : followeeList ) {
+
+ String followeeJson = TwitterObjectFactory.getRawJSON(followee4j);
+
+ try {
+ Follow follow = new Follow()
+ .withFollowee(mapper.readValue(followeeJson, org.apache.streams.twitter.pojo.User.class))
+ .withFollower(mapper.readValue(followerJson, org.apache.streams.twitter.pojo.User.class));
+
+ ComponentUtils.offerUntilSuccess(new StreamsDatum(follow), provider.providerQueue);
+ } catch (JsonParseException e) {
+ LOGGER.warn(e.getMessage());
+ } catch (JsonMappingException e) {
+ LOGGER.warn(e.getMessage());
+ } catch (IOException e) {
+ LOGGER.warn(e.getMessage());
+ }
+ }
+ curser = followeeList.getNextCursor();
+ keepTrying = 10;
+
+ }
+ catch(TwitterException twitterException) {
+ keepTrying += TwitterErrorHandler.handleTwitterError(client, twitterException);
+ }
+ catch(Exception e) {
+ keepTrying += TwitterErrorHandler.handleTwitterError(client, e);
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/43a3818f/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
index d5bf317..cad5aa8 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
@@ -54,7 +54,6 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
public static final String STREAMS_ID = "TwitterUserInformationProvider";
private static final Logger LOGGER = LoggerFactory.getLogger(TwitterUserInformationProvider.class);
-
private TwitterUserInformationConfiguration twitterUserInformationConfiguration;
private Class klass;
@@ -110,8 +109,7 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
running.set(true);
}
-
- private void loadBatch(Long[] ids) {
+ protected void loadBatch(Long[] ids) {
Twitter client = getTwitterClient();
int keepTrying = 0;
@@ -140,7 +138,7 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
}
}
- private void loadBatch(String[] ids) {
+ protected void loadBatch(String[] ids) {
Twitter client = getTwitterClient();
int keepTrying = 0;
@@ -234,7 +232,6 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20));
Preconditions.checkNotNull(providerQueue);
- Preconditions.checkNotNull(this.klass);
Preconditions.checkNotNull(twitterUserInformationConfiguration.getOauth().getConsumerKey());
Preconditions.checkNotNull(twitterUserInformationConfiguration.getOauth().getConsumerSecret());
Preconditions.checkNotNull(twitterUserInformationConfiguration.getOauth().getAccessToken());
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/43a3818f/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterFollowActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterFollowActivitySerializer.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterFollowActivitySerializer.java
new file mode 100644
index 0000000..60da85a
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterFollowActivitySerializer.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.twitter.serializer;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.streams.data.ActivitySerializer;
+import org.apache.streams.exceptions.ActivitySerializerException;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.twitter.pojo.Follow;
+import org.apache.streams.twitter.pojo.User;
+import org.apache.streams.twitter.serializer.util.TwitterActivityUtil;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+
+import static org.apache.streams.twitter.serializer.util.TwitterActivityUtil.updateActivity;
+
+public class TwitterFollowActivitySerializer implements ActivitySerializer<Follow>, Serializable {
+
+ public TwitterFollowActivitySerializer() {}
+
+ private static TwitterFollowActivitySerializer instance = new TwitterFollowActivitySerializer();
+
+ public static TwitterFollowActivitySerializer getInstance() {
+ return instance;
+ }
+
+ @Override
+ public String serializationFormat() {
+ return null;
+ }
+
+ @Override
+ public Follow serialize(Activity deserialized) throws ActivitySerializerException {
+ return null;
+ }
+
+ @Override
+ public Activity deserialize(Follow event) throws ActivitySerializerException {
+
+ Activity activity = new Activity();
+ activity.setVerb("follow");
+ activity.setActor(TwitterActivityUtil.buildActor(event.getFollower()));
+ activity.setObject(TwitterActivityUtil.buildActor(event.getFollowee()));
+
+ return activity;
+ }
+
+ @Override
+ public List<Activity> deserializeAll(List<Follow> serializedList) {
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/43a3818f/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/Follow.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/Follow.json b/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/Follow.json
new file mode 100644
index 0000000..148038e
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/Follow.json
@@ -0,0 +1,14 @@
+{
+ "type": "object",
+ "$schema": "http://json-schema.org/draft-03/schema",
+ "id": "#",
+ "javaType" : "org.apache.streams.twitter.pojo.Follow",
+ "properties": {
+ "follower": {
+ "$ref": "User.json"
+ },
+ "followee": {
+ "$ref": "User.json"
+ }
+ }
+}
\ No newline at end of file
[7/9] incubator-streams git commit: fixes from integration testing
with incubator-streams-examples/twitter-follow-graph
Posted by sb...@apache.org.
fixes from integration testing with incubator-streams-examples/twitter-follow-graph
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/8bf1cfaf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/8bf1cfaf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/8bf1cfaf
Branch: refs/heads/master
Commit: 8bf1cfaf05455c28df33eb825735555a61b2255a
Parents: 4bfbfc4
Author: Steve Blackmon (@steveblackmon) <sb...@apache.org>
Authored: Fri Mar 27 10:00:52 2015 -0500
Committer: Steve Blackmon (@steveblackmon) <sb...@apache.org>
Committed: Fri Mar 27 10:00:52 2015 -0500
----------------------------------------------------------------------
.../TwitterFollowActivityConverter.java | 5 +-
.../provider/TwitterFollowingProvider.java | 2 +-
.../TwitterActivityConverterProcessorTest.java | 115 ---------------
.../test/TwitterActivityConvertersTest.java | 139 ++++++-------------
4 files changed, 48 insertions(+), 213 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/8bf1cfaf/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterFollowActivityConverter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterFollowActivityConverter.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterFollowActivityConverter.java
index dedda69..e0ed4a4 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterFollowActivityConverter.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterFollowActivityConverter.java
@@ -23,6 +23,8 @@ import org.apache.commons.lang.NotImplementedException;
import org.apache.streams.data.ActivityConverter;
import org.apache.streams.exceptions.ActivityConversionException;
import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.pojo.json.ActivityObject;
+import org.apache.streams.pojo.json.Provider;
import org.apache.streams.twitter.pojo.Follow;
import org.apache.streams.twitter.converter.util.TwitterActivityUtil;
@@ -64,7 +66,8 @@ public class TwitterFollowActivityConverter implements ActivityConverter<Follow>
activity.setVerb("follow");
activity.setActor(TwitterActivityUtil.buildActor(event.getFollower()));
activity.setObject(TwitterActivityUtil.buildActor(event.getFollowee()));
-
+ activity.setId(activity.getActor().getId() + "-follow->" + activity.getObject().getId());
+ activity.setProvider((Provider) new Provider().withId("twitter"));
return Lists.newArrayList(activity);
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/8bf1cfaf/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
index 24dea0f..70c92c9 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
@@ -112,7 +112,7 @@ public class TwitterFollowingProvider extends TwitterUserInformationProvider {
@Override
public StreamsResultSet readCurrent() {
- LOGGER.info("Providing {} docs", providerQueue.size());
+ LOGGER.debug("Providing {} docs", providerQueue.size());
StreamsResultSet result;
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/8bf1cfaf/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TwitterActivityConverterProcessorTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TwitterActivityConverterProcessorTest.java b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TwitterActivityConverterProcessorTest.java
deleted file mode 100644
index 61a3a6b..0000000
--- a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TwitterActivityConverterProcessorTest.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.twitter.test;
-
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.collect.Lists;
-import org.apache.streams.converter.ActivityConverterProcessor;
-import org.apache.streams.converter.ActivityConverterProcessorConfiguration;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.pojo.json.Activity;
-import org.apache.streams.twitter.pojo.Retweet;
-import org.apache.streams.twitter.converter.StreamsTwitterMapper;
-import org.apache.streams.twitter.converter.TwitterDocumentClassifier;
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.util.List;
-
-import static org.hamcrest.CoreMatchers.*;
-import static org.junit.Assert.assertThat;
-
-/**
-* Created with IntelliJ IDEA.
-* User: sblackmon
-* Date: 8/20/13
-* Time: 5:57 PM
-* To change this template use File | Settings | File Templates.
-*/
-public class TwitterActivityConverterProcessorTest {
-
- private final static Logger LOGGER = LoggerFactory.getLogger(TwitterActivityConverterProcessorTest.class);
-
- private ObjectMapper mapper = StreamsJacksonMapper.getInstance(Lists.newArrayList(StreamsTwitterMapper.TWITTER_FORMAT));
-
- private static final String TWITTER_JSON= "{\"created_at\":\"Wed Dec 11 22:27:34 +0000 2013\",\"id\":410898682356047872,\"id_str\":\"410898682356047872\",\"text\":\"RT @ughhblog: RRome (Brooklyn, NY) \\u2013 MY GIRL http:\\/\\/t.co\\/x6uxX9PLsH via @indierapblog @RRoseRRome\",\"source\":\"\\u003ca href=\\\"https:\\/\\/about.twitter.com\\/products\\/tweetdeck\\\" rel=\\\"nofollow\\\"\\u003eTweetDeck\\u003c\\/a\\u003e\",\"truncated\":false,\"in_reply_to_status_id\":null,\"in_reply_to_status_id_str\":null,\"in_reply_to_user_id\":null,\"in_reply_to_user_id_str\":null,\"in_reply_to_screen_name\":null,\"user\":{\"id\":70463906,\"id_str\":\"70463906\",\"name\":\"MHM DESIGNS, LLC\",\"screen_name\":\"MHMDESIGNS\",\"location\":\"Los Angeles New York\",\"url\":\"http:\\/\\/www.mhmdesigns.com\",\"description\":\"Multi Media Made Simple- Web desig, Graphic Design, Internet Marketing, Photography, Video Production and much much more.\",\"protected\":false,\"followers_count\":10,\"friends_coun
t\":64,\"listed_count\":1,\"created_at\":\"Mon Aug 31 18:31:54 +0000 2009\",\"favourites_count\":0,\"utc_offset\":-28800,\"time_zone\":\"Pacific Time (US & Canada)\",\"geo_enabled\":false,\"verified\":false,\"statuses_count\":87,\"lang\":\"en\",\"contributors_enabled\":false,\"is_translator\":false,\"profile_background_color\":\"9AE4E8\",\"profile_background_image_url\":\"http:\\/\\/a0.twimg.com\\/profile_background_images\\/33456434\\/body.png\",\"profile_background_image_url_https\":\"https:\\/\\/si0.twimg.com\\/profile_background_images\\/33456434\\/body.png\",\"profile_background_tile\":false,\"profile_image_url\":\"http:\\/\\/pbs.twimg.com\\/profile_images\\/391494416\\/mhm_design_logo__normal.png\",\"profile_image_url_https\":\"https:\\/\\/pbs.twimg.com\\/profile_images\\/391494416\\/mhm_design_logo__normal.png\",\"profile_link_color\":\"0084B4\",\"profile_sidebar_border_color\":\"BDDCAD\",\"profile_sidebar_fill_color\":\"DDFFCC\",\"profile_text_color\":\"333333\",\"profile_us
e_background_image\":true,\"default_profile\":false,\"default_profile_image\":false,\"following\":null,\"follow_request_sent\":null,\"notifications\":null},\"geo\":null,\"coordinates\":null,\"place\":null,\"contributors\":null,\"retweeted_status\":{\"created_at\":\"Wed Dec 11 10:56:49 +0000 2013\",\"id\":410724848306892800,\"id_str\":\"410724848306892800\",\"text\":\"RRome (Brooklyn, NY) \\u2013 MY GIRL http:\\/\\/t.co\\/x6uxX9PLsH via @indierapblog @RRoseRRome\",\"source\":\"\\u003ca href=\\\"http:\\/\\/twitter.com\\/tweetbutton\\\" rel=\\\"nofollow\\\"\\u003eTweet Button\\u003c\\/a\\u003e\",\"truncated\":false,\"in_reply_to_status_id\":null,\"in_reply_to_status_id_str\":null,\"in_reply_to_user_id\":null,\"in_reply_to_user_id_str\":null,\"in_reply_to_screen_name\":null,\"user\":{\"id\":538836510,\"id_str\":\"538836510\",\"name\":\"UGHHBlog\",\"screen_name\":\"ughhblog\",\"location\":\"Los Angeles\",\"url\":\"http:\\/\\/www.undergroundhiphopblog.com\",\"description\":\"http:\\/\\/UN
DERGROUNDHIPHOPBLOG.com: A top Indie\\/Underground Hip Hop community blog. Submission Email: ughhblog@gmail.com \\/\\/\\/ Official Host: @pawz1\",\"protected\":false,\"followers_count\":2598,\"friends_count\":373,\"listed_count\":25,\"created_at\":\"Wed Mar 28 05:40:49 +0000 2012\",\"favourites_count\":423,\"utc_offset\":-28800,\"time_zone\":\"Pacific Time (US & Canada)\",\"geo_enabled\":false,\"verified\":false,\"statuses_count\":9623,\"lang\":\"en\",\"contributors_enabled\":false,\"is_translator\":false,\"profile_background_color\":\"131516\",\"profile_background_image_url\":\"http:\\/\\/a0.twimg.com\\/profile_background_images\\/544717772\\/UGHHBlogLogo.jpg\",\"profile_background_image_url_https\":\"https:\\/\\/si0.twimg.com\\/profile_background_images\\/544717772\\/UGHHBlogLogo.jpg\",\"profile_background_tile\":false,\"profile_image_url\":\"http:\\/\\/pbs.twimg.com\\/profile_images\\/2583702975\\/uas8528qzzdlnsb7igzn_normal.jpeg\",\"profile_image_url_https\":\"https:\\/\\/pbs.tw
img.com\\/profile_images\\/2583702975\\/uas8528qzzdlnsb7igzn_normal.jpeg\",\"profile_link_color\":\"009999\",\"profile_sidebar_border_color\":\"EEEEEE\",\"profile_sidebar_fill_color\":\"EFEFEF\",\"profile_text_color\":\"333333\",\"profile_use_background_image\":true,\"default_profile\":false,\"default_profile_image\":false,\"following\":null,\"follow_request_sent\":null,\"notifications\":null},\"geo\":null,\"coordinates\":null,\"place\":null,\"contributors\":null,\"retweet_count\":4,\"favorite_count\":0,\"entities\":{\"hashtags\":[],\"symbols\":[],\"urls\":[{\"url\":\"http:\\/\\/t.co\\/x6uxX9PLsH\",\"expanded_url\":\"http:\\/\\/indierapblog.com\\/rrome-brooklyn-ny-my-girl\\/\",\"display_url\":\"indierapblog.com\\/rrome-brooklyn\\u2026\",\"indices\":[31,53]}],\"user_mentions\":[{\"screen_name\":\"IndieRapBlog\",\"name\":\"IndieRapBlog.com\",\"id\":922776728,\"id_str\":\"922776728\",\"indices\":[58,71]},{\"screen_name\":\"RRoseRRome\",\"name\":\"RRome\",\"id\":76371478,\"id_str\":\"76
371478\",\"indices\":[72,83]}]},\"favorited\":false,\"retweeted\":false,\"possibly_sensitive\":false,\"lang\":\"en\"},\"retweet_count\":0,\"favorite_count\":0,\"entities\":{\"hashtags\":[],\"symbols\":[],\"urls\":[{\"url\":\"http:\\/\\/t.co\\/x6uxX9PLsH\",\"expanded_url\":\"http:\\/\\/indierapblog.com\\/rrome-brooklyn-ny-my-girl\\/\",\"display_url\":\"indierapblog.com\\/rrome-brooklyn\\u2026\",\"indices\":[45,67]}],\"user_mentions\":[{\"screen_name\":\"ughhblog\",\"name\":\"UGHHBlog\",\"id\":538836510,\"id_str\":\"538836510\",\"indices\":[3,12]},{\"screen_name\":\"IndieRapBlog\",\"name\":\"IndieRapBlog.com\",\"id\":922776728,\"id_str\":\"922776728\",\"indices\":[72,85]},{\"screen_name\":\"RRoseRRome\",\"name\":\"RRome\",\"id\":76371478,\"id_str\":\"76371478\",\"indices\":[86,97]}]},\"favorited\":false,\"retweeted\":false,\"possibly_sensitive\":false,\"filter_level\":\"medium\",\"lang\":\"en\"}";
-
- @Test
- public void Tests()
- {
- mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, Boolean.TRUE);
- mapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, Boolean.TRUE);
- mapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, Boolean.TRUE);
-
- InputStream is = TwitterActivityConverterProcessorTest.class.getResourceAsStream("/testtweets.txt");
- InputStreamReader isr = new InputStreamReader(is);
- BufferedReader br = new BufferedReader(isr);
-
- ObjectNode event = null;
- try {
- event = (ObjectNode) mapper.readTree(TWITTER_JSON);
- } catch (IOException e) {
- e.printStackTrace();
- Assert.fail();
- }
-
- assertThat(event, is(not(nullValue())));
-
- Retweet retweet = mapper.convertValue(event, Retweet.class);
-
- assertThat(retweet, is(not(nullValue())));
- assertThat(retweet.getCreatedAt(), is(not(nullValue())));
- assertThat(retweet.getText(), is(not(nullValue())));
- assertThat(retweet.getUser(), is(not(nullValue())));
- assertThat(retweet.getRetweetedStatus(), is(not(nullValue())));
-
- Activity activity = null;
- try {
- ActivityConverterProcessorConfiguration converterProcessorConfiguration =
- new ActivityConverterProcessorConfiguration()
- .withClassifiers((List) Lists.newArrayList(new TwitterDocumentClassifier()));
- ActivityConverterProcessor converter = new ActivityConverterProcessor(converterProcessorConfiguration);
- converter.prepare(converterProcessorConfiguration);
- List<StreamsDatum> result = converter.process(new StreamsDatum(TWITTER_JSON));
- activity = (Activity)result.get(0).getDocument();
- } catch (Throwable e) {
- e.printStackTrace();
- Assert.fail();
- }
-
- assertThat(activity, is(not(nullValue())));
-
- assertThat(activity.getId(), is(not(nullValue())));
- assertThat(activity.getActor(), is(not(nullValue())));
- assertThat(activity.getActor().getId(), is(not(nullValue())));
- assertThat(activity.getVerb(), is(not(nullValue())));
- assertThat(activity.getProvider(), is(not(nullValue())));
- assertThat(activity.getObject(), is(not(nullValue())));
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/8bf1cfaf/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TwitterActivityConvertersTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TwitterActivityConvertersTest.java b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TwitterActivityConvertersTest.java
index a8f5d86..d43cc27 100644
--- a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TwitterActivityConvertersTest.java
+++ b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TwitterActivityConvertersTest.java
@@ -19,28 +19,23 @@
package org.apache.streams.twitter.test;
import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.commons.lang.StringUtils;
-import org.apache.streams.converter.TypeConverterUtil;
-import org.apache.streams.data.ActivityConverter;
+import com.google.common.collect.Lists;
+import org.apache.streams.converter.ActivityConverterUtil;
+import org.apache.streams.data.util.ActivityUtil;
import org.apache.streams.jackson.StreamsJacksonMapper;
import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.twitter.converter.TwitterDateTimeFormat;
+import org.apache.streams.twitter.pojo.Delete;
+import org.apache.streams.twitter.pojo.Follow;
import org.apache.streams.twitter.pojo.Retweet;
-import org.apache.streams.twitter.pojo.Tweet;
import org.apache.streams.twitter.converter.TwitterDocumentClassifier;
-import org.apache.streams.twitter.converter.StreamsTwitterMapper;
-import org.apache.streams.twitter.converter.TwitterJsonRetweetActivityConverter;
-import org.apache.streams.twitter.converter.TwitterJsonTweetActivityConverter;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.BufferedReader;
-import java.io.InputStream;
-import java.io.InputStreamReader;
+import java.util.List;
-import static org.hamcrest.CoreMatchers.*;
-import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
/**
@@ -54,96 +49,48 @@ public class TwitterActivityConvertersTest {
private final static Logger LOGGER = LoggerFactory.getLogger(TwitterActivityConvertersTest.class);
- private ObjectMapper mapper = StreamsJacksonMapper.getInstance(StreamsTwitterMapper.TWITTER_FORMAT);
+ private ObjectMapper mapper = StreamsJacksonMapper.getInstance(Lists.newArrayList(TwitterDateTimeFormat.TWITTER_FORMAT));
- @Test
- public void Tests()
- {
- InputStream is = TwitterActivityConvertersTest.class.getResourceAsStream("/testtweets.txt");
- InputStreamReader isr = new InputStreamReader(is);
- BufferedReader br = new BufferedReader(isr);
-
- ActivityConverter activityConverter;
-
- try {
- while (br.ready()) {
- String line = br.readLine();
- if(!StringUtils.isEmpty(line))
- {
- LOGGER.info("raw: {}", line);
-
- Class detected = new TwitterDocumentClassifier().detectClasses(line).get(0);
-
- if( detected == Tweet.class ) {
- activityConverter = new TwitterJsonTweetActivityConverter();
- } else if( detected == Retweet.class ) {
- activityConverter = new TwitterJsonRetweetActivityConverter();
- } else {
- Assert.fail();
- return;
- }
-
- Object typedObject = TypeConverterUtil.convert(line, detected, mapper);
-
- Activity activity = (Activity) activityConverter.toActivityList(typedObject).get(0);
-
- String activitystring = mapper.writeValueAsString(activity);
-
- LOGGER.info("activity: {}", activitystring);
-
- assertThat(activity, is(not(nullValue())));
-
- assertThat(activity.getId(), is(not(nullValue())));
- assertThat(activity.getActor(), is(not(nullValue())));
- assertThat(activity.getActor().getId(), is(not(nullValue())));
- assertThat(activity.getVerb(), is(not(nullValue())));
- assertThat(activity.getProvider(), is(not(nullValue())));
-
- if( detected == Tweet.class ) {
-
- assertEquals(activity.getVerb(), "post");
-
- Tweet tweet = mapper.readValue(line, Tweet.class);
-
- if( tweet.getEntities() != null &&
- tweet.getEntities().getUrls() != null &&
- tweet.getEntities().getUrls().size() > 0 ) {
-
-
- assertThat(activity.getLinks(), is(not(nullValue())));
- assertEquals(tweet.getEntities().getUrls().size(), activity.getLinks().size());
- }
+ private ActivityConverterUtil activityConverterUtil = ActivityConverterUtil.getInstance();
- } else if( detected == Retweet.class ) {
+ private String tweet = "{\"created_at\":\"Wed Dec 11 22:27:34 +0000 2013\",\"id\":410898682381615105,\"id_str\":\"410898682381615105\",\"text\":\"Men's Basketball Single-Game Tickets Available - A limited number of tickets remain for Kentucky's upcoming men's ... http:\\/\\/t.co\\/SH5YZGpdRx\",\"source\":\"\\u003ca href=\\\"http:\\/\\/www.hootsuite.com\\\" rel=\\\"nofollow\\\"\\u003eHootSuite\\u003c\\/a\\u003e\",\"truncated\":false,\"in_reply_to_status_id\":null,\"in_reply_to_status_id_str\":null,\"in_reply_to_user_id\":null,\"in_reply_to_user_id_str\":null,\"in_reply_to_screen_name\":null,\"user\":{\"id\":91407775,\"id_str\":\"91407775\",\"name\":\"Winchester, KY\",\"screen_name\":\"winchester_ky\",\"location\":\"\",\"url\":null,\"description\":null,\"protected\":false,\"followers_count\":136,\"friends_count\":0,\"listed_count\":1,\"created_at\":\"Fri Nov 20 19:29:02 +0000 2009\",\"favourites_count\":0,\"utc_offset\":null,\"time_zone\":null,\"geo_enabled\":false,\"verified\":fa
lse,\"statuses_count\":1793,\"lang\":\"en\",\"contributors_enabled\":false,\"is_translator\":false,\"profile_background_color\":\"C0DEED\",\"profile_background_image_url\":\"http:\\/\\/abs.twimg.com\\/images\\/themes\\/theme1\\/bg.png\",\"profile_background_image_url_https\":\"https:\\/\\/abs.twimg.com\\/images\\/themes\\/theme1\\/bg.png\",\"profile_background_tile\":false,\"profile_image_url\":\"http:\\/\\/pbs.twimg.com\\/profile_images\\/613854495\\/winchester_sociallogo_normal.jpg\",\"profile_image_url_https\":\"https:\\/\\/pbs.twimg.com\\/profile_images\\/613854495\\/winchester_sociallogo_normal.jpg\",\"profile_link_color\":\"0084B4\",\"profile_sidebar_border_color\":\"C0DEED\",\"profile_sidebar_fill_color\":\"DDEEF6\",\"profile_text_color\":\"333333\",\"profile_use_background_image\":true,\"default_profile\":true,\"default_profile_image\":false,\"following\":null,\"follow_request_sent\":null,\"notifications\":null},\"geo\":null,\"coordinates\":null,\"place\":null,\"contributors
\":null,\"retweet_count\":0,\"favorite_count\":0,\"entities\":{\"hashtags\":[],\"symbols\":[],\"urls\":[{\"url\":\"http:\\/\\/t.co\\/SH5YZGpdRx\",\"expanded_url\":\"http:\\/\\/ow.ly\\/2C2XL1\",\"display_url\":\"ow.ly\\/2C2XL1\",\"indices\":[118,140]}],\"user_mentions\":[]},\"favorited\":false,\"retweeted\":false,\"possibly_sensitive\":false,\"filter_level\":\"medium\",\"lang\":\"en\"}\n";
+ private String retweet = "{\"created_at\":\"Wed Dec 11 22:27:34 +0000 2013\",\"id\":410898682385797121,\"id_str\":\"410898682385797121\",\"text\":\"RT @hemocional: Cuando te acarici\\u00e9 me di cuenta que hab\\u00eda vivido toda mi vida con las manos vac\\u00edas.\\nALEJANDRO JODOROWSKY.\",\"source\":\"web\",\"truncated\":false,\"in_reply_to_status_id\":null,\"in_reply_to_status_id_str\":null,\"in_reply_to_user_id\":null,\"in_reply_to_user_id_str\":null,\"in_reply_to_screen_name\":null,\"user\":{\"id\":163149656,\"id_str\":\"163149656\",\"name\":\"Carolina\",\"screen_name\":\"_titinaok\",\"location\":\"Montevideo\",\"url\":\"http:\\/\\/www.youtube.com\\/watch?v=N3v5vZ-tU1E\",\"description\":\"Tantas veces me defin\\u00ed ...Soy nada y todo a la vez\",\"protected\":false,\"followers_count\":41,\"friends_count\":75,\"listed_count\":2,\"created_at\":\"Mon Jul 05 17:35:49 +0000 2010\",\"favourites_count\":4697,\"utc_offset\":-10800,\"time_zone\":\"Buenos Aires\",\"geo_enabled\":fal
se,\"verified\":false,\"statuses_count\":5257,\"lang\":\"es\",\"contributors_enabled\":false,\"is_translator\":false,\"profile_background_color\":\"C4A64B\",\"profile_background_image_url\":\"http:\\/\\/a0.twimg.com\\/profile_background_images\\/378800000096791690\\/f64a07abbaa735b39ad7655fdaa2f416.jpeg\",\"profile_background_image_url_https\":\"https:\\/\\/si0.twimg.com\\/profile_background_images\\/378800000096791690\\/f64a07abbaa735b39ad7655fdaa2f416.jpeg\",\"profile_background_tile\":true,\"profile_image_url\":\"http:\\/\\/pbs.twimg.com\\/profile_images\\/378800000799213504\\/496d008f457390005825d2eb4ca50a63_normal.jpeg\",\"profile_image_url_https\":\"https:\\/\\/pbs.twimg.com\\/profile_images\\/378800000799213504\\/496d008f457390005825d2eb4ca50a63_normal.jpeg\",\"profile_banner_url\":\"https:\\/\\/pbs.twimg.com\\/profile_banners\\/163149656\\/1379722210\",\"profile_link_color\":\"BF415A\",\"profile_sidebar_border_color\":\"000000\",\"profile_sidebar_fill_color\":\"B17CED\",\"pr
ofile_text_color\":\"3D1957\",\"profile_use_background_image\":true,\"default_profile\":false,\"default_profile_image\":false,\"following\":null,\"follow_request_sent\":null,\"notifications\":null},\"geo\":null,\"coordinates\":null,\"place\":null,\"contributors\":null,\"retweeted_status\":{\"created_at\":\"Wed Dec 11 22:25:06 +0000 2013\",\"id\":410898060206960640,\"id_str\":\"410898060206960640\",\"text\":\"Cuando te acarici\\u00e9 me di cuenta que hab\\u00eda vivido toda mi vida con las manos vac\\u00edas.\\nALEJANDRO JODOROWSKY.\",\"source\":\"\\u003ca href=\\\"http:\\/\\/bufferapp.com\\\" rel=\\\"nofollow\\\"\\u003eBuffer\\u003c\\/a\\u003e\",\"truncated\":false,\"in_reply_to_status_id\":null,\"in_reply_to_status_id_str\":null,\"in_reply_to_user_id\":null,\"in_reply_to_user_id_str\":null,\"in_reply_to_screen_name\":null,\"user\":{\"id\":552929456,\"id_str\":\"552929456\",\"name\":\"Habilidad emocional\",\"screen_name\":\"hemocional\",\"location\":\"\",\"url\":\"http:\\/\\/www.hab
ilidademocional.com\",\"description\":\"Pensamientos y reflexiones para ayudar a mirar la vida de una manera m\\u00e1s saludable y a crecer interiormente cada d\\u00eda m\\u00e1s. #InteligenciaEmocional #Psicolog\\u00eda\",\"protected\":false,\"followers_count\":34307,\"friends_count\":325,\"listed_count\":361,\"created_at\":\"Fri Apr 13 19:00:11 +0000 2012\",\"favourites_count\":44956,\"utc_offset\":3600,\"time_zone\":\"Madrid\",\"geo_enabled\":false,\"verified\":false,\"statuses_count\":24011,\"lang\":\"es\",\"contributors_enabled\":false,\"is_translator\":false,\"profile_background_color\":\"000000\",\"profile_background_image_url\":\"http:\\/\\/a0.twimg.com\\/profile_background_images\\/378800000123681920\\/aab7226ae139f0ff93b04a08a8541477.jpeg\",\"profile_background_image_url_https\":\"https:\\/\\/si0.twimg.com\\/profile_background_images\\/378800000123681920\\/aab7226ae139f0ff93b04a08a8541477.jpeg\",\"profile_background_tile\":false,\"profile_image_url\":\"http:\\/\\/pbs.twimg
.com\\/profile_images\\/2430091220\\/zdkea46xhe3g4e65nuwl_normal.gif\",\"profile_image_url_https\":\"https:\\/\\/pbs.twimg.com\\/profile_images\\/2430091220\\/zdkea46xhe3g4e65nuwl_normal.gif\",\"profile_banner_url\":\"https:\\/\\/pbs.twimg.com\\/profile_banners\\/552929456\\/1383180255\",\"profile_link_color\":\"FF00E1\",\"profile_sidebar_border_color\":\"FFFFFF\",\"profile_sidebar_fill_color\":\"F3F3F3\",\"profile_text_color\":\"333333\",\"profile_use_background_image\":true,\"default_profile\":false,\"default_profile_image\":false,\"following\":null,\"follow_request_sent\":null,\"notifications\":null},\"geo\":null,\"coordinates\":null,\"place\":null,\"contributors\":null,\"retweet_count\":9,\"favorite_count\":6,\"entities\":{\"hashtags\":[],\"symbols\":[],\"urls\":[],\"user_mentions\":[]},\"favorited\":false,\"retweeted\":false,\"lang\":\"es\"},\"retweet_count\":0,\"favorite_count\":0,\"entities\":{\"hashtags\":[],\"symbols\":[],\"urls\":[],\"user_mentions\":[{\"screen_name\":\"he
mocional\",\"name\":\"Habilidad emocional\",\"id\":552929456,\"id_str\":\"552929456\",\"indices\":[3,14]}]},\"favorited\":false,\"retweeted\":false,\"filter_level\":\"medium\",\"lang\":\"es\"}\n";
+ private String delete = "{\"delete\":{\"status\":{\"id\":377518972486553600,\"user_id\":1249045572,\"id_str\":\"377518972486553600\",\"user_id_str\":\"1249045572\"}}}\n";
+ private String follow = "{\"follower\":{\"id\":1249045572},\"followee\":{\"id\":32386852}}\n";
- Retweet retweet = mapper.readValue(line, Retweet.class);
-
- assertThat(retweet.getRetweetedStatus(), is(not(nullValue())));
-
- assertEquals(activity.getVerb(), "share");
-
- assertThat(activity.getObject(), is(not(nullValue())));
- assertThat(activity.getObject().getObjectType(), is(not(nullValue())));
- assertThat(activity.getObject().getObjectType(), is(not(nullValue())));
-
- if( retweet.getRetweetedStatus().getEntities() != null &&
- retweet.getRetweetedStatus().getEntities().getUrls() != null &&
- retweet.getRetweetedStatus().getEntities().getUrls().size() > 0 ) {
-
- assertThat(activity.getLinks(), is(not(nullValue())));
- assertEquals(retweet.getRetweetedStatus().getEntities().getUrls().size(), activity.getLinks().size());
- }
-
- }
+ @Test
+ public void testConvertTweet() {
+ List<Activity> activityList = activityConverterUtil.convert(tweet);
+ Assert.assertTrue(activityList.size() == 1);
+ Activity activity = activityList.get(0);
+ if( !ActivityUtil.isValid(activity) )
+ Assert.fail();
+ }
+ @Test
+ public void testConvertRetweet() {
+ List<Activity> activityList = activityConverterUtil.convert(retweet);
+ Assert.assertTrue(activityList.size() == 1);
+ Activity activity = activityList.get(0);
+ if( !ActivityUtil.isValid(activity) )
+ Assert.fail();
+ }
+ @Test
+ public void testConvertDelete() {
+ List<Activity> activityList = activityConverterUtil.convert(delete);
+ Assert.assertTrue(activityList.size() == 1);
+ Activity activity = activityList.get(0);
+ if( !ActivityUtil.isValid(activity) )
+ Assert.fail();
+ }
- }
- }
- } catch( Exception e ) {
- System.out.println(e);
- e.printStackTrace();
+ @Test
+ public void testConvertFollow() {
+ List<Activity> activityList = activityConverterUtil.convert(follow);
+ Assert.assertTrue(activityList.size() == 1);
+ Activity activity = activityList.get(0);
+ if( !ActivityUtil.isValid(activity) )
Assert.fail();
- }
}
}
[3/9] incubator-streams git commit: loop logic
Posted by sb...@apache.org.
loop logic
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/cd6d7c1a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/cd6d7c1a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/cd6d7c1a
Branch: refs/heads/master
Commit: cd6d7c1a11ff5a29a474b13fa8e8a0968d01c006
Parents: e5d5086
Author: sblackmon <sb...@apache.org>
Authored: Tue Nov 25 15:39:01 2014 -0600
Committer: Steve Blackmon (@steveblackmon) <sb...@apache.org>
Committed: Thu Mar 26 13:04:58 2015 -0500
----------------------------------------------------------------------
.../provider/TwitterFollowersProviderTask.java | 17 ++++++++---------
.../provider/TwitterFriendsProviderTask.java | 8 ++------
2 files changed, 10 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/cd6d7c1a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowersProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowersProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowersProviderTask.java
index 9f91267..03c0640 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowersProviderTask.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowersProviderTask.java
@@ -29,6 +29,7 @@ import org.apache.streams.twitter.pojo.User;
import org.apache.streams.util.ComponentUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import twitter4j.PagableResponseList;
import twitter4j.Twitter;
import twitter4j.TwitterException;
import twitter4j.TwitterObjectFactory;
@@ -67,12 +68,9 @@ public class TwitterFollowersProviderTask implements Runnable {
int keepTrying = 0;
- long paging = 1;
+ long curser = -1;
-
- // keep trying to load, give it 5 attempts.
- //while (keepTrying < 10)
- while (keepTrying < 5)
+ do
{
try
{
@@ -86,7 +84,9 @@ public class TwitterFollowersProviderTask implements Runnable {
break;
}
- for (twitter4j.User follower4j : client.friendsFollowers().getFollowersList(id.longValue(), paging)) {
+ PagableResponseList<twitter4j.User> followerList = client.friendsFollowers().getFollowersList(id.longValue(), curser);
+
+ for (twitter4j.User follower4j : followerList) {
String followerJson = TwitterObjectFactory.getRawJSON(follower4j);
@@ -103,9 +103,8 @@ public class TwitterFollowersProviderTask implements Runnable {
} catch (IOException e) {
LOGGER.warn(e.getMessage());
}
- paging++;
- keepTrying = 10;
}
+ curser = followerList.getNextCursor();
}
catch(TwitterException twitterException) {
keepTrying += TwitterErrorHandler.handleTwitterError(client, twitterException);
@@ -113,7 +112,7 @@ public class TwitterFollowersProviderTask implements Runnable {
catch(Exception e) {
keepTrying += TwitterErrorHandler.handleTwitterError(client, e);
}
- }
+ } while (curser != 0 && keepTrying < 10);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/cd6d7c1a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFriendsProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFriendsProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFriendsProviderTask.java
index 408a614..434e208 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFriendsProviderTask.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFriendsProviderTask.java
@@ -70,9 +70,7 @@ public class TwitterFriendsProviderTask implements Runnable {
long curser = -1;
- // keep trying to load, give it 5 attempts.
- //while (keepTrying < 10)
- while (keepTrying < 1)
+ do
{
try
{
@@ -107,8 +105,6 @@ public class TwitterFriendsProviderTask implements Runnable {
}
}
curser = followeeList.getNextCursor();
- keepTrying = 10;
-
}
catch(TwitterException twitterException) {
keepTrying += TwitterErrorHandler.handleTwitterError(client, twitterException);
@@ -116,7 +112,7 @@ public class TwitterFriendsProviderTask implements Runnable {
catch(Exception e) {
keepTrying += TwitterErrorHandler.handleTwitterError(client, e);
}
- }
+ } while (curser != 0 && keepTrying < 10);
}
}
[4/9] incubator-streams git commit: screen name support
Posted by sb...@apache.org.
screen name support
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/3dceeb5e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/3dceeb5e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/3dceeb5e
Branch: refs/heads/master
Commit: 3dceeb5efdb260e1b3c4f7cf15b4e38ccb23df8c
Parents: cd6d7c1
Author: sblackmon <sb...@apache.org>
Authored: Thu Nov 27 12:32:13 2014 -0600
Committer: Steve Blackmon (@steveblackmon) <sb...@apache.org>
Committed: Thu Mar 26 20:14:14 2015 -0500
----------------------------------------------------------------------
.../TwitterFollowActivityConverter.java | 81 ++++++++++++++++++++
.../provider/TwitterFollowersProviderTask.java | 67 +++++++++++++++-
.../provider/TwitterFollowingProvider.java | 21 ++++-
.../provider/TwitterFriendsProviderTask.java | 67 +++++++++++++++-
.../TwitterFollowActivitySerializer.java | 70 -----------------
5 files changed, 231 insertions(+), 75 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3dceeb5e/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterFollowActivityConverter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterFollowActivityConverter.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterFollowActivityConverter.java
new file mode 100644
index 0000000..dedda69
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterFollowActivityConverter.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.twitter.converter;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.streams.data.ActivityConverter;
+import org.apache.streams.exceptions.ActivityConversionException;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.twitter.pojo.Follow;
+import org.apache.streams.twitter.converter.util.TwitterActivityUtil;
+
+import java.io.Serializable;
+import java.util.List;
+
+public class TwitterFollowActivityConverter implements ActivityConverter<Follow>, Serializable {
+
+ public TwitterFollowActivityConverter() {
+ }
+
+ private static TwitterFollowActivityConverter instance = new TwitterFollowActivityConverter();
+
+ public static TwitterFollowActivityConverter getInstance() {
+ return instance;
+ }
+
+ public static Class requiredClass = Follow.class;
+
+ @Override
+ public Class requiredClass() {
+ return requiredClass;
+ }
+
+ @Override
+ public String serializationFormat() {
+ return null;
+ }
+
+ @Override
+ public Follow fromActivity(Activity deserialized) throws ActivityConversionException {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public List<Activity> toActivityList(Follow event) throws ActivityConversionException {
+
+ Activity activity = new Activity();
+ activity.setVerb("follow");
+ activity.setActor(TwitterActivityUtil.buildActor(event.getFollower()));
+ activity.setObject(TwitterActivityUtil.buildActor(event.getFollowee()));
+
+ return Lists.newArrayList(activity);
+ }
+
+ @Override
+ public List<Follow> fromActivityList(List<Activity> list) {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public List<Activity> toActivityList(List<Follow> list) {
+ throw new NotImplementedException();
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3dceeb5e/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowersProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowersProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowersProviderTask.java
index 03c0640..cb679a6 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowersProviderTask.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowersProviderTask.java
@@ -48,6 +48,7 @@ public class TwitterFollowersProviderTask implements Runnable {
protected TwitterFollowingProvider provider;
protected Twitter client;
protected Long id;
+ protected String screenName;
public TwitterFollowersProviderTask(TwitterFollowingProvider provider, Twitter twitter, Long id) {
this.provider = provider;
@@ -55,12 +56,22 @@ public class TwitterFollowersProviderTask implements Runnable {
this.id = id;
}
+ public TwitterFollowersProviderTask(TwitterFollowingProvider provider, Twitter twitter, String screenName) {
+ this.provider = provider;
+ this.client = twitter;
+ this.screenName = screenName;
+ }
+
+
@Override
public void run() {
- getFollowers(id);
+ if( id != null )
+ getFollowers(id);
+ if( screenName != null)
+ getFollowers(screenName);
- LOGGER.info(id + " Thread Finished");
+ LOGGER.info(id != null ? id.toString() : screenName + " Thread Finished");
}
@@ -115,4 +126,56 @@ public class TwitterFollowersProviderTask implements Runnable {
} while (curser != 0 && keepTrying < 10);
}
+ protected void getFollowers(String screenName) {
+
+ int keepTrying = 0;
+
+ long curser = -1;
+
+ do
+ {
+ try
+ {
+ twitter4j.User followee4j;
+ String followeeJson;
+ try {
+ followee4j = client.users().showUser(screenName);
+ followeeJson = TwitterObjectFactory.getRawJSON(followee4j);
+ } catch (TwitterException e) {
+ LOGGER.error("Failure looking up " + screenName);
+ break;
+ }
+
+ PagableResponseList<twitter4j.User> followerList = client.friendsFollowers().getFollowersList(screenName, curser);
+
+ for (twitter4j.User follower4j : followerList) {
+
+ String followerJson = TwitterObjectFactory.getRawJSON(follower4j);
+
+ try {
+ Follow follow = new Follow()
+ .withFollowee(mapper.readValue(followeeJson, User.class))
+ .withFollower(mapper.readValue(followerJson, User.class));
+
+ ComponentUtils.offerUntilSuccess(new StreamsDatum(follow), provider.providerQueue);
+ } catch (JsonParseException e) {
+ LOGGER.warn(e.getMessage());
+ } catch (JsonMappingException e) {
+ LOGGER.warn(e.getMessage());
+ } catch (IOException e) {
+ LOGGER.warn(e.getMessage());
+ }
+ }
+ curser = followerList.getNextCursor();
+ }
+ catch(TwitterException twitterException) {
+ keepTrying += TwitterErrorHandler.handleTwitterError(client, twitterException);
+ }
+ catch(Exception e) {
+ keepTrying += TwitterErrorHandler.handleTwitterError(client, e);
+ }
+ } while (curser != 0 && keepTrying < 10);
+ }
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3dceeb5e/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
index 0a62fc8..24dea0f 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
@@ -60,13 +60,16 @@ public class TwitterFollowingProvider extends TwitterUserInformationProvider {
running.set(true);
- Preconditions.checkArgument(idsBatches.hasNext());
+ Preconditions.checkArgument(idsBatches.hasNext() || screenNameBatches.hasNext());
LOGGER.info("startStream");
while (idsBatches.hasNext()) {
submitFollowingThreads(idsBatches.next());
}
+ while (screenNameBatches.hasNext()) {
+ submitFollowingThreads(screenNameBatches.next());
+ }
running.set(true);
@@ -90,6 +93,22 @@ public class TwitterFollowingProvider extends TwitterUserInformationProvider {
}
}
+ protected void submitFollowingThreads(String[] screenNames) {
+ Twitter client = getTwitterClient();
+
+ if( getConfig().getEndpoint().equals("friends") ) {
+ for (int i = 0; i < screenNames.length; i++) {
+ TwitterFriendsProviderTask providerTask = new TwitterFriendsProviderTask(this, client, screenNames[i]);
+ executor.submit(providerTask);
+ }
+ } else if( getConfig().getEndpoint().equals("followers") ) {
+ for (int i = 0; i < screenNames.length; i++) {
+ TwitterFollowersProviderTask providerTask = new TwitterFollowersProviderTask(this, client, screenNames[i]);
+ executor.submit(providerTask);
+ }
+ }
+ }
+
@Override
public StreamsResultSet readCurrent() {
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3dceeb5e/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFriendsProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFriendsProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFriendsProviderTask.java
index 434e208..c5ababe 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFriendsProviderTask.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFriendsProviderTask.java
@@ -48,6 +48,7 @@ public class TwitterFriendsProviderTask implements Runnable {
protected TwitterFollowingProvider provider;
protected Twitter client;
protected Long id;
+ protected String screenName;
public TwitterFriendsProviderTask(TwitterFollowingProvider provider, Twitter twitter, Long id) {
this.provider = provider;
@@ -55,12 +56,22 @@ public class TwitterFriendsProviderTask implements Runnable {
this.id = id;
}
+ public TwitterFriendsProviderTask(TwitterFollowingProvider provider, Twitter twitter, String screenName) {
+ this.provider = provider;
+ this.client = twitter;
+ this.screenName = screenName;
+ }
+
+
@Override
public void run() {
- getFriends(id);
+ if( id != null )
+ getFriends(id);
+ if( screenName != null)
+ getFriends(screenName);
- LOGGER.info(id + " Thread Finished");
+ LOGGER.info(id != null ? id.toString() : screenName + " Thread Finished");
}
@@ -115,4 +126,56 @@ public class TwitterFriendsProviderTask implements Runnable {
} while (curser != 0 && keepTrying < 10);
}
+ protected void getFriends(String screenName) {
+
+ int keepTrying = 0;
+
+ long curser = -1;
+
+ do
+ {
+ try
+ {
+ twitter4j.User follower4j;
+ String followerJson;
+ try {
+ follower4j = client.users().showUser(screenName);
+ followerJson = TwitterObjectFactory.getRawJSON(follower4j);
+ } catch (TwitterException e) {
+ LOGGER.error("Failure looking up " + screenName);
+ break;
+ }
+
+ PagableResponseList<User> followeeList = client.friendsFollowers().getFriendsList(screenName, curser);
+
+ for (twitter4j.User followee4j : followeeList ) {
+
+ String followeeJson = TwitterObjectFactory.getRawJSON(followee4j);
+
+ try {
+ Follow follow = new Follow()
+ .withFollowee(mapper.readValue(followeeJson, org.apache.streams.twitter.pojo.User.class))
+ .withFollower(mapper.readValue(followerJson, org.apache.streams.twitter.pojo.User.class));
+
+ ComponentUtils.offerUntilSuccess(new StreamsDatum(follow), provider.providerQueue);
+ } catch (JsonParseException e) {
+ LOGGER.warn(e.getMessage());
+ } catch (JsonMappingException e) {
+ LOGGER.warn(e.getMessage());
+ } catch (IOException e) {
+ LOGGER.warn(e.getMessage());
+ }
+ }
+ curser = followeeList.getNextCursor();
+ }
+ catch(TwitterException twitterException) {
+ keepTrying += TwitterErrorHandler.handleTwitterError(client, twitterException);
+ }
+ catch(Exception e) {
+ keepTrying += TwitterErrorHandler.handleTwitterError(client, e);
+ }
+ } while (curser != 0 && keepTrying < 10);
+ }
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3dceeb5e/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterFollowActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterFollowActivitySerializer.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterFollowActivitySerializer.java
deleted file mode 100644
index 4dd29cc..0000000
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterFollowActivitySerializer.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.twitter.serializer;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.streams.data.ActivitySerializer;
-import org.apache.streams.exceptions.ActivitySerializerException;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.pojo.json.Activity;
-import org.apache.streams.twitter.pojo.Follow;
-import org.apache.streams.twitter.pojo.User;
-import org.apache.streams.twitter.converter.util.TwitterActivityUtil;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.List;
-
-public class TwitterFollowActivitySerializer implements ActivitySerializer<Follow>, Serializable {
-
- public TwitterFollowActivitySerializer() {}
-
- private static TwitterFollowActivitySerializer instance = new TwitterFollowActivitySerializer();
-
- public static TwitterFollowActivitySerializer getInstance() {
- return instance;
- }
-
- @Override
- public String serializationFormat() {
- return null;
- }
-
- @Override
- public Follow serialize(Activity deserialized) throws ActivitySerializerException {
- return null;
- }
-
- @Override
- public Activity deserialize(Follow event) throws ActivitySerializerException {
-
- Activity activity = new Activity();
- activity.setVerb("follow");
- activity.setActor(TwitterActivityUtil.buildActor(event.getFollower()));
- activity.setObject(TwitterActivityUtil.buildActor(event.getFollowee()));
-
- return activity;
- }
-
- @Override
- public List<Activity> deserializeAll(List<Follow> serializedList) {
- return null;
- }
-}
[9/9] incubator-streams git commit: Merge branch 'STREAMS-235' into
asf-master
Posted by sb...@apache.org.
Merge branch 'STREAMS-235' into asf-master
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/a71653cf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/a71653cf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/a71653cf
Branch: refs/heads/master
Commit: a71653cff8d466928dc8fbfde4923a4cb92c4a76
Parents: 2b994de 835539a
Author: Steve Blackmon (@steveblackmon) <sb...@apache.org>
Authored: Thu Apr 2 16:21:26 2015 -0500
Committer: Steve Blackmon (@steveblackmon) <sb...@apache.org>
Committed: Thu Apr 2 16:21:26 2015 -0500
----------------------------------------------------------------------
.../streams-provider-twitter/pom.xml | 3 +-
.../converter/TwitterDocumentClassifier.java | 12 +-
.../TwitterFollowActivityConverter.java | 84 +++++++++
.../provider/TwitterFollowingProvider.java | 137 ++++++++++++++
.../provider/TwitterFollowingProviderTask.java | 160 ++++++++++++++++
.../provider/TwitterFriendsProviderTask.java | 181 +++++++++++++++++++
.../TwitterUserInformationProvider.java | 7 +-
.../src/main/jsonschema/com/twitter/Follow.json | 14 ++
.../TwitterActivityConverterProcessorTest.java | 115 ------------
.../test/TwitterActivityConvertersTest.java | 139 +++++---------
.../test/TwitterDocumentClassifierTest.java | 11 ++
11 files changed, 643 insertions(+), 220 deletions(-)
----------------------------------------------------------------------
[6/9] incubator-streams git commit: follow detection test
Posted by sb...@apache.org.
follow detection test
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/4bfbfc40
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/4bfbfc40
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/4bfbfc40
Branch: refs/heads/master
Commit: 4bfbfc400e94c27c847cb2efb670831c80d0a9d9
Parents: c14e335
Author: Steve Blackmon (@steveblackmon) <sb...@apache.org>
Authored: Thu Mar 26 20:29:11 2015 -0500
Committer: Steve Blackmon (@steveblackmon) <sb...@apache.org>
Committed: Thu Mar 26 20:29:11 2015 -0500
----------------------------------------------------------------------
.../twitter/converter/TwitterDocumentClassifier.java | 10 +++++++---
.../twitter/test/TwitterDocumentClassifierTest.java | 11 +++++++++++
2 files changed, 18 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bfbfc40/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterDocumentClassifier.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterDocumentClassifier.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterDocumentClassifier.java
index bdd6682..7c8ed8c 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterDocumentClassifier.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterDocumentClassifier.java
@@ -24,9 +24,13 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.streams.data.DocumentClassifier;
import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.pojo.json.Follow;
-import org.apache.streams.twitter.pojo.*;
-import sun.reflect.generics.reflectiveObjects.LazyReflectiveObjectGenerator;
+import org.apache.streams.twitter.pojo.Delete;
+import org.apache.streams.twitter.pojo.Follow;
+import org.apache.streams.twitter.pojo.FriendList;
+import org.apache.streams.twitter.pojo.Retweet;
+import org.apache.streams.twitter.pojo.Tweet;
+import org.apache.streams.twitter.pojo.User;
+import org.apache.streams.twitter.pojo.UserstreamEvent;
import java.io.IOException;
import java.util.List;
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bfbfc40/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TwitterDocumentClassifierTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TwitterDocumentClassifierTest.java b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TwitterDocumentClassifierTest.java
index 324a605..a55bd09 100644
--- a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TwitterDocumentClassifierTest.java
+++ b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TwitterDocumentClassifierTest.java
@@ -21,6 +21,7 @@ package org.apache.streams.twitter.test;
import org.apache.streams.data.ActivitySerializer;
import org.apache.streams.twitter.converter.TwitterDocumentClassifier;
import org.apache.streams.twitter.pojo.Delete;
+import org.apache.streams.twitter.pojo.Follow;
import org.apache.streams.twitter.pojo.Retweet;
import org.apache.streams.twitter.pojo.Tweet;
import org.apache.streams.twitter.pojo.User;
@@ -42,6 +43,7 @@ public class TwitterDocumentClassifierTest {
private String tweet = "{\"created_at\":\"Wed Dec 11 22:27:34 +0000 2013\",\"id\":410898682381615105,\"id_str\":\"410898682381615105\",\"text\":\"Men's Basketball Single-Game Tickets Available - A limited number of tickets remain for Kentucky's upcoming men's ... http:\\/\\/t.co\\/SH5YZGpdRx\",\"source\":\"\\u003ca href=\\\"http:\\/\\/www.hootsuite.com\\\" rel=\\\"nofollow\\\"\\u003eHootSuite\\u003c\\/a\\u003e\",\"truncated\":false,\"in_reply_to_status_id\":null,\"in_reply_to_status_id_str\":null,\"in_reply_to_user_id\":null,\"in_reply_to_user_id_str\":null,\"in_reply_to_screen_name\":null,\"user\":{\"id\":91407775,\"id_str\":\"91407775\",\"name\":\"Winchester, KY\",\"screen_name\":\"winchester_ky\",\"location\":\"\",\"url\":null,\"description\":null,\"protected\":false,\"followers_count\":136,\"friends_count\":0,\"listed_count\":1,\"created_at\":\"Fri Nov 20 19:29:02 +0000 2009\",\"favourites_count\":0,\"utc_offset\":null,\"time_zone\":null,\"geo_enabled\":false,\"verified\":fa
lse,\"statuses_count\":1793,\"lang\":\"en\",\"contributors_enabled\":false,\"is_translator\":false,\"profile_background_color\":\"C0DEED\",\"profile_background_image_url\":\"http:\\/\\/abs.twimg.com\\/images\\/themes\\/theme1\\/bg.png\",\"profile_background_image_url_https\":\"https:\\/\\/abs.twimg.com\\/images\\/themes\\/theme1\\/bg.png\",\"profile_background_tile\":false,\"profile_image_url\":\"http:\\/\\/pbs.twimg.com\\/profile_images\\/613854495\\/winchester_sociallogo_normal.jpg\",\"profile_image_url_https\":\"https:\\/\\/pbs.twimg.com\\/profile_images\\/613854495\\/winchester_sociallogo_normal.jpg\",\"profile_link_color\":\"0084B4\",\"profile_sidebar_border_color\":\"C0DEED\",\"profile_sidebar_fill_color\":\"DDEEF6\",\"profile_text_color\":\"333333\",\"profile_use_background_image\":true,\"default_profile\":true,\"default_profile_image\":false,\"following\":null,\"follow_request_sent\":null,\"notifications\":null},\"geo\":null,\"coordinates\":null,\"place\":null,\"contributors
\":null,\"retweet_count\":0,\"favorite_count\":0,\"entities\":{\"hashtags\":[],\"symbols\":[],\"urls\":[{\"url\":\"http:\\/\\/t.co\\/SH5YZGpdRx\",\"expanded_url\":\"http:\\/\\/ow.ly\\/2C2XL1\",\"display_url\":\"ow.ly\\/2C2XL1\",\"indices\":[118,140]}],\"user_mentions\":[]},\"favorited\":false,\"retweeted\":false,\"possibly_sensitive\":false,\"filter_level\":\"medium\",\"lang\":\"en\"}\n";
private String retweet = "{\"created_at\":\"Wed Dec 11 22:27:34 +0000 2013\",\"id\":410898682385797121,\"id_str\":\"410898682385797121\",\"text\":\"RT @hemocional: Cuando te acarici\\u00e9 me di cuenta que hab\\u00eda vivido toda mi vida con las manos vac\\u00edas.\\nALEJANDRO JODOROWSKY.\",\"source\":\"web\",\"truncated\":false,\"in_reply_to_status_id\":null,\"in_reply_to_status_id_str\":null,\"in_reply_to_user_id\":null,\"in_reply_to_user_id_str\":null,\"in_reply_to_screen_name\":null,\"user\":{\"id\":163149656,\"id_str\":\"163149656\",\"name\":\"Carolina\",\"screen_name\":\"_titinaok\",\"location\":\"Montevideo\",\"url\":\"http:\\/\\/www.youtube.com\\/watch?v=N3v5vZ-tU1E\",\"description\":\"Tantas veces me defin\\u00ed ...Soy nada y todo a la vez\",\"protected\":false,\"followers_count\":41,\"friends_count\":75,\"listed_count\":2,\"created_at\":\"Mon Jul 05 17:35:49 +0000 2010\",\"favourites_count\":4697,\"utc_offset\":-10800,\"time_zone\":\"Buenos Aires\",\"geo_enabled\":fal
se,\"verified\":false,\"statuses_count\":5257,\"lang\":\"es\",\"contributors_enabled\":false,\"is_translator\":false,\"profile_background_color\":\"C4A64B\",\"profile_background_image_url\":\"http:\\/\\/a0.twimg.com\\/profile_background_images\\/378800000096791690\\/f64a07abbaa735b39ad7655fdaa2f416.jpeg\",\"profile_background_image_url_https\":\"https:\\/\\/si0.twimg.com\\/profile_background_images\\/378800000096791690\\/f64a07abbaa735b39ad7655fdaa2f416.jpeg\",\"profile_background_tile\":true,\"profile_image_url\":\"http:\\/\\/pbs.twimg.com\\/profile_images\\/378800000799213504\\/496d008f457390005825d2eb4ca50a63_normal.jpeg\",\"profile_image_url_https\":\"https:\\/\\/pbs.twimg.com\\/profile_images\\/378800000799213504\\/496d008f457390005825d2eb4ca50a63_normal.jpeg\",\"profile_banner_url\":\"https:\\/\\/pbs.twimg.com\\/profile_banners\\/163149656\\/1379722210\",\"profile_link_color\":\"BF415A\",\"profile_sidebar_border_color\":\"000000\",\"profile_sidebar_fill_color\":\"B17CED\",\"pr
ofile_text_color\":\"3D1957\",\"profile_use_background_image\":true,\"default_profile\":false,\"default_profile_image\":false,\"following\":null,\"follow_request_sent\":null,\"notifications\":null},\"geo\":null,\"coordinates\":null,\"place\":null,\"contributors\":null,\"retweeted_status\":{\"created_at\":\"Wed Dec 11 22:25:06 +0000 2013\",\"id\":410898060206960640,\"id_str\":\"410898060206960640\",\"text\":\"Cuando te acarici\\u00e9 me di cuenta que hab\\u00eda vivido toda mi vida con las manos vac\\u00edas.\\nALEJANDRO JODOROWSKY.\",\"source\":\"\\u003ca href=\\\"http:\\/\\/bufferapp.com\\\" rel=\\\"nofollow\\\"\\u003eBuffer\\u003c\\/a\\u003e\",\"truncated\":false,\"in_reply_to_status_id\":null,\"in_reply_to_status_id_str\":null,\"in_reply_to_user_id\":null,\"in_reply_to_user_id_str\":null,\"in_reply_to_screen_name\":null,\"user\":{\"id\":552929456,\"id_str\":\"552929456\",\"name\":\"Habilidad emocional\",\"screen_name\":\"hemocional\",\"location\":\"\",\"url\":\"http:\\/\\/www.hab
ilidademocional.com\",\"description\":\"Pensamientos y reflexiones para ayudar a mirar la vida de una manera m\\u00e1s saludable y a crecer interiormente cada d\\u00eda m\\u00e1s. #InteligenciaEmocional #Psicolog\\u00eda\",\"protected\":false,\"followers_count\":34307,\"friends_count\":325,\"listed_count\":361,\"created_at\":\"Fri Apr 13 19:00:11 +0000 2012\",\"favourites_count\":44956,\"utc_offset\":3600,\"time_zone\":\"Madrid\",\"geo_enabled\":false,\"verified\":false,\"statuses_count\":24011,\"lang\":\"es\",\"contributors_enabled\":false,\"is_translator\":false,\"profile_background_color\":\"000000\",\"profile_background_image_url\":\"http:\\/\\/a0.twimg.com\\/profile_background_images\\/378800000123681920\\/aab7226ae139f0ff93b04a08a8541477.jpeg\",\"profile_background_image_url_https\":\"https:\\/\\/si0.twimg.com\\/profile_background_images\\/378800000123681920\\/aab7226ae139f0ff93b04a08a8541477.jpeg\",\"profile_background_tile\":false,\"profile_image_url\":\"http:\\/\\/pbs.twimg
.com\\/profile_images\\/2430091220\\/zdkea46xhe3g4e65nuwl_normal.gif\",\"profile_image_url_https\":\"https:\\/\\/pbs.twimg.com\\/profile_images\\/2430091220\\/zdkea46xhe3g4e65nuwl_normal.gif\",\"profile_banner_url\":\"https:\\/\\/pbs.twimg.com\\/profile_banners\\/552929456\\/1383180255\",\"profile_link_color\":\"FF00E1\",\"profile_sidebar_border_color\":\"FFFFFF\",\"profile_sidebar_fill_color\":\"F3F3F3\",\"profile_text_color\":\"333333\",\"profile_use_background_image\":true,\"default_profile\":false,\"default_profile_image\":false,\"following\":null,\"follow_request_sent\":null,\"notifications\":null},\"geo\":null,\"coordinates\":null,\"place\":null,\"contributors\":null,\"retweet_count\":9,\"favorite_count\":6,\"entities\":{\"hashtags\":[],\"symbols\":[],\"urls\":[],\"user_mentions\":[]},\"favorited\":false,\"retweeted\":false,\"lang\":\"es\"},\"retweet_count\":0,\"favorite_count\":0,\"entities\":{\"hashtags\":[],\"symbols\":[],\"urls\":[],\"user_mentions\":[{\"screen_name\":\"he
mocional\",\"name\":\"Habilidad emocional\",\"id\":552929456,\"id_str\":\"552929456\",\"indices\":[3,14]}]},\"favorited\":false,\"retweeted\":false,\"filter_level\":\"medium\",\"lang\":\"es\"}\n";
private String delete = "{\"delete\":{\"status\":{\"id\":377518972486553600,\"user_id\":1249045572,\"id_str\":\"377518972486553600\",\"user_id_str\":\"1249045572\"}}}\n";
+ private String follow = "{\"follower\":{\"id\":1249045572},\"followee\":{\"id\":32386852}}\n";
private String user = "{\"location\":\"\",\"default_profile\":true,\"profile_background_tile\":false,\"statuses_count\":1,\"lang\":\"en\",\"profile_link_color\":\"0084B4\",\"id\":32386852,\"following\":false,\"protected\":false,\"favourites_count\":0,\"profile_text_color\":\"333333\",\"description\":\"\",\"verified\":false,\"contributors_enabled\":false,\"profile_sidebar_border_color\":\"C0DEED\",\"name\":\"Fred Gilkey\",\"profile_background_color\":\"C0DEED\",\"created_at\":\"Fri Apr 17 12:35:56 +0000 2009\",\"is_translation_enabled\":false,\"default_profile_image\":true,\"followers_count\":2,\"profile_image_url_https\":\"https://abs.twimg.com/sticky/default_profile_images/default_profile_1_normal.png\",\"geo_enabled\":false,\"status\":{\"contributors\":null,\"text\":\"Working\",\"geo\":null,\"retweeted\":false,\"in_reply_to_screen_name\":null,\"truncated\":false,\"lang\":\"en\",\"entities\":{\"symbols\":[],\"urls\":[],\"hashtags\":[],\"user_mentions\":[]},\"in_reply_to_status_
id_str\":null,\"id\":1541596700,\"source\":\"web\",\"in_reply_to_user_id_str\":null,\"favorited\":false,\"in_reply_to_status_id\":null,\"retweet_count\":0,\"created_at\":\"Fri Apr 17 12:37:54 +0000 2009\",\"in_reply_to_user_id\":null,\"favorite_count\":0,\"id_str\":\"1541596700\",\"place\":null,\"coordinates\":null},\"profile_background_image_url\":\"http://abs.twimg.com/images/themes/theme1/bg.png\",\"profile_background_image_url_https\":\"https://abs.twimg.com/images/themes/theme1/bg.png\",\"follow_request_sent\":false,\"entities\":{\"description\":{\"urls\":[]}},\"url\":null,\"utc_offset\":null,\"time_zone\":null,\"notifications\":false,\"profile_use_background_image\":true,\"friends_count\":1,\"profile_sidebar_fill_color\":\"DDEEF6\",\"screen_name\":\"4TYLove\",\"id_str\":\"32386852\",\"profile_image_url\":\"http://abs.twimg.com/sticky/default_profile_images/default_profile_1_normal.png\",\"listed_count\":0,\"is_translator\":false}";
@Test
@@ -72,6 +74,15 @@ public class TwitterDocumentClassifierTest {
}
@Test
+ public void testDetectFollow() {
+ List<Class> detected = new TwitterDocumentClassifier().detectClasses(follow);
+ Assert.assertTrue(detected.size() == 1);
+ Class result = detected.get(0);
+ if( !result.equals(Follow.class) )
+ Assert.fail();
+ }
+
+ @Test
public void testDetectUser() {
List<Class> detected = new TwitterDocumentClassifier().detectClasses(user);
Assert.assertTrue(detected.size() == 1);
[2/9] incubator-streams git commit: cleanup post-277
Posted by sb...@apache.org.
cleanup post-277
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/e5d50866
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/e5d50866
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/e5d50866
Branch: refs/heads/master
Commit: e5d50866be407e9a25ecf9936c0a16631d01ee2b
Parents: 43a3818
Author: Steve Blackmon (@steveblackmon) <sb...@apache.org>
Authored: Thu Mar 26 13:04:23 2015 -0500
Committer: Steve Blackmon (@steveblackmon) <sb...@apache.org>
Committed: Thu Mar 26 13:04:23 2015 -0500
----------------------------------------------------------------------
.../provider/TwitterFollowersProviderTask.java | 3 +--
.../provider/TwitterFollowingProvider.java | 18 ++++++++++++++++++
.../provider/TwitterFriendsProviderTask.java | 3 +--
.../TwitterFollowActivitySerializer.java | 4 +---
4 files changed, 21 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e5d50866/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowersProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowersProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowersProviderTask.java
index c0f8e8a..9f91267 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowersProviderTask.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowersProviderTask.java
@@ -26,7 +26,6 @@ import org.apache.streams.core.StreamsDatum;
import org.apache.streams.jackson.StreamsJacksonMapper;
import org.apache.streams.twitter.pojo.Follow;
import org.apache.streams.twitter.pojo.User;
-import org.apache.streams.twitter.serializer.StreamsTwitterMapper;
import org.apache.streams.util.ComponentUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,7 +42,7 @@ public class TwitterFollowersProviderTask implements Runnable {
private final static Logger LOGGER = LoggerFactory.getLogger(TwitterFollowersProviderTask.class);
- private final static ObjectMapper mapper = StreamsJacksonMapper.getInstance(Lists.newArrayList(StreamsTwitterMapper.TWITTER_FORMAT));
+ private final static ObjectMapper mapper = StreamsJacksonMapper.getInstance();
protected TwitterFollowingProvider provider;
protected Twitter client;
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e5d50866/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
index a4df278..0a62fc8 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
package org.apache.streams.twitter.provider;
import com.google.common.base.Preconditions;
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e5d50866/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFriendsProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFriendsProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFriendsProviderTask.java
index 5d3a7d2..408a614 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFriendsProviderTask.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFriendsProviderTask.java
@@ -25,7 +25,6 @@ import com.google.common.collect.Lists;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.jackson.StreamsJacksonMapper;
import org.apache.streams.twitter.pojo.Follow;
-import org.apache.streams.twitter.serializer.StreamsTwitterMapper;
import org.apache.streams.util.ComponentUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,7 +43,7 @@ public class TwitterFriendsProviderTask implements Runnable {
private final static Logger LOGGER = LoggerFactory.getLogger(TwitterFriendsProviderTask.class);
- private final static ObjectMapper mapper = StreamsJacksonMapper.getInstance(Lists.newArrayList(StreamsTwitterMapper.TWITTER_FORMAT));
+ private final static ObjectMapper mapper = StreamsJacksonMapper.getInstance();
protected TwitterFollowingProvider provider;
protected Twitter client;
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e5d50866/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterFollowActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterFollowActivitySerializer.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterFollowActivitySerializer.java
index 60da85a..4dd29cc 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterFollowActivitySerializer.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterFollowActivitySerializer.java
@@ -26,14 +26,12 @@ import org.apache.streams.jackson.StreamsJacksonMapper;
import org.apache.streams.pojo.json.Activity;
import org.apache.streams.twitter.pojo.Follow;
import org.apache.streams.twitter.pojo.User;
-import org.apache.streams.twitter.serializer.util.TwitterActivityUtil;
+import org.apache.streams.twitter.converter.util.TwitterActivityUtil;
import java.io.IOException;
import java.io.Serializable;
import java.util.List;
-import static org.apache.streams.twitter.serializer.util.TwitterActivityUtil.updateActivity;
-
public class TwitterFollowActivitySerializer implements ActivitySerializer<Follow>, Serializable {
public TwitterFollowActivitySerializer() {}
[5/9] incubator-streams git commit: follow Json detection
Posted by sb...@apache.org.
follow Json detection
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/c14e335e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/c14e335e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/c14e335e
Branch: refs/heads/master
Commit: c14e335ede2d05c1265af609fbbfea196001802f
Parents: 3dceeb5
Author: Steve Blackmon (@steveblackmon) <sb...@apache.org>
Authored: Thu Mar 26 20:14:28 2015 -0500
Committer: Steve Blackmon (@steveblackmon) <sb...@apache.org>
Committed: Thu Mar 26 20:14:28 2015 -0500
----------------------------------------------------------------------
.../streams/twitter/converter/TwitterDocumentClassifier.java | 2 ++
1 file changed, 2 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c14e335e/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterDocumentClassifier.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterDocumentClassifier.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterDocumentClassifier.java
index 9eda110..bdd6682 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterDocumentClassifier.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterDocumentClassifier.java
@@ -68,6 +68,8 @@ public class TwitterDocumentClassifier implements DocumentClassifier {
classList.add(FriendList.class);
else if( objectNode.findValue("target_object") != null )
classList.add(UserstreamEvent.class);
+ else if( objectNode.findValue("follower") != null && objectNode.findValue("followee") != null)
+ classList.add(Follow.class);
else if ( objectNode.findValue("location") != null && objectNode.findValue("user") == null)
classList.add(User.class);
else
[8/9] incubator-streams git commit: PR feedback for
https://github.com/apache/incubator-streams/pull/202
Posted by sb...@apache.org.
PR feedback for https://github.com/apache/incubator-streams/pull/202
Signed-off-by: Steve Blackmon (@steveblackmon) <sb...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/835539af
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/835539af
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/835539af
Branch: refs/heads/master
Commit: 835539af9b791f1a0f7bf968bcb0507b75a63304
Parents: 8bf1cfa
Author: Steve Blackmon (@steveblackmon) <sb...@apache.org>
Authored: Thu Apr 2 16:08:24 2015 -0500
Committer: Steve Blackmon (@steveblackmon) <sb...@apache.org>
Committed: Thu Apr 2 16:08:24 2015 -0500
----------------------------------------------------------------------
.../streams-provider-twitter/pom.xml | 2 +-
.../provider/TwitterFollowersProviderTask.java | 181 -------------------
.../provider/TwitterFollowingProvider.java | 28 +--
.../provider/TwitterFollowingProviderTask.java | 160 ++++++++++++++++
4 files changed, 168 insertions(+), 203 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/835539af/streams-contrib/streams-provider-twitter/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/pom.xml b/streams-contrib/streams-provider-twitter/pom.xml
index 26211d7..acdbf4a 100644
--- a/streams-contrib/streams-provider-twitter/pom.xml
+++ b/streams-contrib/streams-provider-twitter/pom.xml
@@ -88,7 +88,7 @@
<dependency>
<groupId>org.twitter4j</groupId>
<artifactId>twitter4j-core</artifactId>
- <version>4.0.1</version>
+ <version>4.0.3</version>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/835539af/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowersProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowersProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowersProviderTask.java
deleted file mode 100644
index cb679a6..0000000
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowersProviderTask.java
+++ /dev/null
@@ -1,181 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.twitter.provider;
-
-import com.fasterxml.jackson.core.JsonParseException;
-import com.fasterxml.jackson.databind.JsonMappingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.Lists;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.twitter.pojo.Follow;
-import org.apache.streams.twitter.pojo.User;
-import org.apache.streams.util.ComponentUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import twitter4j.PagableResponseList;
-import twitter4j.Twitter;
-import twitter4j.TwitterException;
-import twitter4j.TwitterObjectFactory;
-
-import java.io.IOException;
-
-/**
- * Retrieve recent posts for a single user id.
- */
-public class TwitterFollowersProviderTask implements Runnable {
-
- private final static Logger LOGGER = LoggerFactory.getLogger(TwitterFollowersProviderTask.class);
-
- private final static ObjectMapper mapper = StreamsJacksonMapper.getInstance();
-
- protected TwitterFollowingProvider provider;
- protected Twitter client;
- protected Long id;
- protected String screenName;
-
- public TwitterFollowersProviderTask(TwitterFollowingProvider provider, Twitter twitter, Long id) {
- this.provider = provider;
- this.client = twitter;
- this.id = id;
- }
-
- public TwitterFollowersProviderTask(TwitterFollowingProvider provider, Twitter twitter, String screenName) {
- this.provider = provider;
- this.client = twitter;
- this.screenName = screenName;
- }
-
-
- @Override
- public void run() {
-
- if( id != null )
- getFollowers(id);
- if( screenName != null)
- getFollowers(screenName);
-
- LOGGER.info(id != null ? id.toString() : screenName + " Thread Finished");
-
- }
-
- protected void getFollowers(Long id) {
-
- int keepTrying = 0;
-
- long curser = -1;
-
- do
- {
- try
- {
- twitter4j.User followee4j;
- String followeeJson;
- try {
- followee4j = client.users().showUser(id);
- followeeJson = TwitterObjectFactory.getRawJSON(followee4j);
- } catch (TwitterException e) {
- LOGGER.error("Failure looking up " + id);
- break;
- }
-
- PagableResponseList<twitter4j.User> followerList = client.friendsFollowers().getFollowersList(id.longValue(), curser);
-
- for (twitter4j.User follower4j : followerList) {
-
- String followerJson = TwitterObjectFactory.getRawJSON(follower4j);
-
- try {
- Follow follow = new Follow()
- .withFollowee(mapper.readValue(followeeJson, User.class))
- .withFollower(mapper.readValue(followerJson, User.class));
-
- ComponentUtils.offerUntilSuccess(new StreamsDatum(follow), provider.providerQueue);
- } catch (JsonParseException e) {
- LOGGER.warn(e.getMessage());
- } catch (JsonMappingException e) {
- LOGGER.warn(e.getMessage());
- } catch (IOException e) {
- LOGGER.warn(e.getMessage());
- }
- }
- curser = followerList.getNextCursor();
- }
- catch(TwitterException twitterException) {
- keepTrying += TwitterErrorHandler.handleTwitterError(client, twitterException);
- }
- catch(Exception e) {
- keepTrying += TwitterErrorHandler.handleTwitterError(client, e);
- }
- } while (curser != 0 && keepTrying < 10);
- }
-
- protected void getFollowers(String screenName) {
-
- int keepTrying = 0;
-
- long curser = -1;
-
- do
- {
- try
- {
- twitter4j.User followee4j;
- String followeeJson;
- try {
- followee4j = client.users().showUser(screenName);
- followeeJson = TwitterObjectFactory.getRawJSON(followee4j);
- } catch (TwitterException e) {
- LOGGER.error("Failure looking up " + screenName);
- break;
- }
-
- PagableResponseList<twitter4j.User> followerList = client.friendsFollowers().getFollowersList(screenName, curser);
-
- for (twitter4j.User follower4j : followerList) {
-
- String followerJson = TwitterObjectFactory.getRawJSON(follower4j);
-
- try {
- Follow follow = new Follow()
- .withFollowee(mapper.readValue(followeeJson, User.class))
- .withFollower(mapper.readValue(followerJson, User.class));
-
- ComponentUtils.offerUntilSuccess(new StreamsDatum(follow), provider.providerQueue);
- } catch (JsonParseException e) {
- LOGGER.warn(e.getMessage());
- } catch (JsonMappingException e) {
- LOGGER.warn(e.getMessage());
- } catch (IOException e) {
- LOGGER.warn(e.getMessage());
- }
- }
- curser = followerList.getNextCursor();
- }
- catch(TwitterException twitterException) {
- keepTrying += TwitterErrorHandler.handleTwitterError(client, twitterException);
- }
- catch(Exception e) {
- keepTrying += TwitterErrorHandler.handleTwitterError(client, e);
- }
- } while (curser != 0 && keepTrying < 10);
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/835539af/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
index 70c92c9..272d4b7 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
@@ -20,7 +20,6 @@ package org.apache.streams.twitter.provider;
import com.google.common.base.Preconditions;
import com.google.common.collect.Queues;
-import com.typesafe.config.Config;
import org.apache.streams.config.StreamsConfigurator;
import org.apache.streams.core.DatumStatusCounter;
import org.apache.streams.core.StreamsDatum;
@@ -80,33 +79,20 @@ public class TwitterFollowingProvider extends TwitterUserInformationProvider {
protected void submitFollowingThreads(Long[] ids) {
Twitter client = getTwitterClient();
- if( getConfig().getEndpoint().equals("friends") ) {
- for (int i = 0; i < ids.length; i++) {
- TwitterFriendsProviderTask providerTask = new TwitterFriendsProviderTask(this, client, ids[i]);
- executor.submit(providerTask);
- }
- } else if( getConfig().getEndpoint().equals("followers") ) {
- for (int i = 0; i < ids.length; i++) {
- TwitterFollowersProviderTask providerTask = new TwitterFollowersProviderTask(this, client, ids[i]);
- executor.submit(providerTask);
- }
+ for (int i = 0; i < ids.length; i++) {
+ TwitterFollowingProviderTask providerTask = new TwitterFollowingProviderTask(this, client, ids[i], getConfig().getEndpoint());
+ executor.submit(providerTask);
}
}
protected void submitFollowingThreads(String[] screenNames) {
Twitter client = getTwitterClient();
- if( getConfig().getEndpoint().equals("friends") ) {
- for (int i = 0; i < screenNames.length; i++) {
- TwitterFriendsProviderTask providerTask = new TwitterFriendsProviderTask(this, client, screenNames[i]);
- executor.submit(providerTask);
- }
- } else if( getConfig().getEndpoint().equals("followers") ) {
- for (int i = 0; i < screenNames.length; i++) {
- TwitterFollowersProviderTask providerTask = new TwitterFollowersProviderTask(this, client, screenNames[i]);
- executor.submit(providerTask);
- }
+ for (int i = 0; i < screenNames.length; i++) {
+ TwitterFollowingProviderTask providerTask = new TwitterFollowingProviderTask(this, client, screenNames[i], getConfig().getEndpoint());
+ executor.submit(providerTask);
}
+
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/835539af/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProviderTask.java
new file mode 100644
index 0000000..ea737c5
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProviderTask.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.twitter.provider;
+
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.databind.JsonMappingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.twitter.pojo.Follow;
+import org.apache.streams.twitter.pojo.User;
+import org.apache.streams.util.ComponentUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import twitter4j.PagableResponseList;
+import twitter4j.Twitter;
+import twitter4j.TwitterException;
+import twitter4j.TwitterObjectFactory;
+
+import java.io.IOException;
+
+/**
+ * Retrieve recent posts for a single user id.
+ */
+public class TwitterFollowingProviderTask implements Runnable {
+
+ private final static Logger LOGGER = LoggerFactory.getLogger(TwitterFollowingProviderTask.class);
+
+ private final static ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
+ protected TwitterFollowingProvider provider;
+ protected Twitter client;
+ protected Long id;
+ protected String screenName;
+ protected String endpoint;
+
+ private int max_per_page = 200;
+
+ public TwitterFollowingProviderTask(TwitterFollowingProvider provider, Twitter twitter, Long id, String endpoint) {
+ this.provider = provider;
+ this.client = twitter;
+ this.id = id;
+ this.endpoint = endpoint;
+ }
+
+ public TwitterFollowingProviderTask(TwitterFollowingProvider provider, Twitter twitter, String screenName, String endpoint) {
+ this.provider = provider;
+ this.client = twitter;
+ this.screenName = screenName;
+ this.endpoint = endpoint;
+ }
+
+
+ @Override
+ public void run() {
+
+ Preconditions.checkArgument(id != null || screenName != null);
+
+ if( id != null )
+ getFollowing(id);
+ else if( screenName != null)
+ getFollowing(screenName);
+
+ LOGGER.info(id != null ? id.toString() : screenName + " Thread Finished");
+
+ }
+
+ protected void getFollowing(Long id) {
+
+ Preconditions.checkArgument(endpoint.equals("friends") || endpoint.equals("followers"));
+
+ int keepTrying = 0;
+
+ long curser = -1;
+
+ do
+ {
+ try
+ {
+ twitter4j.User followee4j;
+ String followeeJson;
+ try {
+ followee4j = client.users().showUser(id);
+ followeeJson = TwitterObjectFactory.getRawJSON(followee4j);
+ } catch (TwitterException e) {
+ LOGGER.error("Failure looking up " + id);
+ break;
+ }
+
+ PagableResponseList<twitter4j.User> list = null;
+ if( endpoint.equals("followers") )
+ list = client.friendsFollowers().getFollowersList(id.longValue(), curser, max_per_page);
+ else if( endpoint.equals("friends") )
+ list = client.friendsFollowers().getFriendsList(id.longValue(), curser, max_per_page);
+
+ Preconditions.checkNotNull(list);
+ Preconditions.checkArgument(list.size() > 0);
+
+ for (twitter4j.User follower4j : list) {
+
+ String followerJson = TwitterObjectFactory.getRawJSON(follower4j);
+
+ try {
+ Follow follow = new Follow()
+ .withFollowee(mapper.readValue(followeeJson, User.class))
+ .withFollower(mapper.readValue(followerJson, User.class));
+
+ ComponentUtils.offerUntilSuccess(new StreamsDatum(follow), provider.providerQueue);
+ } catch (JsonParseException e) {
+ LOGGER.warn(e.getMessage());
+ } catch (JsonMappingException e) {
+ LOGGER.warn(e.getMessage());
+ } catch (IOException e) {
+ LOGGER.warn(e.getMessage());
+ }
+ }
+ if( list.size() == max_per_page )
+ curser = list.getNextCursor();
+ else break;
+ }
+ catch(TwitterException twitterException) {
+ keepTrying += TwitterErrorHandler.handleTwitterError(client, twitterException);
+ }
+ catch(Exception e) {
+ keepTrying += TwitterErrorHandler.handleTwitterError(client, e);
+ }
+ } while (curser != 0 && keepTrying < 10);
+ }
+
+ protected void getFollowing(String screenName) {
+
+ twitter4j.User user = null;
+ try {
+ user = client.users().showUser(screenName);
+ } catch (TwitterException e) {
+ LOGGER.error("Failure looking up " + id);
+ }
+ Preconditions.checkNotNull(user);
+ getFollowing(user.getId());
+ }
+
+
+}