You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2016/11/25 20:25:02 UTC
[21/42] incubator-streams git commit: STREAMS-440: custom
checkstyle.xml, address compliance
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonUserActivityConverter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonUserActivityConverter.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonUserActivityConverter.java
index ef74371..b8ce79b 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonUserActivityConverter.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonUserActivityConverter.java
@@ -18,65 +18,69 @@
package org.apache.streams.twitter.converter;
-import com.google.common.collect.Lists;
-import org.apache.commons.lang.NotImplementedException;
import org.apache.streams.data.ActivityConverter;
import org.apache.streams.exceptions.ActivityConversionException;
import org.apache.streams.pojo.json.Activity;
import org.apache.streams.twitter.pojo.User;
+import com.google.common.collect.Lists;
+import org.apache.commons.lang.NotImplementedException;
+
import java.util.List;
import static org.apache.streams.twitter.converter.util.TwitterActivityUtil.updateActivity;
public class TwitterJsonUserActivityConverter implements ActivityConverter<User> {
- public static Class requiredClass = User.class;
+ public static Class requiredClass = User.class;
- @Override
- public Class requiredClass() {
- return requiredClass;
- }
+ @Override
+ public Class requiredClass() {
+ return requiredClass;
+ }
- private static TwitterJsonUserActivityConverter instance = new TwitterJsonUserActivityConverter();
+ private static TwitterJsonUserActivityConverter instance = new TwitterJsonUserActivityConverter();
- public static TwitterJsonUserActivityConverter getInstance() {
- return instance;
- }
+ public static TwitterJsonUserActivityConverter getInstance() {
+ return instance;
+ }
- @Override
- public String serializationFormat() {
- return null;
- }
+ @Override
+ public String serializationFormat() {
+ return null;
+ }
- @Override
- public User fromActivity(Activity deserialized) throws ActivityConversionException {
- throw new NotImplementedException();
- }
+ @Override
+ public User fromActivity(Activity deserialized) throws ActivityConversionException {
+ throw new NotImplementedException();
+ }
- @Override
- public List<Activity> toActivityList(User user) throws ActivityConversionException {
+ @Override
+ public List<User> fromActivityList(List<Activity> list) {
+ throw new NotImplementedException();
+ }
- Activity activity = new Activity();
- updateActivity(user, activity);
- return Lists.newArrayList(activity);
- }
+ @Override
+ public List<Activity> toActivityList(User user) throws ActivityConversionException {
- @Override
- public List<User> fromActivityList(List<Activity> list) {
- throw new NotImplementedException();
- }
+ Activity activity = new Activity();
+ updateActivity(user, activity);
+
+ return Lists.newArrayList(activity);
+ }
- @Override
- public List<Activity> toActivityList(List<User> serializedList) {
- List<Activity> result = Lists.newArrayList();
- for( User item : serializedList ) {
- try {
- List<Activity> activities = toActivityList(item);
- result.addAll(activities);
- } catch (ActivityConversionException e) {}
- }
- return result;
+ @Override
+ public List<Activity> toActivityList(List<User> serializedList) {
+ List<Activity> result = Lists.newArrayList();
+ for ( User item : serializedList ) {
+ try {
+ List<Activity> activities = toActivityList(item);
+ result.addAll(activities);
+ } catch (ActivityConversionException ex) {
+ //
+ }
}
+ return result;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonUserActivityObjectConverter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonUserActivityObjectConverter.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonUserActivityObjectConverter.java
index d62b1e8..7cb4158 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonUserActivityObjectConverter.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonUserActivityObjectConverter.java
@@ -18,47 +18,42 @@
package org.apache.streams.twitter.converter;
-import com.google.common.collect.Lists;
-import org.apache.commons.lang.NotImplementedException;
-import org.apache.streams.data.ActivityConverter;
import org.apache.streams.data.ActivityObjectConverter;
import org.apache.streams.exceptions.ActivityConversionException;
-import org.apache.streams.pojo.json.Activity;
import org.apache.streams.pojo.json.ActivityObject;
import org.apache.streams.twitter.pojo.User;
-import java.util.List;
+import org.apache.commons.lang.NotImplementedException;
import static org.apache.streams.twitter.converter.util.TwitterActivityUtil.buildActor;
-import static org.apache.streams.twitter.converter.util.TwitterActivityUtil.updateActivity;
public class TwitterJsonUserActivityObjectConverter implements ActivityObjectConverter<User> {
- public static Class requiredClass = User.class;
+ public static Class requiredClass = User.class;
- @Override
- public Class requiredClass() {
- return requiredClass;
- }
+ @Override
+ public Class requiredClass() {
+ return requiredClass;
+ }
- private static TwitterJsonUserActivityObjectConverter instance = new TwitterJsonUserActivityObjectConverter();
+ private static TwitterJsonUserActivityObjectConverter instance = new TwitterJsonUserActivityObjectConverter();
- public static TwitterJsonUserActivityObjectConverter getInstance() {
- return instance;
- }
+ public static TwitterJsonUserActivityObjectConverter getInstance() {
+ return instance;
+ }
- @Override
- public String serializationFormat() {
- return null;
- }
+ @Override
+ public String serializationFormat() {
+ return null;
+ }
- @Override
- public User fromActivityObject(ActivityObject deserialized) throws ActivityConversionException {
- throw new NotImplementedException();
- }
+ @Override
+ public User fromActivityObject(ActivityObject deserialized) throws ActivityConversionException {
+ throw new NotImplementedException();
+ }
- @Override
- public ActivityObject toActivityObject(User serialized) throws ActivityConversionException {
- return buildActor(serialized);
- }
+ @Override
+ public ActivityObject toActivityObject(User serialized) throws ActivityConversionException {
+ return buildActor(serialized);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonUserstreameventActivityConverter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonUserstreameventActivityConverter.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonUserstreameventActivityConverter.java
index bb31fd6..6685a96 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonUserstreameventActivityConverter.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonUserstreameventActivityConverter.java
@@ -18,15 +18,16 @@
package org.apache.streams.twitter.converter;
-import com.google.common.base.Strings;
-import com.google.common.collect.Lists;
-import org.apache.commons.lang.NotImplementedException;
import org.apache.streams.data.ActivityConverter;
import org.apache.streams.exceptions.ActivityConversionException;
import org.apache.streams.pojo.json.Activity;
import org.apache.streams.pojo.json.ActivityObject;
import org.apache.streams.twitter.pojo.UserstreamEvent;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import org.apache.commons.lang.NotImplementedException;
+
import java.util.List;
import static org.apache.streams.twitter.converter.util.TwitterActivityUtil.formatId;
@@ -34,87 +35,101 @@ import static org.apache.streams.twitter.converter.util.TwitterActivityUtil.getP
/**
-* Created with IntelliJ IDEA.
-* User: mdelaet
-* Date: 9/30/13
-* Time: 9:24 AM
-* To change this template use File | Settings | File Templates.
-*/
+ * TwitterJsonUserstreameventActivityConverter.
+ */
+// TODO: Use this class explicitely somewhere
public class TwitterJsonUserstreameventActivityConverter implements ActivityConverter<UserstreamEvent> {
- public static Class requiredClass = UserstreamEvent.class;
-
- @Override
- public Class requiredClass() {
- return requiredClass;
- }
-
- private static TwitterJsonUserstreameventActivityConverter instance = new TwitterJsonUserstreameventActivityConverter();
-
- public static TwitterJsonUserstreameventActivityConverter getInstance() {
- return instance;
- }
-
- @Override
- public String serializationFormat() {
- return null;
- }
-
- @Override
- public UserstreamEvent fromActivity(Activity deserialized) throws ActivityConversionException {
- throw new NotImplementedException();
- }
-
- @Override
- public List<Activity> toActivityList(UserstreamEvent userstreamEvent) throws ActivityConversionException {
-
- Activity activity = convert(userstreamEvent);
- return Lists.newArrayList(activity);
-
- }
-
- @Override
- public List<UserstreamEvent> fromActivityList(List<Activity> list) {
- throw new NotImplementedException();
- }
-
- @Override
- public List<Activity> toActivityList(List<UserstreamEvent> serializedList) {
- return null;
- }
-
- public Activity convert(UserstreamEvent event) throws ActivityConversionException {
-
- Activity activity = new Activity();
- activity.setActor(buildActor(event));
- activity.setVerb(detectVerb(event));
- activity.setObject(buildActivityObject(event));
- activity.setId(formatId(activity.getVerb()));
- if(Strings.isNullOrEmpty(activity.getId()))
- throw new ActivityConversionException("Unable to determine activity id");
- activity.setProvider(getProvider());
- return activity;
- }
-
- public ActivityObject buildActor(UserstreamEvent event) {
- ActivityObject actor = new ActivityObject();
- //actor.setId(formatId(delete.getDelete().getStatus().getUserIdStr()));
- return actor;
- }
-
- public ActivityObject buildActivityObject(UserstreamEvent event) {
- ActivityObject actObj = new ActivityObject();
- //actObj.setId(formatId(delete.getDelete().getStatus().getIdStr()));
- //actObj.setObjectType("tweet");
- return actObj;
- }
-
- public String detectVerb(UserstreamEvent event) {
- return null;
- }
-
- public ActivityObject buildTarget(UserstreamEvent event) {
- return null;
+ public static Class requiredClass = UserstreamEvent.class;
+
+ @Override
+ public Class requiredClass() {
+ return requiredClass;
+ }
+
+ private static TwitterJsonUserstreameventActivityConverter instance = new TwitterJsonUserstreameventActivityConverter();
+
+ public static TwitterJsonUserstreameventActivityConverter getInstance() {
+ return instance;
+ }
+
+ @Override
+ public String serializationFormat() {
+ return null;
+ }
+
+ @Override
+ public UserstreamEvent fromActivity(Activity deserialized) throws ActivityConversionException {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public List<UserstreamEvent> fromActivityList(List<Activity> list) {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public List<Activity> toActivityList(UserstreamEvent userstreamEvent) throws ActivityConversionException {
+
+ Activity activity = convert(userstreamEvent);
+ return Lists.newArrayList(activity);
+
+ }
+
+ @Override
+ public List<Activity> toActivityList(List<UserstreamEvent> serializedList) {
+ return null;
+ }
+
+ /**
+ * convert UserstreamEvent to Activity.
+ * @param event UserstreamEvent
+ * @return Activity
+ * @throws ActivityConversionException ActivityConversionException
+ */
+ public Activity convert(UserstreamEvent event) throws ActivityConversionException {
+
+ Activity activity = new Activity();
+ activity.setActor(buildActor(event));
+ activity.setVerb(detectVerb(event));
+ activity.setObject(buildActivityObject(event));
+ activity.setId(formatId(activity.getVerb()));
+ if (Strings.isNullOrEmpty(activity.getId())) {
+ throw new ActivityConversionException("Unable to determine activity id");
}
+ activity.setProvider(getProvider());
+ return activity;
+ }
+
+ /**
+ * build ActivityObject from UserstreamEvent
+ * @param event UserstreamEvent
+ * @return $.actor
+ */
+ public ActivityObject buildActor(UserstreamEvent event) {
+ ActivityObject actor = new ActivityObject();
+ //actor.setId(formatId(delete.getDelete().getStatus().getUserIdStr()));
+ return actor;
+ }
+
+ /**
+ * build ActivityObject from UserstreamEvent
+ * @param event UserstreamEvent
+ * @return $.object
+ */
+ public ActivityObject buildActivityObject(UserstreamEvent event) {
+ ActivityObject actObj = new ActivityObject();
+ //actObj.setId(formatId(delete.getDelete().getStatus().getIdStr()));
+ //actObj.setObjectType("tweet");
+ return actObj;
+ }
+
+ public String detectVerb(UserstreamEvent event) {
+ return null;
+ }
+
+ public ActivityObject buildTarget(UserstreamEvent event) {
+ return null;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/util/TwitterActivityUtil.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/util/TwitterActivityUtil.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/util/TwitterActivityUtil.java
index 4015514..e0e2e80 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/util/TwitterActivityUtil.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/util/TwitterActivityUtil.java
@@ -19,12 +19,6 @@
package org.apache.streams.twitter.converter.util;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.base.Joiner;
-import com.google.common.base.Optional;
-import com.google.common.base.Strings;
-import com.google.common.collect.Lists;
import org.apache.streams.exceptions.ActivityConversionException;
import org.apache.streams.jackson.StreamsJacksonMapper;
import org.apache.streams.pojo.extensions.ExtensionUtil;
@@ -41,6 +35,14 @@ import org.apache.streams.twitter.pojo.Retweet;
import org.apache.streams.twitter.pojo.Tweet;
import org.apache.streams.twitter.pojo.User;
import org.apache.streams.twitter.pojo.UserMentions;
+import org.apache.streams.twitter.provider.TwitterErrorHandler;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,323 +54,342 @@ import java.util.Map;
import static com.google.common.math.DoubleMath.mean;
/**
- * Provides utilities for working with Activity objects within the context of Twitter
+ * Provides utilities for working with Activity objects within the context of Twitter.
*/
public class TwitterActivityUtil {
- private final static Logger LOGGER = LoggerFactory.getLogger(TwitterActivityUtil.class);
-
- /**
- * Updates the given Activity object with the values from the Tweet
- * @param tweet the object to use as the source
- * @param activity the target of the updates. Will receive all values from the tweet.
- * @throws org.apache.streams.exceptions.ActivityConversionException
- */
- public static void updateActivity(Tweet tweet, Activity activity) throws ActivityConversionException {
- ObjectMapper mapper = StreamsJacksonMapper.getInstance();
- activity.setActor(buildActor(tweet));
- activity.setId(formatId(activity.getVerb(),
- Optional.fromNullable(
- tweet.getIdStr())
- .or(Optional.of(tweet.getId().toString()))
- .orNull()));
-
- if(tweet instanceof Retweet) {
- updateActivityContent(activity, ((Retweet) tweet).getRetweetedStatus(), "share");
- } else {
- updateActivityContent(activity, tweet, "post");
- }
-
- if(Strings.isNullOrEmpty(activity.getId()))
- throw new ActivityConversionException("Unable to determine activity id");
- try {
- activity.setPublished(tweet.getCreatedAt());
- } catch( Exception e ) {
- throw new ActivityConversionException("Unable to determine publishedDate", e);
- }
- activity.setTarget(buildTarget(tweet));
- activity.setProvider(getProvider());
- activity.setUrl(String.format("http://twitter.com/%s/%s/%s", tweet.getUser().getScreenName(),"/status/",tweet.getIdStr()));
-
- addTwitterExtension(activity, mapper.convertValue(tweet, ObjectNode.class));
+ private static final Logger LOGGER = LoggerFactory.getLogger(TwitterActivityUtil.class);
+
+ static final ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
+ /**
+ * Updates the given Activity object with the values from the Tweet.
+ * @param tweet the object to use as the source
+ * @param activity the target of the updates. Will receive all values from the tweet.
+ * @throws ActivityConversionException ActivityConversionException
+ */
+ public static void updateActivity(Tweet tweet, Activity activity) throws ActivityConversionException {
+ activity.setActor(buildActor(tweet));
+ activity.setId(formatId(activity.getVerb(),
+ Optional.fromNullable(
+ tweet.getIdStr())
+ .or(Optional.of(tweet.getId().toString()))
+ .orNull()));
+
+ if (tweet instanceof Retweet) {
+ updateActivityContent(activity, ((Retweet) tweet).getRetweetedStatus(), "share");
+ } else {
+ updateActivityContent(activity, tweet, "post");
}
- /**
- * Updates the given Activity object with the values from the User
- * @param user the object to use as the source
- * @param activity the target of the updates. Will receive all values from the tweet.
- */
- public static void updateActivity(User user, Activity activity) {
- activity.setActor(buildActor(user));
- activity.setId(null);
- activity.setVerb(null);
+ if (Strings.isNullOrEmpty(activity.getId())) {
+ throw new ActivityConversionException("Unable to determine activity id");
}
-
- /**
- * Updates the activity for a delete event
- * @param delete the delete event
- * @param activity the Activity object to update
- * @throws org.apache.streams.exceptions.ActivityConversionException
- */
- public static void updateActivity(Delete delete, Activity activity) throws ActivityConversionException {
- activity.setActor(buildActor(delete));
- activity.setVerb("delete");
- activity.setObject(buildActivityObject(delete));
- activity.setId(formatId(activity.getVerb(), delete.getDelete().getStatus().getIdStr()));
- if(Strings.isNullOrEmpty(activity.getId()))
- throw new ActivityConversionException("Unable to determine activity id");
- activity.setProvider(getProvider());
- addTwitterExtension(activity, StreamsJacksonMapper.getInstance().convertValue(delete, ObjectNode.class));
+ try {
+ activity.setPublished(tweet.getCreatedAt());
+ } catch ( Exception ex ) {
+ throw new ActivityConversionException("Unable to determine publishedDate", ex);
}
-
- /**
- * Builds the actor for a delete event
- * @param delete the delete event
- * @return a valid Actor
- */
- public static ActivityObject buildActor(Delete delete) {
- ActivityObject actor = new ActivityObject();
- actor.setId(formatId(delete.getDelete().getStatus().getUserIdStr()));
- actor.setObjectType("page");
- return actor;
+ activity.setTarget(buildTarget(tweet));
+ activity.setProvider(getProvider());
+ activity.setUrl(String.format("http://twitter.com/%s/%s/%s", tweet.getUser().getScreenName(),"/status/",tweet.getIdStr()));
+
+ addTwitterExtension(activity, mapper.convertValue(tweet, ObjectNode.class));
+ }
+
+ /**
+ * Updates the given Activity object with the values from the User
+ * @param user the object to use as the source
+ * @param activity the target of the updates. Will receive all values from the tweet.
+ */
+ public static void updateActivity(User user, Activity activity) {
+ activity.setActor(buildActor(user));
+ activity.setId(null);
+ activity.setVerb(null);
+ }
+
+ /**
+ * Updates the activity for a delete event.
+ * @param delete the delete event
+ * @param activity the Activity object to update
+ * @throws ActivityConversionException ActivityConversionException
+ */
+ public static void updateActivity(Delete delete, Activity activity) throws ActivityConversionException {
+ activity.setActor(buildActor(delete));
+ activity.setVerb("delete");
+ activity.setObject(buildActivityObject(delete));
+ activity.setId(formatId(activity.getVerb(), delete.getDelete().getStatus().getIdStr()));
+ if (Strings.isNullOrEmpty(activity.getId())) {
+ throw new ActivityConversionException("Unable to determine activity id");
}
-
- /**
- * Builds the ActivityObject for the delete event
- * @param delete the delete event
- * @return a valid Activity Object
- */
- public static ActivityObject buildActivityObject(Delete delete) {
- ActivityObject actObj = new ActivityObject();
- actObj.setId(formatId(delete.getDelete().getStatus().getIdStr()));
- actObj.setObjectType("tweet");
- return actObj;
+ activity.setProvider(getProvider());
+ addTwitterExtension(activity, StreamsJacksonMapper.getInstance().convertValue(delete, ObjectNode.class));
+ }
+
+ /**
+ * Builds the activity {@link org.apache.streams.pojo.json.ActivityObject} actor from the tweet
+ * @param tweet the object to use as the source
+ * @return a valid Actor populated from the Tweet
+ */
+ public static ActivityObject buildActor(Tweet tweet) {
+ ActivityObject actor = new ActivityObject();
+ User user = tweet.getUser();
+
+ return buildActor(user);
+ }
+
+ /**
+ * Builds the activity {@link org.apache.streams.pojo.json.ActivityObject} actor from the User
+ * @param user the object to use as the source
+ * @return a valid Actor populated from the Tweet
+ */
+ public static ActivityObject buildActor(User user) {
+ ActivityObject actor = new ActivityObject();
+ actor.setId(formatId(
+ Optional.fromNullable(
+ user.getIdStr())
+ .or(Optional.of(user.getId().toString()))
+ .orNull()
+ ));
+ actor.setObjectType("page");
+ actor.setDisplayName(user.getName());
+ actor.setAdditionalProperty("handle", user.getScreenName());
+ actor.setSummary(user.getDescription());
+
+ if (user.getUrl() != null) {
+ actor.setUrl(user.getUrl());
}
-
- /**
- * Updates the content, and associated fields, with those from the given tweet
- * @param activity the target of the updates. Will receive all values from the tweet.
- * @param tweet the object to use as the source
- * @param verb the verb for the given activity's type
- */
- public static void updateActivityContent(Activity activity, Tweet tweet, String verb) {
- activity.setVerb(verb);
- activity.setTitle("");
- if( tweet != null ) {
- activity.setObject(buildActivityObject(tweet));
- activity.setLinks(getLinks(tweet));
- activity.setContent(tweet.getText());
- addLocationExtension(activity, tweet);
- addTwitterExtensions(activity, tweet);
- }
+ Map<String, Object> extensions = new HashMap<>();
+ extensions.put("location", user.getLocation());
+ extensions.put("posts", user.getStatusesCount());
+ extensions.put("favorites", user.getFavouritesCount());
+ extensions.put("followers", user.getFollowersCount());
+
+ Image profileImage = new Image();
+ profileImage.setUrl(user.getProfileImageUrlHttps());
+ actor.setImage(profileImage);
+
+ extensions.put("screenName", user.getScreenName());
+
+ actor.setAdditionalProperty("extensions", extensions);
+ return actor;
+ }
+
+ /**
+ * Builds the actor for a delete event.
+ * @param delete the delete event
+ * @return a valid Actor
+ */
+ public static ActivityObject buildActor(Delete delete) {
+ ActivityObject actor = new ActivityObject();
+ actor.setId(formatId(delete.getDelete().getStatus().getUserIdStr()));
+ actor.setObjectType("page");
+ return actor;
+ }
+
+ /**
+ * Creates an {@link org.apache.streams.pojo.json.ActivityObject} for the tweet
+ * @param tweet the object to use as the source
+ * @return a valid ActivityObject
+ */
+ public static ActivityObject buildActivityObject(Tweet tweet) {
+ ActivityObject actObj = new ActivityObject();
+ String id = Optional.fromNullable(
+ tweet.getIdStr())
+ .or(Optional.of(tweet.getId().toString()))
+ .orNull();
+ if ( id != null ) {
+ actObj.setId(id);
}
-
- /**
- * Creates an {@link org.apache.streams.pojo.json.ActivityObject} for the tweet
- * @param tweet the object to use as the source
- * @return a valid ActivityObject
- */
- public static ActivityObject buildActivityObject(Tweet tweet) {
- ActivityObject actObj = new ActivityObject();
- String id = Optional.fromNullable(
- tweet.getIdStr())
- .or(Optional.of(tweet.getId().toString()))
- .orNull();
- if( id != null )
- actObj.setId(id);
- actObj.setObjectType("post");
- actObj.setContent(tweet.getText());
- return actObj;
+ actObj.setObjectType("post");
+ actObj.setContent(tweet.getText());
+ return actObj;
+ }
+
+ /**
+ * Builds the ActivityObject for the delete event.
+ * @param delete the delete event
+ * @return a valid Activity Object
+ */
+ public static ActivityObject buildActivityObject(Delete delete) {
+ ActivityObject actObj = new ActivityObject();
+ actObj.setId(formatId(delete.getDelete().getStatus().getIdStr()));
+ actObj.setObjectType("tweet");
+ return actObj;
+ }
+
+ /**
+ * Updates the content, and associated fields, with those from the given tweet
+ * @param activity the target of the updates. Will receive all values from the tweet.
+ * @param tweet the object to use as the source
+ * @param verb the verb for the given activity's type
+ */
+ public static void updateActivityContent(Activity activity, Tweet tweet, String verb) {
+ activity.setVerb(verb);
+ activity.setTitle("");
+ if ( tweet != null ) {
+ activity.setObject(buildActivityObject(tweet));
+ activity.setLinks(getLinks(tweet));
+ activity.setContent(tweet.getText());
+ addLocationExtension(activity, tweet);
+ addTwitterExtensions(activity, tweet);
}
+ }
- /**
- * Builds the activity {@link org.apache.streams.pojo.json.ActivityObject} actor from the tweet
- * @param tweet the object to use as the source
- * @return a valid Actor populated from the Tweet
- */
- public static ActivityObject buildActor(Tweet tweet) {
- ActivityObject actor = new ActivityObject();
- User user = tweet.getUser();
- return buildActor(user);
- }
- /**
- * Builds the activity {@link org.apache.streams.pojo.json.ActivityObject} actor from the User
- * @param user the object to use as the source
- * @return a valid Actor populated from the Tweet
- */
- public static ActivityObject buildActor(User user) {
- ActivityObject actor = new ActivityObject();
- actor.setId(formatId(
- Optional.fromNullable(
- user.getIdStr())
- .or(Optional.of(user.getId().toString()))
- .orNull()
- ));
- actor.setObjectType("page");
- actor.setDisplayName(user.getName());
- actor.setAdditionalProperty("handle", user.getScreenName());
- actor.setSummary(user.getDescription());
-
- if (user.getUrl()!=null){
- actor.setUrl(user.getUrl());
- }
-
- Map<String, Object> extensions = new HashMap<>();
- extensions.put("location", user.getLocation());
- extensions.put("posts", user.getStatusesCount());
- extensions.put("favorites", user.getFavouritesCount());
- extensions.put("followers", user.getFollowersCount());
-
- Image profileImage = new Image();
- profileImage.setUrl(user.getProfileImageUrlHttps());
- actor.setImage(profileImage);
-
- extensions.put("screenName", user.getScreenName());
-
- actor.setAdditionalProperty("extensions", extensions);
- return actor;
- }
- /**
- * Gets the links from the Twitter event
- * @param tweet the object to use as the source
- * @return a list of links corresponding to the expanded URL (no t.co)
- */
- public static List<String> getLinks(Tweet tweet) {
- List<String> links = new ArrayList<>();
- if( tweet.getEntities().getUrls() != null ) {
- for (Url url : tweet.getEntities().getUrls()) {
- links.add(url.getExpandedUrl());
- }
- }
- else
- LOGGER.debug(" 0 links");
- return links;
- }
- /**
- * Builds the {@link org.apache.streams.twitter.pojo.TargetObject} from the tweet
- * @param tweet the object to use as the source
- * @return currently returns null for all activities
- */
- public static ActivityObject buildTarget(Tweet tweet) {
- return null;
- }
- /**
- * Adds the location extension and populates with teh twitter data
- * @param activity the Activity object to update
- * @param tweet the object to use as the source
- */
- public static void addLocationExtension(Activity activity, Tweet tweet) {
- Map<String, Object> extensions = ExtensionUtil.getInstance().ensureExtensions(activity);
- Map<String, Object> location = new HashMap<>();
- location.put("id", formatId(
- Optional.fromNullable(
- tweet.getIdStr())
- .or(Optional.of(tweet.getId().toString()))
- .orNull()
- ));
- location.put("coordinates", boundingBoxCenter(tweet.getPlace()));
- extensions.put("location", location);
- }
- /**
- * Gets the common twitter {@link org.apache.streams.pojo.json.Provider} object
- * @return a provider object representing Twitter
- */
- public static Provider getProvider() {
- Provider provider = new Provider();
- provider.setId("id:providers:twitter");
- provider.setObjectType("application");
- provider.setDisplayName("Twitter");
- return provider;
+ /**
+ * Gets the links from the Twitter event
+ * @param tweet the object to use as the source
+ * @return a list of links corresponding to the expanded URL (no t.co)
+ */
+ public static List<String> getLinks(Tweet tweet) {
+ List<String> links = new ArrayList<>();
+ if ( tweet.getEntities().getUrls() != null ) {
+ for (Url url : tweet.getEntities().getUrls()) {
+ links.add(url.getExpandedUrl());
+ }
+ } else {
+ LOGGER.debug(" 0 links");
}
- /**
- * Adds the given Twitter event to the activity as an extension
- * @param activity the Activity object to update
- * @param event the Twitter event to add as the extension
- */
- public static void addTwitterExtension(Activity activity, ObjectNode event) {
- Map<String, Object> extensions = ExtensionUtil.getInstance().ensureExtensions(activity);
- extensions.put("twitter", event);
+ return links;
+ }
+
+ /**
+ * Builds the {@link org.apache.streams.twitter.pojo.TargetObject} from the tweet.
+ * @param tweet the object to use as the source
+ * @return currently returns null for all activities
+ */
+ public static ActivityObject buildTarget(Tweet tweet) {
+ return null;
+ }
+
+ /**
+ * Adds the location extension and populates with teh twitter data.
+ * @param activity the Activity object to update
+ * @param tweet the object to use as the source
+ */
+ public static void addLocationExtension(Activity activity, Tweet tweet) {
+ Map<String, Object> extensions = ExtensionUtil.getInstance().ensureExtensions(activity);
+ Map<String, Object> location = new HashMap<>();
+ location.put("id", formatId(
+ Optional.fromNullable(
+ tweet.getIdStr())
+ .or(Optional.of(tweet.getId().toString()))
+ .orNull()
+ ));
+ location.put("coordinates", boundingBoxCenter(tweet.getPlace()));
+ extensions.put("location", location);
+ }
+
+ /**
+ * Gets the common twitter {@link org.apache.streams.pojo.json.Provider} object
+ * @return a provider object representing Twitter
+ */
+ public static Provider getProvider() {
+ Provider provider = new Provider();
+ provider.setId("id:providers:twitter");
+ provider.setObjectType("application");
+ provider.setDisplayName("Twitter");
+ return provider;
+ }
+
+ /**
+ * Adds the given Twitter event to the activity as an extension.
+ * @param activity the Activity object to update
+ * @param event the Twitter event to add as the extension
+ */
+ public static void addTwitterExtension(Activity activity, ObjectNode event) {
+ Map<String, Object> extensions = ExtensionUtil.getInstance().ensureExtensions(activity);
+ extensions.put("twitter", event);
+ }
+
+ /**
+ * Formats the ID to conform with the Apache Streams activity ID convention.
+ * @param idparts the parts of the ID to join
+ * @return a valid Activity ID in format "id:twitter:part1:part2:...partN"
+ */
+ public static String formatId(String... idparts) {
+ return Joiner.on(":").join(Lists.asList("id:twitter", idparts));
+ }
+
+ /**
+ * Takes various parameters from the twitter object that are currently not part of the
+ * activity schema and stores them in a generic extensions attribute.
+ * @param activity Activity
+ * @param tweet Tweet
+ */
+ public static void addTwitterExtensions(Activity activity, Tweet tweet) {
+ Map<String, Object> extensions = ExtensionUtil.getInstance().ensureExtensions(activity);
+
+ List<String> hashtags = new ArrayList<>();
+ for (Hashtag hashtag : tweet.getEntities().getHashtags()) {
+ hashtags.add(hashtag.getText());
}
- /**
- * Formats the ID to conform with the Apache Streams activity ID convention
- * @param idparts the parts of the ID to join
- * @return a valid Activity ID in format "id:twitter:part1:part2:...partN"
- */
- public static String formatId(String... idparts) {
- return Joiner.on(":").join(Lists.asList("id:twitter", idparts));
- }
-
- /**
- * Takes various parameters from the twitter object that are currently not part of teh
- * activity schema and stores them in a generic extensions attribute
- * @param activity
- * @param tweet
- */
- public static void addTwitterExtensions(Activity activity, Tweet tweet) {
- Map<String, Object> extensions = ExtensionUtil.getInstance().ensureExtensions(activity);
+ extensions.put("hashtags", hashtags);
- List<String> hashtags = new ArrayList<>();
- for(Hashtag hashtag : tweet.getEntities().getHashtags()) {
- hashtags.add(hashtag.getText());
- }
- extensions.put("hashtags", hashtags);
+ Map<String, Object> likes = new HashMap<>();
+ likes.put("perspectival", tweet.getFavorited());
+ likes.put("count", tweet.getAdditionalProperties().get("favorite_count"));
- Map<String, Object> likes = new HashMap<>();
- likes.put("perspectival", tweet.getFavorited());
- likes.put("count", tweet.getAdditionalProperties().get("favorite_count"));
+ extensions.put("likes", likes);
- extensions.put("likes", likes);
+ Map<String, Object> rebroadcasts = new HashMap<>();
+ rebroadcasts.put("perspectival", tweet.getRetweeted());
+ rebroadcasts.put("count", tweet.getRetweetCount());
- Map<String, Object> rebroadcasts = new HashMap<>();
- rebroadcasts.put("perspectival", tweet.getRetweeted());
- rebroadcasts.put("count", tweet.getRetweetCount());
+ extensions.put("rebroadcasts", rebroadcasts);
- extensions.put("rebroadcasts", rebroadcasts);
+ List<Map<String, Object>> userMentions = new ArrayList<>();
+ Entities entities = tweet.getEntities();
- List<Map<String, Object>> userMentions = new ArrayList<>();
- Entities entities = tweet.getEntities();
+ for (UserMentions user : entities.getUserMentions()) {
+ //Map the twitter user object into an actor
+ Map<String, Object> actor = new HashMap<>();
+ actor.put("id", "id:twitter:" + user.getIdStr());
+ actor.put("displayName", user.getName());
+ actor.put("handle", user.getScreenName());
- for(UserMentions user : entities.getUserMentions()) {
- //Map the twitter user object into an actor
- Map<String, Object> actor = new HashMap<>();
- actor.put("id", "id:twitter:" + user.getIdStr());
- actor.put("displayName", user.getName());
- actor.put("handle", user.getScreenName());
+ userMentions.add(actor);
+ }
- userMentions.add(actor);
- }
+ extensions.put("user_mentions", userMentions);
- extensions.put("user_mentions", userMentions);
+ extensions.put("keywords", tweet.getText());
+ }
- extensions.put("keywords", tweet.getText());
+ /**
+ * Compute central coordinates from bounding box.
+ * @param place the bounding box to use as the source
+ */
+ public static List<Double> boundingBoxCenter(Place place) {
+ if ( place == null ) {
+ return new ArrayList<>();
}
-
- /**
- * Compute central coordinates from bounding box
- * @param place the bounding box to use as the source
- */
- public static List<Double> boundingBoxCenter(Place place) {
- if( place == null ) return new ArrayList<>();
- if( place.getBoundingBox() == null ) return new ArrayList<>();
- if( place.getBoundingBox().getCoordinates().size() != 1 ) return new ArrayList<>();
- if( place.getBoundingBox().getCoordinates().get(0).size() != 4 ) return new ArrayList<>();
- List<Double> lats = new ArrayList<>();
- List<Double> lons = new ArrayList<>();
- for( List<Double> point : place.getBoundingBox().getCoordinates().get(0)) {
- lats.add(point.get(0));
- lons.add(point.get(1));
- }
- List<Double> result = new ArrayList<>();
- result.add(mean(lats));
- result.add(mean(lons));
- return result;
+ if ( place.getBoundingBox() == null ) {
+ return new ArrayList<>();
+ }
+ if ( place.getBoundingBox().getCoordinates().size() != 1 ) {
+ return new ArrayList<>();
+ }
+ if ( place.getBoundingBox().getCoordinates().get(0).size() != 4 ) {
+ return new ArrayList<>();
+ }
+ List<Double> lats = new ArrayList<>();
+ List<Double> lons = new ArrayList<>();
+ for ( List<Double> point : place.getBoundingBox().getCoordinates().get(0)) {
+ lats.add(point.get(0));
+ lons.add(point.get(1));
}
+ List<Double> result = new ArrayList<>();
+ result.add(mean(lats));
+ result.add(mean(lons));
+ return result;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/FetchAndReplaceTwitterProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/FetchAndReplaceTwitterProcessor.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/FetchAndReplaceTwitterProcessor.java
index 046cb76..c1c205b 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/FetchAndReplaceTwitterProcessor.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/FetchAndReplaceTwitterProcessor.java
@@ -19,9 +19,6 @@
package org.apache.streams.twitter.processor;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.Lists;
-import java.util.List;
import org.apache.streams.config.ComponentConfigurator;
import org.apache.streams.config.StreamsConfigurator;
import org.apache.streams.core.StreamsDatum;
@@ -31,11 +28,14 @@ import org.apache.streams.jackson.StreamsJacksonMapper;
import org.apache.streams.pojo.json.Activity;
import org.apache.streams.twitter.TwitterConfiguration;
import org.apache.streams.twitter.TwitterStreamConfiguration;
+import org.apache.streams.twitter.converter.TwitterDocumentClassifier;
import org.apache.streams.twitter.pojo.Delete;
import org.apache.streams.twitter.pojo.Retweet;
import org.apache.streams.twitter.pojo.Tweet;
-import org.apache.streams.twitter.provider.TwitterEventClassifier;
import org.apache.streams.twitter.provider.TwitterProviderUtil;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import twitter4j.Status;
@@ -45,6 +45,8 @@ import twitter4j.TwitterFactory;
import twitter4j.TwitterObjectFactory;
import twitter4j.conf.ConfigurationBuilder;
+import java.util.List;
+
import static org.apache.streams.twitter.converter.util.TwitterActivityUtil.getProvider;
import static org.apache.streams.twitter.converter.util.TwitterActivityUtil.updateActivity;
@@ -54,132 +56,132 @@ import static org.apache.streams.twitter.converter.util.TwitterActivityUtil.upda
*/
public class FetchAndReplaceTwitterProcessor implements StreamsProcessor {
- private static final String PROVIDER_ID = getProvider().getId();
- private static final Logger LOGGER = LoggerFactory.getLogger(FetchAndReplaceTwitterProcessor.class);
-
- //Default number of attempts before allowing the document through
- private static final int MAX_ATTEMPTS = 5;
- //Start the backoff at 4 minutes. This results in a wait period of 4, 8, 12, 16 & 20 min with an attempt of 5
- public static final int BACKOFF = 1000 * 60 * 4;
-
- private final TwitterConfiguration config;
- private Twitter client;
- private ObjectMapper mapper;
- private int retryCount;
-
- public FetchAndReplaceTwitterProcessor() {
- this(new ComponentConfigurator<>(TwitterStreamConfiguration.class).detectConfiguration(StreamsConfigurator.config, "twitter"));
- }
-
- public FetchAndReplaceTwitterProcessor(TwitterStreamConfiguration config) {
- this.config = config;
+ private static final String PROVIDER_ID = getProvider().getId();
+ private static final Logger LOGGER = LoggerFactory.getLogger(FetchAndReplaceTwitterProcessor.class);
+
+ //Default number of attempts before allowing the document through
+ private static final int MAX_ATTEMPTS = 5;
+ //Start the backoff at 4 minutes. This results in a wait period of 4, 8, 12, 16 & 20 min with an attempt of 5
+ public static final int BACKOFF = 1000 * 60 * 4;
+
+ private final TwitterConfiguration config;
+ private Twitter client;
+ private ObjectMapper mapper;
+ private int retryCount;
+
+ public FetchAndReplaceTwitterProcessor() {
+ this(new ComponentConfigurator<>(TwitterStreamConfiguration.class).detectConfiguration(StreamsConfigurator.config, "twitter"));
+ }
+
+ public FetchAndReplaceTwitterProcessor(TwitterStreamConfiguration config) {
+ this.config = config;
+ }
+
+ @Override
+ public String getId() {
+ return getProvider().getId();
+ }
+
+ @Override
+ public List<StreamsDatum> process(StreamsDatum entry) {
+ if (entry.getDocument() instanceof Activity) {
+ Activity doc = (Activity)entry.getDocument();
+ String originalId = doc.getId();
+ if (PROVIDER_ID.equals(doc.getProvider().getId())) {
+ fetchAndReplace(doc, originalId);
+ }
+ } else {
+ throw new IllegalStateException("Requires an activity document");
}
-
- @Override
- public String getId() {
- return getProvider().getId();
+ return Lists.newArrayList(entry);
+ }
+
+
+ @Override
+ public void prepare(Object configurationObject) {
+ this.client = getTwitterClient();
+ this.mapper = StreamsJacksonMapper.getInstance();
+ }
+
+ @Override
+ public void cleanUp() {
+
+ }
+
+ protected void fetchAndReplace(Activity doc, String originalId) {
+ try {
+ String json = fetch(doc);
+ replace(doc, json);
+ doc.setId(originalId);
+ retryCount = 0;
+ } catch (TwitterException tw) {
+ if (tw.exceededRateLimitation()) {
+ sleepAndTryAgain(doc, originalId);
+ }
+ } catch (Exception ex) {
+ LOGGER.warn("Error fetching and replacing tweet for activity {}", doc.getId());
}
-
- @Override
- public List<StreamsDatum> process(StreamsDatum entry) {
- if(entry.getDocument() instanceof Activity) {
- Activity doc = (Activity)entry.getDocument();
- String originalId = doc.getId();
- if(PROVIDER_ID.equals(doc.getProvider().getId())) {
- fetchAndReplace(doc, originalId);
- }
- } else {
- throw new IllegalStateException("Requires an activity document");
- }
- return Lists.newArrayList(entry);
+ }
+
+ protected void replace(Activity doc, String json) throws java.io.IOException, ActivityConversionException {
+ Class documentSubType = new TwitterDocumentClassifier().detectClasses(json).get(0);
+ Object object = mapper.readValue(json, documentSubType);
+
+ if (documentSubType.equals(Retweet.class) || documentSubType.equals(Tweet.class)) {
+ updateActivity((Tweet)object, doc);
+ } else if (documentSubType.equals(Delete.class)) {
+ updateActivity((Delete)object, doc);
+ } else {
+ LOGGER.info("Could not determine the correct update method for {}", documentSubType);
}
+ }
+ protected String fetch(Activity doc) throws TwitterException {
+ String id = doc.getObject().getId();
+ LOGGER.debug("Fetching status from Twitter for {}", id);
+ Long tweetId = Long.valueOf(id.replace("id:twitter:tweets:", ""));
+ Status status = getTwitterClient().showStatus(tweetId);
+ return TwitterObjectFactory.getRawJSON(status);
+ }
- @Override
- public void prepare(Object configurationObject) {
- this.client = getTwitterClient();
- this.mapper = StreamsJacksonMapper.getInstance();
- }
- @Override
- public void cleanUp() {
+ protected Twitter getTwitterClient() {
- }
+ if (this.client == null) {
- protected void fetchAndReplace(Activity doc, String originalId) {
- try {
- String json = fetch(doc);
- replace(doc, json);
- doc.setId(originalId);
- retryCount = 0;
- } catch(TwitterException tw) {
- if(tw.exceededRateLimitation()) {
- sleepAndTryAgain(doc, originalId);
- }
- } catch (Exception e) {
- LOGGER.warn("Error fetching and replacing tweet for activity {}", doc.getId());
- }
- }
+ String baseUrl = TwitterProviderUtil.baseUrl(config);
- protected void replace(Activity doc, String json) throws java.io.IOException, ActivityConversionException {
- Class documentSubType = TwitterEventClassifier.detectClass(json);
- Object object = mapper.readValue(json, documentSubType);
-
- if(documentSubType.equals(Retweet.class) || documentSubType.equals(Tweet.class)) {
- updateActivity((Tweet)object, doc);
- } else if(documentSubType.equals(Delete.class)) {
- updateActivity((Delete)object, doc);
- } else {
- LOGGER.info("Could not determine the correct update method for {}", documentSubType);
- }
- }
+ ConfigurationBuilder builder = new ConfigurationBuilder()
+ .setOAuthConsumerKey(config.getOauth().getConsumerKey())
+ .setOAuthConsumerSecret(config.getOauth().getConsumerSecret())
+ .setOAuthAccessToken(config.getOauth().getAccessToken())
+ .setOAuthAccessTokenSecret(config.getOauth().getAccessTokenSecret())
+ .setIncludeEntitiesEnabled(true)
+ .setJSONStoreEnabled(true)
+ .setAsyncNumThreads(1)
+ .setRestBaseURL(baseUrl)
+ .setIncludeMyRetweetEnabled(Boolean.TRUE)
+ .setPrettyDebugEnabled(Boolean.TRUE);
- protected String fetch(Activity doc) throws TwitterException {
- String id = doc.getObject().getId();
- LOGGER.debug("Fetching status from Twitter for {}", id);
- Long tweetId = Long.valueOf(id.replace("id:twitter:tweets:", ""));
- Status status = getTwitterClient().showStatus(tweetId);
- return TwitterObjectFactory.getRawJSON(status);
+ this.client = new TwitterFactory(builder.build()).getInstance();
}
-
-
- protected Twitter getTwitterClient()
- {
- if(this.client == null) {
-
- String baseUrl = TwitterProviderUtil.baseUrl(config);
-
- ConfigurationBuilder builder = new ConfigurationBuilder()
- .setOAuthConsumerKey(config.getOauth().getConsumerKey())
- .setOAuthConsumerSecret(config.getOauth().getConsumerSecret())
- .setOAuthAccessToken(config.getOauth().getAccessToken())
- .setOAuthAccessTokenSecret(config.getOauth().getAccessTokenSecret())
- .setIncludeEntitiesEnabled(true)
- .setJSONStoreEnabled(true)
- .setAsyncNumThreads(1)
- .setRestBaseURL(baseUrl)
- .setIncludeMyRetweetEnabled(Boolean.TRUE)
- .setPrettyDebugEnabled(Boolean.TRUE);
-
- this.client = new TwitterFactory(builder.build()).getInstance();
- }
- return this.client;
- }
-
- //Hardcore sleep to allow for catch up
- protected void sleepAndTryAgain(Activity doc, String originalId) {
- try {
- //Attempt to fetchAndReplace with a backoff up to the limit then just reset the count and let the process continue
- if(retryCount < MAX_ATTEMPTS) {
- retryCount++;
- LOGGER.debug("Sleeping for {} min due to excessive calls to Twitter API", (retryCount * 4));
- Thread.sleep(BACKOFF * retryCount);
- fetchAndReplace(doc, originalId);
- } else {
- retryCount = 0;
- }
- } catch (InterruptedException e) {
- LOGGER.warn("Thread sleep interrupted while waiting for twitter backoff");
- }
+ return this.client;
+ }
+
+ //Hardcore sleep to allow for catch up
+ protected void sleepAndTryAgain(Activity doc, String originalId) {
+ try {
+ //Attempt to fetchAndReplace with a backoff up to the limit then just reset the count and let the process continue
+ if (retryCount < MAX_ATTEMPTS) {
+ retryCount++;
+ LOGGER.debug("Sleeping for {} min due to excessive calls to Twitter API", (retryCount * 4));
+ Thread.sleep(BACKOFF * retryCount);
+ fetchAndReplace(doc, originalId);
+ } else {
+ retryCount = 0;
+ }
+ } catch (InterruptedException ex) {
+ LOGGER.warn("Thread sleep interrupted while waiting for twitter backoff");
}
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java
deleted file mode 100644
index ed6b90a..0000000
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.twitter.processor;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.Lists;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsProcessor;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.twitter.converter.StreamsTwitterMapper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-
-/**
- * This class performs conversion of a twitter event to a specified outClass
- *
- * Deprecated: use TypeConverterProcessor and ActivityConverterProcessor instead
- */
-@Deprecated
-public class TwitterEventProcessor implements StreamsProcessor {
-
- private final static String STREAMS_ID = "TwitterEventProcessor";
-
- private final static Logger LOGGER = LoggerFactory.getLogger(TwitterEventProcessor.class);
-
- private ObjectMapper mapper = new StreamsTwitterMapper();
-
- private Class inClass;
- private Class outClass;
-
- public TwitterEventProcessor(Class inClass, Class outClass) {
- this.inClass = inClass;
- this.outClass = outClass;
- }
-
- public TwitterEventProcessor( Class outClass) {
- this(null, outClass);
- }
-
- @Override
- public String getId() {
- return STREAMS_ID;
- }
-
- @Override
- public List<StreamsDatum> process(StreamsDatum entry) {
-
- LOGGER.error("You are calling a deprecated / defunct class. Modify your stream to use ActivityConverterProcessor.");
-
- LOGGER.debug("CONVERT FAILED");
-
- return Lists.newArrayList();
-
- }
-
- @Override
- public void prepare(Object configurationObject) {
- mapper = StreamsJacksonMapper.getInstance();
- }
-
- @Override
- public void cleanUp() {
-
- }
-};
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterProfileProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterProfileProcessor.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterProfileProcessor.java
deleted file mode 100644
index d49a54f..0000000
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterProfileProcessor.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.twitter.processor;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsProcessor;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.twitter.converter.TwitterDateTimeFormat;
-import org.apache.streams.twitter.pojo.Retweet;
-import org.apache.streams.twitter.pojo.Tweet;
-import org.apache.streams.twitter.pojo.User;
-import org.apache.streams.twitter.provider.TwitterEventClassifier;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Queue;
-import java.util.Random;
-
-public class TwitterProfileProcessor implements StreamsProcessor, Runnable {
-
- private final static String STREAMS_ID = "TwitterProfileProcessor";
-
- private final static Logger LOGGER = LoggerFactory.getLogger(TwitterProfileProcessor.class);
-
- private ObjectMapper mapper = StreamsJacksonMapper.getInstance(TwitterDateTimeFormat.TWITTER_FORMAT);
-
- private Queue<StreamsDatum> inQueue;
- private Queue<StreamsDatum> outQueue;
-
- private final static String TERMINATE = "TERMINATE";
-
- @Override
- public void run() {
-
- while(true) {
- StreamsDatum item;
- try {
- item = inQueue.poll();
- if(item.getDocument() instanceof String && item.equals(TERMINATE)) {
- LOGGER.info("Terminating!");
- break;
- }
-
- Thread.sleep(new Random().nextInt(100));
-
- for( StreamsDatum entry : process(item)) {
- outQueue.offer(entry);
- }
-
-
- } catch (Exception e) {
- e.printStackTrace();
-
- }
- }
- }
-
- public StreamsDatum createStreamsDatum(User user) {
- return new StreamsDatum(user, user.getIdStr());
- }
-
- @Override
- public String getId() {
- return STREAMS_ID;
- }
-
- @Override
- public List<StreamsDatum> process(StreamsDatum entry) {
-
- List<StreamsDatum> result = new ArrayList<>();
- String item;
- try {
- // first check for valid json
- // since data is coming from outside provider, we don't know what type the events are
- if( entry.getDocument() instanceof String) {
- item = (String) entry.getDocument();
- } else {
- item = mapper.writeValueAsString(entry.getDocument());
- }
-
- Class inClass = TwitterEventClassifier.detectClass(item);
-
- User user;
-
- if ( inClass.equals( Tweet.class )) {
- LOGGER.debug("TWEET");
- Tweet tweet = mapper.readValue(item, Tweet.class);
- user = tweet.getUser();
- result.add(createStreamsDatum(user));
- }
- else if ( inClass.equals( Retweet.class )) {
- LOGGER.debug("RETWEET");
- Retweet retweet = mapper.readValue(item, Retweet.class);
- user = retweet.getRetweetedStatus().getUser();
- result.add(createStreamsDatum(user));
- } else if ( inClass.equals( User.class )) {
- LOGGER.debug("USER");
- user = mapper.readValue(item, User.class);
- result.add(createStreamsDatum(user));
- } else {
- return new ArrayList<>();
- }
-
- return result;
- } catch (Exception e) {
- e.printStackTrace();
- LOGGER.warn("Error processing " + entry.toString());
- return new ArrayList<>();
- }
- }
-
- @Override
- public void prepare(Object o) {
-
- }
-
- @Override
- public void cleanUp() {
-
- }
-};
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java
index cc1ecb1..d51e4e7 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java
@@ -21,13 +21,14 @@ package org.apache.streams.twitter.processor;
import org.apache.streams.converter.ActivityConverterProcessor;
/**
- * This class performs conversion of a twitter event to a specified outClass
+ * This class performs conversion of a twitter event to a specified outClass.
*
+ * <p/>
* Deprecated: use TypeConverterProcessor and ActivityConverterProcessor instead
*/
@Deprecated
public class TwitterTypeConverter extends ActivityConverterProcessor {
- public final static String STREAMS_ID = "TwitterTypeConverter";
+ public static final String STREAMS_ID = "TwitterTypeConverter";
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterUrlApiProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterUrlApiProcessor.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterUrlApiProcessor.java
index 30db471..0dd43bb 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterUrlApiProcessor.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterUrlApiProcessor.java
@@ -18,63 +18,71 @@
package org.apache.streams.twitter.processor;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
import org.apache.streams.components.http.HttpProcessorConfiguration;
import org.apache.streams.components.http.processor.SimpleHTTPGetProcessor;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsProcessor;
import org.apache.streams.pojo.json.Activity;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
- * Class gets a global share count from Twitter API for links on Activity datums
+ * Class gets a global share count from Twitter API for links on Activity datums.
*/
public class TwitterUrlApiProcessor extends SimpleHTTPGetProcessor implements StreamsProcessor {
- private final static String STREAMS_ID = "TwitterUrlApiProcessor";
+ private static final String STREAMS_ID = "TwitterUrlApiProcessor";
- public TwitterUrlApiProcessor() {
- super();
- this.configuration.setHostname("urls.api.twitter.com");
- this.configuration.setResourcePath("/1/urls/count.json");
- this.configuration.setEntity(HttpProcessorConfiguration.Entity.ACTIVITY);
- this.configuration.setExtension("twitter_url_count");
- }
+ /**
+ * TwitterUrlApiProcessor constructor.
+ */
+ public TwitterUrlApiProcessor() {
+ super();
+ this.configuration.setHostname("urls.api.twitter.com");
+ this.configuration.setResourcePath("/1/urls/count.json");
+ this.configuration.setEntity(HttpProcessorConfiguration.Entity.ACTIVITY);
+ this.configuration.setExtension("twitter_url_count");
+ }
- public TwitterUrlApiProcessor(HttpProcessorConfiguration processorConfiguration) {
- super(processorConfiguration);
- this.configuration.setHostname("urls.api.twitter.com");
- this.configuration.setResourcePath("/1/urls/count.json");
- this.configuration.setEntity(HttpProcessorConfiguration.Entity.ACTIVITY);
- this.configuration.setExtension("twitter_url_count");
- }
+ /**
+ * TwitterUrlApiProcessor constructor.
+ */
+ public TwitterUrlApiProcessor(HttpProcessorConfiguration processorConfiguration) {
+ super(processorConfiguration);
+ this.configuration.setHostname("urls.api.twitter.com");
+ this.configuration.setResourcePath("/1/urls/count.json");
+ this.configuration.setEntity(HttpProcessorConfiguration.Entity.ACTIVITY);
+ this.configuration.setExtension("twitter_url_count");
+ }
- @Override
- public String getId() {
- return STREAMS_ID;
- }
+ @Override
+ public String getId() {
+ return STREAMS_ID;
+ }
- @Override
- public List<StreamsDatum> process(StreamsDatum entry) {
- Preconditions.checkArgument(entry.getDocument() instanceof Activity);
- Activity activity = mapper.convertValue(entry.getDocument(), Activity.class);
- if( activity.getLinks() != null && activity.getLinks().size() > 0)
- return super.process(entry);
- else
- return Lists.newArrayList(entry);
+ @Override
+ public List<StreamsDatum> process(StreamsDatum entry) {
+ Preconditions.checkArgument(entry.getDocument() instanceof Activity);
+ Activity activity = mapper.convertValue(entry.getDocument(), Activity.class);
+ if ( activity.getLinks() != null && activity.getLinks().size() > 0) {
+ return super.process(entry);
+ } else {
+ return Lists.newArrayList(entry);
}
+ }
- @Override
- protected Map<String, String> prepareParams(StreamsDatum entry) {
+ @Override
+ protected Map<String, String> prepareParams(StreamsDatum entry) {
- Map<String, String> params = new HashMap<>();
+ Map<String, String> params = new HashMap<>();
- params.put("url", mapper.convertValue(entry.getDocument(), Activity.class).getLinks().get(0));
+ params.put("url", mapper.convertValue(entry.getDocument(), Activity.class).getLinks().get(0));
- return params;
- }
+ return params;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterErrorHandler.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterErrorHandler.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterErrorHandler.java
index 90f6b62..ec43fba 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterErrorHandler.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterErrorHandler.java
@@ -21,120 +21,113 @@ package org.apache.streams.twitter.provider;
import org.apache.streams.config.ComponentConfigurator;
import org.apache.streams.config.StreamsConfigurator;
import org.apache.streams.twitter.TwitterConfiguration;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import twitter4j.RateLimitStatus;
import twitter4j.Twitter;
import twitter4j.TwitterException;
-import twitter4j.RateLimitStatus;
/**
* Handle expected and unexpected exceptions.
*/
-public class TwitterErrorHandler
-{
- private final static Logger LOGGER = LoggerFactory.getLogger(TwitterErrorHandler.class);
-
- // selected because 3 * 5 + n >= 15 for positive n
- protected static long retry =
- new ComponentConfigurator<TwitterConfiguration>(TwitterConfiguration.class).detectConfiguration(
- StreamsConfigurator.getConfig().getConfig("twitter")
- ).getRetrySleepMs();
- protected static long retryMax =
- new ComponentConfigurator<TwitterConfiguration>(TwitterConfiguration.class).detectConfiguration(
- StreamsConfigurator.getConfig().getConfig("twitter")
- ).getRetryMax();
-
- @Deprecated
- public static int handleTwitterError(Twitter twitter, Exception exception) {
- return handleTwitterError( twitter, null, exception);
- }
+public class TwitterErrorHandler {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(TwitterErrorHandler.class);
+
+ // selected because 3 * 5 + n >= 15 for positive n
+ protected static long retry =
+ new ComponentConfigurator<TwitterConfiguration>(TwitterConfiguration.class).detectConfiguration(
+ StreamsConfigurator.getConfig().getConfig("twitter")
+ ).getRetrySleepMs();
+ protected static long retryMax =
+ new ComponentConfigurator<TwitterConfiguration>(TwitterConfiguration.class).detectConfiguration(
+ StreamsConfigurator.getConfig().getConfig("twitter")
+ ).getRetryMax();
+
+ @Deprecated
+ public static int handleTwitterError(Twitter twitter, Exception exception) {
+ return handleTwitterError( twitter, null, exception);
+ }
+
+ /**
+ * handleTwitterError.
+ * @param twitter Twitter
+ * @param id id
+ * @param exception exception
+ * @return
+ */
+ public static int handleTwitterError(Twitter twitter, Long id, Exception exception) {
+
+ if (exception instanceof TwitterException) {
+ TwitterException twitterException = (TwitterException)exception;
+
+ if (twitterException.exceededRateLimitation()) {
+
+ long millisUntilReset = retry;
+
+ final RateLimitStatus rateLimitStatus = twitterException.getRateLimitStatus();
+ if (rateLimitStatus != null) {
+ millisUntilReset = rateLimitStatus.getSecondsUntilReset() * 1000;
+ }
+
+ LOGGER.warn("Rate Limit Exceeded. Will retry in {} seconds...", millisUntilReset / 1000);
+
+ try {
+ Thread.sleep(millisUntilReset);
+ } catch (InterruptedException e1) {
+ Thread.currentThread().interrupt();
+ }
- public static int handleTwitterError(Twitter twitter, Long id, Exception exception)
- {
- if(exception instanceof TwitterException)
- {
- TwitterException e = (TwitterException)exception;
- if(e.exceededRateLimitation())
- {
- long millisUntilReset = retry;
-
- final RateLimitStatus rateLimitStatus = e.getRateLimitStatus();
- if (rateLimitStatus != null) {
- millisUntilReset = rateLimitStatus.getSecondsUntilReset() * 1000;
- }
-
- LOGGER.warn("Rate Limit Exceeded. Will retry in {} seconds...", millisUntilReset / 1000);
-
- try {
- Thread.sleep(millisUntilReset);
- } catch (InterruptedException e1) {
- Thread.currentThread().interrupt();
- }
-
- return 1;
- }
- else if(e.isCausedByNetworkIssue())
- {
- LOGGER.info("Twitter Network Issues Detected. Backing off...");
- LOGGER.info("{} - {}", e.getExceptionCode(), e.getLocalizedMessage());
- try {
- Thread.sleep(retry);
- } catch (InterruptedException e1) {
- Thread.currentThread().interrupt();
- }
- return 1;
- }
- else if(e.isErrorMessageAvailable())
- {
- if(e.getMessage().toLowerCase().contains("does not exist"))
- {
- if( id != null )
- LOGGER.warn("User does not exist: {}", id);
- else
- LOGGER.warn("User does not exist");
- return (int)retryMax;
- }
- else
- {
- return (int)retryMax/3;
- }
- }
- else
- {
- if(e.getExceptionCode().equals("ced778ef-0c669ac0"))
- {
- // This is a known weird issue, not exactly sure the cause, but you'll never be able to get the data.
- return (int)retryMax/3;
- }
- else if(e.getExceptionCode().equals("4be80492-0a7bf7c7")) {
- // This is a 401 reflecting credentials don't have access to the requested resource.
- if( id != null )
- LOGGER.warn("Authentication Exception accessing id: {}", id);
- else
- LOGGER.warn("Authentication Exception");
- return (int)retryMax;
- }
- else
- {
- LOGGER.warn("Unknown Twitter Exception...");
- LOGGER.warn(" Account: {}", twitter);
- LOGGER.warn(" Access: {}", e.getAccessLevel());
- LOGGER.warn(" Code: {}", e.getExceptionCode());
- LOGGER.warn(" Message: {}", e.getLocalizedMessage());
- return (int)retryMax/10;
- }
- }
+ return 1;
+ } else if (twitterException.isCausedByNetworkIssue()) {
+ LOGGER.info("Twitter Network Issues Detected. Backing off...");
+ LOGGER.info("{} - {}", twitterException.getExceptionCode(), twitterException.getLocalizedMessage());
+ try {
+ Thread.sleep(retry);
+ } catch (InterruptedException e1) {
+ Thread.currentThread().interrupt();
}
- else if(exception instanceof RuntimeException)
- {
- LOGGER.warn("TwitterGrabber: Unknown Runtime Error", exception.getMessage());
- return (int)retryMax/3;
+ return 1;
+ } else if (twitterException.isErrorMessageAvailable()) {
+ if (twitterException.getMessage().toLowerCase().contains("does not exist")) {
+ if ( id != null ) {
+ LOGGER.warn("User does not exist: {}", id);
+ } else {
+ LOGGER.warn("User does not exist");
+ }
+ return (int)retryMax;
+ } else {
+ return (int)retryMax / 3;
}
- else
- {
- LOGGER.info("Completely Unknown Exception: {}", exception);
- return (int)retryMax/3;
+ } else {
+ if (twitterException.getExceptionCode().equals("ced778ef-0c669ac0")) {
+ // This is a known weird issue, not exactly sure the cause, but you'll never be able to get the data.
+ return (int)retryMax / 3;
+ } else if (twitterException.getExceptionCode().equals("4be80492-0a7bf7c7")) {
+ // This is a 401 reflecting credentials don't have access to the requested resource.
+ if ( id != null ) {
+ LOGGER.warn("Authentication Exception accessing id: {}", id);
+ } else {
+ LOGGER.warn("Authentication Exception");
+ }
+ return (int)retryMax;
+ } else {
+ LOGGER.warn("Unknown Twitter Exception...");
+ LOGGER.warn(" Account: {}", twitter);
+ LOGGER.warn(" Access: {}", twitterException.getAccessLevel());
+ LOGGER.warn(" Code: {}", twitterException.getExceptionCode());
+ LOGGER.warn(" Message: {}", twitterException.getLocalizedMessage());
+ return (int)retryMax / 10;
}
+ }
+ } else if (exception instanceof RuntimeException) {
+ LOGGER.warn("TwitterGrabber: Unknown Runtime Error", exception.getMessage());
+ return (int)retryMax / 3;
+ } else {
+ LOGGER.info("Completely Unknown Exception: {}", exception);
+ return (int)retryMax / 3;
}
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventClassifier.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventClassifier.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventClassifier.java
deleted file mode 100644
index 9466c2e..0000000
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventClassifier.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.twitter.provider;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import org.apache.commons.lang.StringUtils;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.twitter.converter.TwitterDateTimeFormat;
-import org.apache.streams.twitter.pojo.Delete;
-import org.apache.streams.twitter.pojo.FriendList;
-import org.apache.streams.twitter.pojo.Retweet;
-import org.apache.streams.twitter.pojo.Tweet;
-import org.apache.streams.twitter.pojo.User;
-import org.apache.streams.twitter.pojo.UserstreamEvent;
-
-import java.io.IOException;
-import java.io.Serializable;
-
-/**
- * TwitterEventClassifier classifies twitter events
- *
- * @Deprecated - replaced by TwitterDocumentClassifier - use ActivityConverterProcessor
- */
-public class TwitterEventClassifier implements Serializable {
-
- private static ObjectMapper mapper = new StreamsJacksonMapper(Lists.newArrayList(TwitterDateTimeFormat.TWITTER_FORMAT));
-
- public static Class detectClass( String json ) {
- Preconditions.checkNotNull(json);
- Preconditions.checkArgument(StringUtils.isNotEmpty(json));
-
- ObjectNode objectNode;
- try {
- objectNode = (ObjectNode) mapper.readTree(json);
- } catch (IOException e) {
- e.printStackTrace();
- return null;
- }
-
- if( objectNode.findValue("retweeted_status") != null && objectNode.get("retweeted_status") != null)
- return Retweet.class;
- else if( objectNode.findValue("delete") != null )
- return Delete.class;
- else if( objectNode.findValue("friends") != null ||
- objectNode.findValue("friends_str") != null )
- return FriendList.class;
- else if( objectNode.findValue("target_object") != null )
- return UserstreamEvent.class;
- else if ( objectNode.findValue("location") != null && objectNode.findValue("user") == null)
- return User.class;
- else
- return Tweet.class;
- }
-
-}