You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2016/11/25 20:25:02 UTC

[21/42] incubator-streams git commit: STREAMS-440: custom checkstyle.xml, address compliance

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonUserActivityConverter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonUserActivityConverter.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonUserActivityConverter.java
index ef74371..b8ce79b 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonUserActivityConverter.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonUserActivityConverter.java
@@ -18,65 +18,69 @@
 
 package org.apache.streams.twitter.converter;
 
-import com.google.common.collect.Lists;
-import org.apache.commons.lang.NotImplementedException;
 import org.apache.streams.data.ActivityConverter;
 import org.apache.streams.exceptions.ActivityConversionException;
 import org.apache.streams.pojo.json.Activity;
 import org.apache.streams.twitter.pojo.User;
 
+import com.google.common.collect.Lists;
+import org.apache.commons.lang.NotImplementedException;
+
 import java.util.List;
 
 import static org.apache.streams.twitter.converter.util.TwitterActivityUtil.updateActivity;
 
 public class TwitterJsonUserActivityConverter implements ActivityConverter<User> {
 
-    public static Class requiredClass = User.class;
+  public static Class requiredClass = User.class;
 
-    @Override
-    public Class requiredClass() {
-        return requiredClass;
-    }
+  @Override
+  public Class requiredClass() {
+    return requiredClass;
+  }
 
-    private static TwitterJsonUserActivityConverter instance = new TwitterJsonUserActivityConverter();
+  private static TwitterJsonUserActivityConverter instance = new TwitterJsonUserActivityConverter();
 
-    public static TwitterJsonUserActivityConverter getInstance() {
-        return instance;
-    }
+  public static TwitterJsonUserActivityConverter getInstance() {
+    return instance;
+  }
 
-    @Override
-    public String serializationFormat() {
-        return null;
-    }
+  @Override
+  public String serializationFormat() {
+    return null;
+  }
 
-    @Override
-    public User fromActivity(Activity deserialized) throws ActivityConversionException {
-        throw new NotImplementedException();
-    }
+  @Override
+  public User fromActivity(Activity deserialized) throws ActivityConversionException {
+    throw new NotImplementedException();
+  }
 
-    @Override
-    public List<Activity> toActivityList(User user) throws ActivityConversionException {
+  @Override
+  public List<User> fromActivityList(List<Activity> list) {
+    throw new NotImplementedException();
+  }
 
-        Activity activity = new Activity();
-        updateActivity(user, activity);
 
-        return Lists.newArrayList(activity);
-    }
+  @Override
+  public List<Activity> toActivityList(User user) throws ActivityConversionException {
 
-    @Override
-    public List<User> fromActivityList(List<Activity> list) {
-        throw new NotImplementedException();
-    }
+    Activity activity = new Activity();
+    updateActivity(user, activity);
+
+    return Lists.newArrayList(activity);
+  }
 
-    @Override
-    public List<Activity> toActivityList(List<User> serializedList) {
-        List<Activity> result = Lists.newArrayList();
-        for( User item : serializedList ) {
-            try {
-                List<Activity> activities = toActivityList(item);
-                result.addAll(activities);
-            } catch (ActivityConversionException e) {}
-        }
-        return result;
+  @Override
+  public List<Activity> toActivityList(List<User> serializedList) {
+    List<Activity> result = Lists.newArrayList();
+    for ( User item : serializedList ) {
+      try {
+        List<Activity> activities = toActivityList(item);
+        result.addAll(activities);
+      } catch (ActivityConversionException ex) {
+        // conversion failure is ignored here; this item is skipped and the loop continues
+      }
     }
+    return result;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonUserActivityObjectConverter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonUserActivityObjectConverter.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonUserActivityObjectConverter.java
index d62b1e8..7cb4158 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonUserActivityObjectConverter.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonUserActivityObjectConverter.java
@@ -18,47 +18,42 @@
 
 package org.apache.streams.twitter.converter;
 
-import com.google.common.collect.Lists;
-import org.apache.commons.lang.NotImplementedException;
-import org.apache.streams.data.ActivityConverter;
 import org.apache.streams.data.ActivityObjectConverter;
 import org.apache.streams.exceptions.ActivityConversionException;
-import org.apache.streams.pojo.json.Activity;
 import org.apache.streams.pojo.json.ActivityObject;
 import org.apache.streams.twitter.pojo.User;
 
-import java.util.List;
+import org.apache.commons.lang.NotImplementedException;
 
 import static org.apache.streams.twitter.converter.util.TwitterActivityUtil.buildActor;
-import static org.apache.streams.twitter.converter.util.TwitterActivityUtil.updateActivity;
 
 public class TwitterJsonUserActivityObjectConverter implements ActivityObjectConverter<User> {
 
-    public static Class requiredClass = User.class;
+  public static Class requiredClass = User.class;
 
-    @Override
-    public Class requiredClass() {
-        return requiredClass;
-    }
+  @Override
+  public Class requiredClass() {
+    return requiredClass;
+  }
 
-    private static TwitterJsonUserActivityObjectConverter instance = new TwitterJsonUserActivityObjectConverter();
+  private static TwitterJsonUserActivityObjectConverter instance = new TwitterJsonUserActivityObjectConverter();
 
-    public static TwitterJsonUserActivityObjectConverter getInstance() {
-        return instance;
-    }
+  public static TwitterJsonUserActivityObjectConverter getInstance() {
+    return instance;
+  }
 
-    @Override
-    public String serializationFormat() {
-        return null;
-    }
+  @Override
+  public String serializationFormat() {
+    return null;
+  }
 
-    @Override
-    public User fromActivityObject(ActivityObject deserialized) throws ActivityConversionException {
-        throw new NotImplementedException();
-    }
+  @Override
+  public User fromActivityObject(ActivityObject deserialized) throws ActivityConversionException {
+    throw new NotImplementedException();
+  }
 
-    @Override
-    public ActivityObject toActivityObject(User serialized) throws ActivityConversionException {
-        return buildActor(serialized);
-    }
+  @Override
+  public ActivityObject toActivityObject(User serialized) throws ActivityConversionException {
+    return buildActor(serialized);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonUserstreameventActivityConverter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonUserstreameventActivityConverter.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonUserstreameventActivityConverter.java
index bb31fd6..6685a96 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonUserstreameventActivityConverter.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonUserstreameventActivityConverter.java
@@ -18,15 +18,16 @@
 
 package org.apache.streams.twitter.converter;
 
-import com.google.common.base.Strings;
-import com.google.common.collect.Lists;
-import org.apache.commons.lang.NotImplementedException;
 import org.apache.streams.data.ActivityConverter;
 import org.apache.streams.exceptions.ActivityConversionException;
 import org.apache.streams.pojo.json.Activity;
 import org.apache.streams.pojo.json.ActivityObject;
 import org.apache.streams.twitter.pojo.UserstreamEvent;
 
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import org.apache.commons.lang.NotImplementedException;
+
 import java.util.List;
 
 import static org.apache.streams.twitter.converter.util.TwitterActivityUtil.formatId;
@@ -34,87 +35,101 @@ import static org.apache.streams.twitter.converter.util.TwitterActivityUtil.getP
 
 
 /**
-* Created with IntelliJ IDEA.
-* User: mdelaet
-* Date: 9/30/13
-* Time: 9:24 AM
-* To change this template use File | Settings | File Templates.
-*/
+ * TwitterJsonUserstreameventActivityConverter.
+ */
+// TODO: Use this class explicitly somewhere
 public class TwitterJsonUserstreameventActivityConverter implements ActivityConverter<UserstreamEvent> {
 
-    public static Class requiredClass = UserstreamEvent.class;
-
-    @Override
-    public Class requiredClass() {
-        return requiredClass;
-    }
-
-    private static TwitterJsonUserstreameventActivityConverter instance = new TwitterJsonUserstreameventActivityConverter();
-
-    public static TwitterJsonUserstreameventActivityConverter getInstance() {
-        return instance;
-    }
-
-    @Override
-    public String serializationFormat() {
-        return null;
-    }
-
-    @Override
-    public UserstreamEvent fromActivity(Activity deserialized) throws ActivityConversionException {
-        throw new NotImplementedException();
-    }
-
-    @Override
-    public List<Activity> toActivityList(UserstreamEvent userstreamEvent) throws ActivityConversionException {
-
-        Activity activity = convert(userstreamEvent);
-        return Lists.newArrayList(activity);
-
-    }
-
-    @Override
-    public List<UserstreamEvent> fromActivityList(List<Activity> list) {
-        throw new NotImplementedException();
-    }
-
-    @Override
-    public List<Activity> toActivityList(List<UserstreamEvent> serializedList) {
-        return null;
-    }
-
-    public Activity convert(UserstreamEvent event) throws ActivityConversionException {
-
-        Activity activity = new Activity();
-        activity.setActor(buildActor(event));
-        activity.setVerb(detectVerb(event));
-        activity.setObject(buildActivityObject(event));
-        activity.setId(formatId(activity.getVerb()));
-        if(Strings.isNullOrEmpty(activity.getId()))
-            throw new ActivityConversionException("Unable to determine activity id");
-        activity.setProvider(getProvider());
-        return activity;
-    }
-
-    public ActivityObject buildActor(UserstreamEvent event) {
-        ActivityObject actor = new ActivityObject();
-        //actor.setId(formatId(delete.getDelete().getStatus().getUserIdStr()));
-        return actor;
-    }
-
-    public ActivityObject buildActivityObject(UserstreamEvent event) {
-        ActivityObject actObj = new ActivityObject();
-        //actObj.setId(formatId(delete.getDelete().getStatus().getIdStr()));
-        //actObj.setObjectType("tweet");
-        return actObj;
-    }
-
-    public String detectVerb(UserstreamEvent event) {
-        return null;
-    }
-
-    public ActivityObject buildTarget(UserstreamEvent event) {
-        return null;
+  public static Class requiredClass = UserstreamEvent.class;
+
+  @Override
+  public Class requiredClass() {
+    return requiredClass;
+  }
+
+  private static TwitterJsonUserstreameventActivityConverter instance = new TwitterJsonUserstreameventActivityConverter();
+
+  public static TwitterJsonUserstreameventActivityConverter getInstance() {
+    return instance;
+  }
+
+  @Override
+  public String serializationFormat() {
+    return null;
+  }
+
+  @Override
+  public UserstreamEvent fromActivity(Activity deserialized) throws ActivityConversionException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public List<UserstreamEvent> fromActivityList(List<Activity> list) {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public List<Activity> toActivityList(UserstreamEvent userstreamEvent) throws ActivityConversionException {
+
+    Activity activity = convert(userstreamEvent);
+    return Lists.newArrayList(activity);
+
+  }
+
+  @Override
+  public List<Activity> toActivityList(List<UserstreamEvent> serializedList) {
+    return null;
+  }
+
+  /**
+   * convert UserstreamEvent to Activity.
+   * @param event UserstreamEvent
+   * @return Activity
+   * @throws ActivityConversionException ActivityConversionException
+   */
+  public Activity convert(UserstreamEvent event) throws ActivityConversionException {
+
+    Activity activity = new Activity();
+    activity.setActor(buildActor(event));
+    activity.setVerb(detectVerb(event));
+    activity.setObject(buildActivityObject(event));
+    activity.setId(formatId(activity.getVerb()));
+    if (Strings.isNullOrEmpty(activity.getId())) {
+      throw new ActivityConversionException("Unable to determine activity id");
     }
+    activity.setProvider(getProvider());
+    return activity;
+  }
+
+  /**
+   * Build the actor ActivityObject from a UserstreamEvent.
+   * @param event UserstreamEvent
+   * @return $.actor
+   */
+  public ActivityObject buildActor(UserstreamEvent event) {
+    ActivityObject actor = new ActivityObject();
+    //actor.setId(formatId(delete.getDelete().getStatus().getUserIdStr()));
+    return actor;
+  }
+
+  /**
+   * Build the object ActivityObject from a UserstreamEvent.
+   * @param event UserstreamEvent
+   * @return $.object
+   */
+  public ActivityObject buildActivityObject(UserstreamEvent event) {
+    ActivityObject actObj = new ActivityObject();
+    //actObj.setId(formatId(delete.getDelete().getStatus().getIdStr()));
+    //actObj.setObjectType("tweet");
+    return actObj;
+  }
+
+  public String detectVerb(UserstreamEvent event) {
+    return null;
+  }
+
+  public ActivityObject buildTarget(UserstreamEvent event) {
+    return null;
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/util/TwitterActivityUtil.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/util/TwitterActivityUtil.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/util/TwitterActivityUtil.java
index 4015514..e0e2e80 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/util/TwitterActivityUtil.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/util/TwitterActivityUtil.java
@@ -19,12 +19,6 @@
 
 package org.apache.streams.twitter.converter.util;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.base.Joiner;
-import com.google.common.base.Optional;
-import com.google.common.base.Strings;
-import com.google.common.collect.Lists;
 import org.apache.streams.exceptions.ActivityConversionException;
 import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.pojo.extensions.ExtensionUtil;
@@ -41,6 +35,14 @@ import org.apache.streams.twitter.pojo.Retweet;
 import org.apache.streams.twitter.pojo.Tweet;
 import org.apache.streams.twitter.pojo.User;
 import org.apache.streams.twitter.pojo.UserMentions;
+import org.apache.streams.twitter.provider.TwitterErrorHandler;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -52,323 +54,342 @@ import java.util.Map;
 import static com.google.common.math.DoubleMath.mean;
 
 /**
- * Provides utilities for working with Activity objects within the context of Twitter
+ * Provides utilities for working with Activity objects within the context of Twitter.
  */
 public class TwitterActivityUtil {
 
-    private final static Logger LOGGER = LoggerFactory.getLogger(TwitterActivityUtil.class);
-
-    /**
-     * Updates the given Activity object with the values from the Tweet
-     * @param tweet the object to use as the source
-     * @param activity the target of the updates.  Will receive all values from the tweet.
-     * @throws org.apache.streams.exceptions.ActivityConversionException
-     */
-    public static void updateActivity(Tweet tweet, Activity activity) throws ActivityConversionException {
-        ObjectMapper mapper = StreamsJacksonMapper.getInstance();
-        activity.setActor(buildActor(tweet));
-        activity.setId(formatId(activity.getVerb(),
-                Optional.fromNullable(
-                        tweet.getIdStr())
-                        .or(Optional.of(tweet.getId().toString()))
-                        .orNull()));
-
-        if(tweet instanceof Retweet) {
-            updateActivityContent(activity,  ((Retweet) tweet).getRetweetedStatus(), "share");
-        } else {
-            updateActivityContent(activity, tweet, "post");
-        }
-
-        if(Strings.isNullOrEmpty(activity.getId()))
-            throw new ActivityConversionException("Unable to determine activity id");
-        try {
-            activity.setPublished(tweet.getCreatedAt());
-        } catch( Exception e ) {
-            throw new ActivityConversionException("Unable to determine publishedDate", e);
-        }
-        activity.setTarget(buildTarget(tweet));
-        activity.setProvider(getProvider());
-        activity.setUrl(String.format("http://twitter.com/%s/%s/%s", tweet.getUser().getScreenName(),"/status/",tweet.getIdStr()));
-
-        addTwitterExtension(activity, mapper.convertValue(tweet, ObjectNode.class));
+  private static final Logger LOGGER = LoggerFactory.getLogger(TwitterActivityUtil.class);
+
+  static final ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
+  /**
+   * Updates the given Activity object with the values from the Tweet.
+   * @param tweet the object to use as the source
+   * @param activity the target of the updates.  Will receive all values from the tweet.
+   * @throws ActivityConversionException ActivityConversionException
+   */
+  public static void updateActivity(Tweet tweet, Activity activity) throws ActivityConversionException {
+    activity.setActor(buildActor(tweet));
+    activity.setId(formatId(activity.getVerb(),
+        Optional.fromNullable(
+            tweet.getIdStr())
+            .or(Optional.of(tweet.getId().toString()))
+            .orNull()));
+
+    if (tweet instanceof Retweet) {
+      updateActivityContent(activity,  ((Retweet) tweet).getRetweetedStatus(), "share");
+    } else {
+      updateActivityContent(activity, tweet, "post");
     }
 
-    /**
-     * Updates the given Activity object with the values from the User
-     * @param user the object to use as the source
-     * @param activity the target of the updates.  Will receive all values from the tweet.
-     */
-    public static void updateActivity(User user, Activity activity) {
-        activity.setActor(buildActor(user));
-        activity.setId(null);
-        activity.setVerb(null);
+    if (Strings.isNullOrEmpty(activity.getId())) {
+      throw new ActivityConversionException("Unable to determine activity id");
     }
-
-    /**
-     * Updates the activity for a delete event
-     * @param delete the delete event
-     * @param activity the Activity object to update
-     * @throws org.apache.streams.exceptions.ActivityConversionException
-     */
-    public static void updateActivity(Delete delete, Activity activity) throws ActivityConversionException {
-        activity.setActor(buildActor(delete));
-        activity.setVerb("delete");
-        activity.setObject(buildActivityObject(delete));
-        activity.setId(formatId(activity.getVerb(), delete.getDelete().getStatus().getIdStr()));
-        if(Strings.isNullOrEmpty(activity.getId()))
-            throw new ActivityConversionException("Unable to determine activity id");
-        activity.setProvider(getProvider());
-        addTwitterExtension(activity, StreamsJacksonMapper.getInstance().convertValue(delete, ObjectNode.class));
+    try {
+      activity.setPublished(tweet.getCreatedAt());
+    } catch ( Exception ex ) {
+      throw new ActivityConversionException("Unable to determine publishedDate", ex);
     }
-
-    /**
-     * Builds the actor for a delete event
-     * @param delete the delete event
-     * @return a valid Actor
-     */
-    public static ActivityObject buildActor(Delete delete) {
-        ActivityObject actor = new ActivityObject();
-        actor.setId(formatId(delete.getDelete().getStatus().getUserIdStr()));
-        actor.setObjectType("page");
-        return actor;
+    activity.setTarget(buildTarget(tweet));
+    activity.setProvider(getProvider());
+    activity.setUrl(String.format("http://twitter.com/%s/%s/%s", tweet.getUser().getScreenName(),"/status/",tweet.getIdStr()));
+
+    addTwitterExtension(activity, mapper.convertValue(tweet, ObjectNode.class));
+  }
+
+  /**
+   * Updates the given Activity object with the values from the User.
+   * @param user the object to use as the source
+   * @param activity the target of the updates.  Receives the actor built from the user; its id and verb are set to null.
+   */
+  public static void updateActivity(User user, Activity activity) {
+    activity.setActor(buildActor(user));
+    activity.setId(null);
+    activity.setVerb(null);
+  }
+
+  /**
+   * Updates the activity for a delete event.
+   * @param delete the delete event
+   * @param activity the Activity object to update
+   * @throws ActivityConversionException ActivityConversionException
+   */
+  public static void updateActivity(Delete delete, Activity activity) throws ActivityConversionException {
+    activity.setActor(buildActor(delete));
+    activity.setVerb("delete");
+    activity.setObject(buildActivityObject(delete));
+    activity.setId(formatId(activity.getVerb(), delete.getDelete().getStatus().getIdStr()));
+    if (Strings.isNullOrEmpty(activity.getId())) {
+      throw new ActivityConversionException("Unable to determine activity id");
     }
-
-    /**
-     * Builds the ActivityObject for the delete event
-     * @param delete the delete event
-     * @return a valid Activity Object
-     */
-    public static ActivityObject buildActivityObject(Delete delete) {
-        ActivityObject actObj = new ActivityObject();
-        actObj.setId(formatId(delete.getDelete().getStatus().getIdStr()));
-        actObj.setObjectType("tweet");
-        return actObj;
+    activity.setProvider(getProvider());
+    addTwitterExtension(activity, StreamsJacksonMapper.getInstance().convertValue(delete, ObjectNode.class));
+  }
+
+  /**
+   * Builds the activity {@link org.apache.streams.pojo.json.ActivityObject} actor from the tweet
+   * @param tweet the object to use as the source
+   * @return a valid Actor populated from the Tweet
+   */
+  public static ActivityObject buildActor(Tweet tweet) {
+    ActivityObject actor = new ActivityObject();
+    User user = tweet.getUser();
+
+    return buildActor(user);
+  }
+
+  /**
+   * Builds the activity {@link org.apache.streams.pojo.json.ActivityObject} actor from the User
+   * @param user the object to use as the source
+   * @return a valid Actor populated from the User
+   */
+  public static ActivityObject buildActor(User user) {
+    ActivityObject actor = new ActivityObject();
+    actor.setId(formatId(
+        Optional.fromNullable(
+            user.getIdStr())
+            .or(Optional.of(user.getId().toString()))
+            .orNull()
+    ));
+    actor.setObjectType("page");
+    actor.setDisplayName(user.getName());
+    actor.setAdditionalProperty("handle", user.getScreenName());
+    actor.setSummary(user.getDescription());
+
+    if (user.getUrl() != null) {
+      actor.setUrl(user.getUrl());
     }
 
-
-    /**
-     * Updates the content, and associated fields, with those from the given tweet
-     * @param activity the target of the updates.  Will receive all values from the tweet.
-     * @param tweet the object to use as the source
-     * @param verb the verb for the given activity's type
-     */
-    public static void updateActivityContent(Activity activity, Tweet tweet, String verb) {
-        activity.setVerb(verb);
-        activity.setTitle("");
-        if( tweet != null ) {
-            activity.setObject(buildActivityObject(tweet));
-            activity.setLinks(getLinks(tweet));
-            activity.setContent(tweet.getText());
-            addLocationExtension(activity, tweet);
-            addTwitterExtensions(activity, tweet);
-        }
+    Map<String, Object> extensions = new HashMap<>();
+    extensions.put("location", user.getLocation());
+    extensions.put("posts", user.getStatusesCount());
+    extensions.put("favorites", user.getFavouritesCount());
+    extensions.put("followers", user.getFollowersCount());
+
+    Image profileImage = new Image();
+    profileImage.setUrl(user.getProfileImageUrlHttps());
+    actor.setImage(profileImage);
+
+    extensions.put("screenName", user.getScreenName());
+
+    actor.setAdditionalProperty("extensions", extensions);
+    return actor;
+  }
+
+  /**
+   * Builds the actor for a delete event.
+   * @param delete the delete event
+   * @return a valid Actor
+   */
+  public static ActivityObject buildActor(Delete delete) {
+    ActivityObject actor = new ActivityObject();
+    actor.setId(formatId(delete.getDelete().getStatus().getUserIdStr()));
+    actor.setObjectType("page");
+    return actor;
+  }
+
+  /**
+   * Creates an {@link org.apache.streams.pojo.json.ActivityObject} for the tweet
+   * @param tweet the object to use as the source
+   * @return a valid ActivityObject
+   */
+  public static ActivityObject buildActivityObject(Tweet tweet) {
+    ActivityObject actObj = new ActivityObject();
+    String id =  Optional.fromNullable(
+        tweet.getIdStr())
+        .or(Optional.of(tweet.getId().toString()))
+        .orNull();
+    if ( id != null ) {
+      actObj.setId(id);
     }
-
-    /**
-     * Creates an {@link org.apache.streams.pojo.json.ActivityObject} for the tweet
-     * @param tweet the object to use as the source
-     * @return a valid ActivityObject
-     */
-    public static ActivityObject buildActivityObject(Tweet tweet) {
-        ActivityObject actObj = new ActivityObject();
-        String id =  Optional.fromNullable(
-                tweet.getIdStr())
-                .or(Optional.of(tweet.getId().toString()))
-                .orNull();
-        if( id != null )
-            actObj.setId(id);
-        actObj.setObjectType("post");
-        actObj.setContent(tweet.getText());
-        return actObj;
+    actObj.setObjectType("post");
+    actObj.setContent(tweet.getText());
+    return actObj;
+  }
+
+  /**
+   * Builds the ActivityObject for the delete event.
+   * @param delete the delete event
+   * @return a valid Activity Object
+   */
+  public static ActivityObject buildActivityObject(Delete delete) {
+    ActivityObject actObj = new ActivityObject();
+    actObj.setId(formatId(delete.getDelete().getStatus().getIdStr()));
+    actObj.setObjectType("tweet");
+    return actObj;
+  }
+
+  /**
+   * Updates the content, and associated fields, with those from the given tweet
+   * @param activity the target of the updates.  Will receive all values from the tweet.
+   * @param tweet the object to use as the source
+   * @param verb the verb for the given activity's type
+   */
+  public static void updateActivityContent(Activity activity, Tweet tweet, String verb) {
+    activity.setVerb(verb);
+    activity.setTitle("");
+    if ( tweet != null ) {
+      activity.setObject(buildActivityObject(tweet));
+      activity.setLinks(getLinks(tweet));
+      activity.setContent(tweet.getText());
+      addLocationExtension(activity, tweet);
+      addTwitterExtensions(activity, tweet);
     }
+  }
 
-    /**
-     * Builds the activity {@link org.apache.streams.pojo.json.ActivityObject} actor from the tweet
-     * @param tweet the object to use as the source
-     * @return a valid Actor populated from the Tweet
-     */
-    public static ActivityObject buildActor(Tweet tweet) {
-        ActivityObject actor = new ActivityObject();
-        User user = tweet.getUser();
 
-        return buildActor(user);
-    }
 
-    /**
-     * Builds the activity {@link org.apache.streams.pojo.json.ActivityObject} actor from the User
-     * @param user the object to use as the source
-     * @return a valid Actor populated from the Tweet
-     */
-    public static ActivityObject buildActor(User user) {
-        ActivityObject actor = new ActivityObject();
-        actor.setId(formatId(
-                Optional.fromNullable(
-                        user.getIdStr())
-                        .or(Optional.of(user.getId().toString()))
-                        .orNull()
-        ));
-        actor.setObjectType("page");
-        actor.setDisplayName(user.getName());
-        actor.setAdditionalProperty("handle", user.getScreenName());
-        actor.setSummary(user.getDescription());
-
-        if (user.getUrl()!=null){
-            actor.setUrl(user.getUrl());
-        }
-
-        Map<String, Object> extensions = new HashMap<>();
-        extensions.put("location", user.getLocation());
-        extensions.put("posts", user.getStatusesCount());
-        extensions.put("favorites", user.getFavouritesCount());
-        extensions.put("followers", user.getFollowersCount());
-
-        Image profileImage = new Image();
-        profileImage.setUrl(user.getProfileImageUrlHttps());
-        actor.setImage(profileImage);
-
-        extensions.put("screenName", user.getScreenName());
-
-        actor.setAdditionalProperty("extensions", extensions);
-        return actor;
-    }
 
-    /**
-     * Gets the links from the Twitter event
-     * @param tweet the object to use as the source
-     * @return a list of links corresponding to the expanded URL (no t.co)
-     */
-    public static List<String> getLinks(Tweet tweet) {
-        List<String> links = new ArrayList<>();
-        if( tweet.getEntities().getUrls() != null ) {
-            for (Url url : tweet.getEntities().getUrls()) {
-                links.add(url.getExpandedUrl());
-            }
-        }
-        else
-            LOGGER.debug("  0 links");
-        return links;
-    }
 
-    /**
-     * Builds the {@link org.apache.streams.twitter.pojo.TargetObject} from the tweet
-     * @param tweet the object to use as the source
-     * @return currently returns null for all activities
-     */
-    public static ActivityObject buildTarget(Tweet tweet) {
-        return null;
-    }
 
-    /**
-     * Adds the location extension and populates with teh twitter data
-     * @param activity the Activity object to update
-     * @param tweet the object to use as the source
-     */
-    public static void addLocationExtension(Activity activity, Tweet tweet) {
-        Map<String, Object> extensions = ExtensionUtil.getInstance().ensureExtensions(activity);
-        Map<String, Object> location = new HashMap<>();
-        location.put("id", formatId(
-                Optional.fromNullable(
-                        tweet.getIdStr())
-                        .or(Optional.of(tweet.getId().toString()))
-                        .orNull()
-        ));
-        location.put("coordinates", boundingBoxCenter(tweet.getPlace()));       
-        extensions.put("location", location);
-    }
 
-    /**
-     * Gets the common twitter {@link org.apache.streams.pojo.json.Provider} object
-     * @return a provider object representing Twitter
-     */
-    public static Provider getProvider() {
-        Provider provider = new Provider();
-        provider.setId("id:providers:twitter");
-        provider.setObjectType("application");
-        provider.setDisplayName("Twitter");
-        return provider;
+  /**
+   * Gets the links from the Twitter event.
+   * @param tweet the object to use as the source
+   * @return a list of links corresponding to the expanded URL (no t.co)
+   */
+  public static List<String> getLinks(Tweet tweet) {
+    List<String> links = new ArrayList<>();
+    if ( tweet.getEntities().getUrls() != null ) {
+      for (Url url : tweet.getEntities().getUrls()) {
+        links.add(url.getExpandedUrl());
+      }
+    } else {
+      LOGGER.debug(" 0 links");
     }
-    /**
-     * Adds the given Twitter event to the activity as an extension
-     * @param activity the Activity object to update
-     * @param event the Twitter event to add as the extension
-     */
-    public static void addTwitterExtension(Activity activity, ObjectNode event) {
-        Map<String, Object> extensions = ExtensionUtil.getInstance().ensureExtensions(activity);
-        extensions.put("twitter", event);
+    return links;
+  }
+
+  /**
+   * Builds the {@link org.apache.streams.twitter.pojo.TargetObject} from the tweet.
+   * @param tweet the object to use as the source
+   * @return currently returns null for all activities
+   */
+  public static ActivityObject buildTarget(Tweet tweet) {
+    return null;
+  }
+
+  /**
+   * Adds the location extension and populates it with the Twitter data.
+   * @param activity the Activity object to update
+   * @param tweet the object to use as the source
+   */
+  public static void addLocationExtension(Activity activity, Tweet tweet) {
+    Map<String, Object> extensions = ExtensionUtil.getInstance().ensureExtensions(activity);
+    Map<String, Object> location = new HashMap<>();
+    location.put("id", formatId(
+        Optional.fromNullable(
+            tweet.getIdStr())
+            .or(Optional.of(tweet.getId().toString()))
+            .orNull()
+    ));
+    location.put("coordinates", boundingBoxCenter(tweet.getPlace()));
+    extensions.put("location", location);
+  }
+
+  /**
+   * Gets the common twitter {@link org.apache.streams.pojo.json.Provider} object.
+   * @return a provider object representing Twitter
+   */
+  public static Provider getProvider() {
+    Provider provider = new Provider();
+    provider.setId("id:providers:twitter");
+    provider.setObjectType("application");
+    provider.setDisplayName("Twitter");
+    return provider;
+  }
+
+  /**
+   * Adds the given Twitter event to the activity as an extension.
+   * @param activity the Activity object to update
+   * @param event the Twitter event to add as the extension
+   */
+  public static void addTwitterExtension(Activity activity, ObjectNode event) {
+    Map<String, Object> extensions = ExtensionUtil.getInstance().ensureExtensions(activity);
+    extensions.put("twitter", event);
+  }
+
+  /**
+   * Formats the ID to conform with the Apache Streams activity ID convention.
+   * @param idparts the parts of the ID to join
+   * @return a valid Activity ID in format "id:twitter:part1:part2:...partN"
+   */
+  public static String formatId(String... idparts) {
+    return Joiner.on(":").join(Lists.asList("id:twitter", idparts));
+  }
+
+  /**
+   * Takes various parameters from the twitter object that are currently not part of the
+   * activity schema and stores them in a generic extensions attribute.
+   * @param activity Activity
+   * @param tweet Tweet
+   */
+  public static void addTwitterExtensions(Activity activity, Tweet tweet) {
+    Map<String, Object> extensions = ExtensionUtil.getInstance().ensureExtensions(activity);
+
+    List<String> hashtags = new ArrayList<>();
+    for (Hashtag hashtag : tweet.getEntities().getHashtags()) {
+      hashtags.add(hashtag.getText());
     }
-    /**
-     * Formats the ID to conform with the Apache Streams activity ID convention
-     * @param idparts the parts of the ID to join
-     * @return a valid Activity ID in format "id:twitter:part1:part2:...partN"
-     */
-    public static String formatId(String... idparts) {
-        return Joiner.on(":").join(Lists.asList("id:twitter", idparts));
-    }
-
-    /**
-     * Takes various parameters from the twitter object that are currently not part of teh
-     * activity schema and stores them in a generic extensions attribute
-     * @param activity
-     * @param tweet
-     */
-    public static void addTwitterExtensions(Activity activity, Tweet tweet) {
-        Map<String, Object> extensions = ExtensionUtil.getInstance().ensureExtensions(activity);
+    extensions.put("hashtags", hashtags);
 
-        List<String> hashtags = new ArrayList<>();
-        for(Hashtag hashtag : tweet.getEntities().getHashtags()) {
-            hashtags.add(hashtag.getText());
-        }
-        extensions.put("hashtags", hashtags);
+    Map<String, Object> likes = new HashMap<>();
+    likes.put("perspectival", tweet.getFavorited());
+    likes.put("count", tweet.getAdditionalProperties().get("favorite_count"));
 
-        Map<String, Object> likes = new HashMap<>();
-        likes.put("perspectival", tweet.getFavorited());
-        likes.put("count", tweet.getAdditionalProperties().get("favorite_count"));
+    extensions.put("likes", likes);
 
-        extensions.put("likes", likes);
+    Map<String, Object> rebroadcasts = new HashMap<>();
+    rebroadcasts.put("perspectival", tweet.getRetweeted());
+    rebroadcasts.put("count", tweet.getRetweetCount());
 
-        Map<String, Object> rebroadcasts = new HashMap<>();
-        rebroadcasts.put("perspectival", tweet.getRetweeted());
-        rebroadcasts.put("count", tweet.getRetweetCount());
+    extensions.put("rebroadcasts", rebroadcasts);
 
-        extensions.put("rebroadcasts", rebroadcasts);
+    List<Map<String, Object>> userMentions = new ArrayList<>();
+    Entities entities = tweet.getEntities();
 
-        List<Map<String, Object>> userMentions = new ArrayList<>();
-        Entities entities = tweet.getEntities();
+    for (UserMentions user : entities.getUserMentions()) {
+      //Map the twitter user object into an actor
+      Map<String, Object> actor = new HashMap<>();
+      actor.put("id", "id:twitter:" + user.getIdStr());
+      actor.put("displayName", user.getName());
+      actor.put("handle", user.getScreenName());
 
-        for(UserMentions user : entities.getUserMentions()) {
-            //Map the twitter user object into an actor
-            Map<String, Object> actor = new HashMap<>();
-            actor.put("id", "id:twitter:" + user.getIdStr());
-            actor.put("displayName", user.getName());
-            actor.put("handle", user.getScreenName());
+      userMentions.add(actor);
+    }
 
-            userMentions.add(actor);
-        }
+    extensions.put("user_mentions", userMentions);
 
-        extensions.put("user_mentions", userMentions);
+    extensions.put("keywords", tweet.getText());
+  }
 
-        extensions.put("keywords", tweet.getText());
+  /**
+   * Compute central coordinates from bounding box.
+   * @param place the bounding box to use as the source
+   */
+  public static List<Double> boundingBoxCenter(Place place) {
+    if ( place == null ) {
+      return new ArrayList<>();
     }
-
-    /**
-     * Compute central coordinates from bounding box
-     * @param place the bounding box to use as the source
-     */
-    public static List<Double> boundingBoxCenter(Place place) {
-        if( place == null ) return new ArrayList<>();
-        if( place.getBoundingBox() == null ) return new ArrayList<>();
-        if( place.getBoundingBox().getCoordinates().size() != 1 ) return new ArrayList<>();
-        if( place.getBoundingBox().getCoordinates().get(0).size() != 4 ) return new ArrayList<>();
-        List<Double> lats = new ArrayList<>();
-        List<Double> lons = new ArrayList<>();
-        for( List<Double> point : place.getBoundingBox().getCoordinates().get(0)) {
-            lats.add(point.get(0));
-            lons.add(point.get(1));
-        }
-        List<Double> result = new ArrayList<>();
-        result.add(mean(lats));
-        result.add(mean(lons));
-        return result;
+    if ( place.getBoundingBox() == null ) {
+      return new ArrayList<>();
+    }
+    if ( place.getBoundingBox().getCoordinates().size() != 1 ) {
+      return new ArrayList<>();
+    }
+    if ( place.getBoundingBox().getCoordinates().get(0).size() != 4 ) {
+      return new ArrayList<>();
+    }
+    List<Double> lats = new ArrayList<>();
+    List<Double> lons = new ArrayList<>();
+    for ( List<Double> point : place.getBoundingBox().getCoordinates().get(0)) {
+      lats.add(point.get(0));
+      lons.add(point.get(1));
     }
+    List<Double> result = new ArrayList<>();
+    result.add(mean(lats));
+    result.add(mean(lons));
+    return result;
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/FetchAndReplaceTwitterProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/FetchAndReplaceTwitterProcessor.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/FetchAndReplaceTwitterProcessor.java
index 046cb76..c1c205b 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/FetchAndReplaceTwitterProcessor.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/FetchAndReplaceTwitterProcessor.java
@@ -19,9 +19,6 @@
 
 package org.apache.streams.twitter.processor;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.Lists;
-import java.util.List;
 import org.apache.streams.config.ComponentConfigurator;
 import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.core.StreamsDatum;
@@ -31,11 +28,14 @@ import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.pojo.json.Activity;
 import org.apache.streams.twitter.TwitterConfiguration;
 import org.apache.streams.twitter.TwitterStreamConfiguration;
+import org.apache.streams.twitter.converter.TwitterDocumentClassifier;
 import org.apache.streams.twitter.pojo.Delete;
 import org.apache.streams.twitter.pojo.Retweet;
 import org.apache.streams.twitter.pojo.Tweet;
-import org.apache.streams.twitter.provider.TwitterEventClassifier;
 import org.apache.streams.twitter.provider.TwitterProviderUtil;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import twitter4j.Status;
@@ -45,6 +45,8 @@ import twitter4j.TwitterFactory;
 import twitter4j.TwitterObjectFactory;
 import twitter4j.conf.ConfigurationBuilder;
 
+import java.util.List;
+
 import static org.apache.streams.twitter.converter.util.TwitterActivityUtil.getProvider;
 import static org.apache.streams.twitter.converter.util.TwitterActivityUtil.updateActivity;
 
@@ -54,132 +56,132 @@ import static org.apache.streams.twitter.converter.util.TwitterActivityUtil.upda
  */
 public class FetchAndReplaceTwitterProcessor implements StreamsProcessor {
 
-    private static final String PROVIDER_ID = getProvider().getId();
-    private static final Logger LOGGER = LoggerFactory.getLogger(FetchAndReplaceTwitterProcessor.class);
-
-    //Default number of attempts before allowing the document through
-    private static final int MAX_ATTEMPTS = 5;
-    //Start the backoff at 4 minutes.  This results in a wait period of 4, 8, 12, 16 & 20 min with an attempt of 5
-    public static final int BACKOFF = 1000 * 60 * 4;
-
-    private final TwitterConfiguration config;
-    private Twitter client;
-    private ObjectMapper mapper;
-    private int retryCount;
-
-    public FetchAndReplaceTwitterProcessor() {
-        this(new ComponentConfigurator<>(TwitterStreamConfiguration.class).detectConfiguration(StreamsConfigurator.config, "twitter"));
-    }
-
-    public FetchAndReplaceTwitterProcessor(TwitterStreamConfiguration config) {
-        this.config = config;
+  private static final String PROVIDER_ID = getProvider().getId();
+  private static final Logger LOGGER = LoggerFactory.getLogger(FetchAndReplaceTwitterProcessor.class);
+
+  //Default number of attempts before allowing the document through
+  private static final int MAX_ATTEMPTS = 5;
+  //Start the backoff at 4 minutes.  This results in wait periods of 4, 8, 12, 16 & 20 min over the maximum of 5 attempts
+  public static final int BACKOFF = 1000 * 60 * 4;
+
+  private final TwitterConfiguration config;
+  private Twitter client;
+  private ObjectMapper mapper;
+  private int retryCount;
+
+  public FetchAndReplaceTwitterProcessor() {
+    this(new ComponentConfigurator<>(TwitterStreamConfiguration.class).detectConfiguration(StreamsConfigurator.config, "twitter"));
+  }
+
+  public FetchAndReplaceTwitterProcessor(TwitterStreamConfiguration config) {
+    this.config = config;
+  }
+
+  @Override
+  public String getId() {
+    return getProvider().getId();
+  }
+
+  @Override
+  public List<StreamsDatum> process(StreamsDatum entry) {
+    if (entry.getDocument() instanceof Activity) {
+      Activity doc = (Activity)entry.getDocument();
+      String originalId = doc.getId();
+      if (PROVIDER_ID.equals(doc.getProvider().getId())) {
+        fetchAndReplace(doc, originalId);
+      }
+    } else {
+      throw new IllegalStateException("Requires an activity document");
     }
-
-    @Override
-    public String getId() {
-        return getProvider().getId();
+    return Lists.newArrayList(entry);
+  }
+
+
+  @Override
+  public void prepare(Object configurationObject) {
+    this.client = getTwitterClient();
+    this.mapper = StreamsJacksonMapper.getInstance();
+  }
+
+  @Override
+  public void cleanUp() {
+
+  }
+
+  protected void fetchAndReplace(Activity doc, String originalId) {
+    try {
+      String json = fetch(doc);
+      replace(doc, json);
+      doc.setId(originalId);
+      retryCount = 0;
+    } catch (TwitterException tw) {
+      if (tw.exceededRateLimitation()) {
+        sleepAndTryAgain(doc, originalId);
+      }
+    } catch (Exception ex) {
+      LOGGER.warn("Error fetching and replacing tweet for activity {}", doc.getId());
     }
-
-    @Override
-    public List<StreamsDatum> process(StreamsDatum entry) {
-        if(entry.getDocument() instanceof Activity) {
-            Activity doc = (Activity)entry.getDocument();
-            String originalId = doc.getId();
-            if(PROVIDER_ID.equals(doc.getProvider().getId())) {
-                fetchAndReplace(doc, originalId);
-            }
-        } else {
-            throw new IllegalStateException("Requires an activity document");
-        }
-        return Lists.newArrayList(entry);
+  }
+
+  protected void replace(Activity doc, String json) throws java.io.IOException, ActivityConversionException {
+    Class documentSubType = new TwitterDocumentClassifier().detectClasses(json).get(0);
+    Object object = mapper.readValue(json, documentSubType);
+
+    if (documentSubType.equals(Retweet.class) || documentSubType.equals(Tweet.class)) {
+      updateActivity((Tweet)object, doc);
+    } else if (documentSubType.equals(Delete.class)) {
+      updateActivity((Delete)object, doc);
+    } else {
+      LOGGER.info("Could not determine the correct update method for {}", documentSubType);
     }
+  }
 
+  protected String fetch(Activity doc) throws TwitterException {
+    String id = doc.getObject().getId();
+    LOGGER.debug("Fetching status from Twitter for {}", id);
+    Long tweetId = Long.valueOf(id.replace("id:twitter:tweets:", ""));
+    Status status = getTwitterClient().showStatus(tweetId);
+    return TwitterObjectFactory.getRawJSON(status);
+  }
 
-    @Override
-    public void prepare(Object configurationObject) {
-        this.client = getTwitterClient();
-        this.mapper = StreamsJacksonMapper.getInstance();
-    }
 
-    @Override
-    public void cleanUp() {
+  protected Twitter getTwitterClient() {
 
-    }
+    if (this.client == null) {
 
-    protected void fetchAndReplace(Activity doc, String originalId) {
-        try {
-            String json = fetch(doc);
-            replace(doc, json);
-            doc.setId(originalId);
-            retryCount = 0;
-        } catch(TwitterException tw) {
-            if(tw.exceededRateLimitation()) {
-                sleepAndTryAgain(doc, originalId);
-            }
-        } catch (Exception e) {
-            LOGGER.warn("Error fetching and replacing tweet for activity {}", doc.getId());
-        }
-    }
+      String baseUrl = TwitterProviderUtil.baseUrl(config);
 
-    protected void replace(Activity doc, String json) throws java.io.IOException, ActivityConversionException {
-        Class documentSubType = TwitterEventClassifier.detectClass(json);
-        Object object = mapper.readValue(json, documentSubType);
-
-        if(documentSubType.equals(Retweet.class) || documentSubType.equals(Tweet.class)) {
-            updateActivity((Tweet)object, doc);
-        } else if(documentSubType.equals(Delete.class)) {
-            updateActivity((Delete)object, doc);
-        } else {
-            LOGGER.info("Could not determine the correct update method for {}", documentSubType);
-        }
-    }
+      ConfigurationBuilder builder = new ConfigurationBuilder()
+          .setOAuthConsumerKey(config.getOauth().getConsumerKey())
+          .setOAuthConsumerSecret(config.getOauth().getConsumerSecret())
+          .setOAuthAccessToken(config.getOauth().getAccessToken())
+          .setOAuthAccessTokenSecret(config.getOauth().getAccessTokenSecret())
+          .setIncludeEntitiesEnabled(true)
+          .setJSONStoreEnabled(true)
+          .setAsyncNumThreads(1)
+          .setRestBaseURL(baseUrl)
+          .setIncludeMyRetweetEnabled(Boolean.TRUE)
+          .setPrettyDebugEnabled(Boolean.TRUE);
 
-    protected String fetch(Activity doc) throws TwitterException {
-        String id = doc.getObject().getId();
-        LOGGER.debug("Fetching status from Twitter for {}", id);
-        Long tweetId = Long.valueOf(id.replace("id:twitter:tweets:", ""));
-        Status status = getTwitterClient().showStatus(tweetId);
-        return TwitterObjectFactory.getRawJSON(status);
+      this.client = new TwitterFactory(builder.build()).getInstance();
     }
-
-
-    protected Twitter getTwitterClient()
-    {
-        if(this.client == null) {
-
-            String baseUrl = TwitterProviderUtil.baseUrl(config);
-
-            ConfigurationBuilder builder = new ConfigurationBuilder()
-                    .setOAuthConsumerKey(config.getOauth().getConsumerKey())
-                    .setOAuthConsumerSecret(config.getOauth().getConsumerSecret())
-                    .setOAuthAccessToken(config.getOauth().getAccessToken())
-                    .setOAuthAccessTokenSecret(config.getOauth().getAccessTokenSecret())
-                    .setIncludeEntitiesEnabled(true)
-                    .setJSONStoreEnabled(true)
-                    .setAsyncNumThreads(1)
-                    .setRestBaseURL(baseUrl)
-                    .setIncludeMyRetweetEnabled(Boolean.TRUE)
-                    .setPrettyDebugEnabled(Boolean.TRUE);
-
-            this.client = new TwitterFactory(builder.build()).getInstance();
-        }
-        return this.client;
-    }
-
-    //Hardcore sleep to allow for catch up
-    protected void sleepAndTryAgain(Activity doc, String originalId) {
-        try {
-            //Attempt to fetchAndReplace with a backoff up to the limit then just reset the count and let the process continue
-            if(retryCount < MAX_ATTEMPTS) {
-                retryCount++;
-                LOGGER.debug("Sleeping for {} min due to excessive calls to Twitter API", (retryCount * 4));
-                Thread.sleep(BACKOFF * retryCount);
-                fetchAndReplace(doc, originalId);
-            } else {
-                retryCount = 0;
-            }
-        } catch (InterruptedException e) {
-            LOGGER.warn("Thread sleep interrupted while waiting for twitter backoff");
-        }
+    return this.client;
+  }
+
+  //Hardcore sleep to allow for catch up
+  protected void sleepAndTryAgain(Activity doc, String originalId) {
+    try {
+      //Attempt to fetchAndReplace with a backoff up to the limit then just reset the count and let the process continue
+      if (retryCount < MAX_ATTEMPTS) {
+        retryCount++;
+        LOGGER.debug("Sleeping for {} min due to excessive calls to Twitter API", (retryCount * 4));
+        Thread.sleep(BACKOFF * retryCount);
+        fetchAndReplace(doc, originalId);
+      } else {
+        retryCount = 0;
+      }
+    } catch (InterruptedException ex) {
+      LOGGER.warn("Thread sleep interrupted while waiting for twitter backoff");
     }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java
deleted file mode 100644
index ed6b90a..0000000
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.twitter.processor;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.Lists;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsProcessor;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.twitter.converter.StreamsTwitterMapper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-
-/**
- * This class performs conversion of a twitter event to a specified outClass
- *
- * Deprecated: use TypeConverterProcessor and ActivityConverterProcessor instead
- */
-@Deprecated
-public class TwitterEventProcessor implements StreamsProcessor {
-
-    private final static String STREAMS_ID = "TwitterEventProcessor";
-
-    private final static Logger LOGGER = LoggerFactory.getLogger(TwitterEventProcessor.class);
-
-    private ObjectMapper mapper = new StreamsTwitterMapper();
-
-    private Class inClass;
-    private Class outClass;
-
-    public TwitterEventProcessor(Class inClass, Class outClass) {
-        this.inClass = inClass;
-        this.outClass = outClass;
-    }
-
-    public TwitterEventProcessor( Class outClass) {
-        this(null, outClass);
-    }
-
-    @Override
-    public String getId() {
-        return STREAMS_ID;
-    }
-
-    @Override
-    public List<StreamsDatum> process(StreamsDatum entry) {
-
-        LOGGER.error("You are calling a deprecated / defunct class.  Modify your stream to use ActivityConverterProcessor.");
-
-        LOGGER.debug("CONVERT FAILED");
-
-        return Lists.newArrayList();
-
-    }
-
-    @Override
-    public void prepare(Object configurationObject) {
-        mapper = StreamsJacksonMapper.getInstance();
-    }
-
-    @Override
-    public void cleanUp() {
-
-    }
-};

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterProfileProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterProfileProcessor.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterProfileProcessor.java
deleted file mode 100644
index d49a54f..0000000
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterProfileProcessor.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.twitter.processor;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsProcessor;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.twitter.converter.TwitterDateTimeFormat;
-import org.apache.streams.twitter.pojo.Retweet;
-import org.apache.streams.twitter.pojo.Tweet;
-import org.apache.streams.twitter.pojo.User;
-import org.apache.streams.twitter.provider.TwitterEventClassifier;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Queue;
-import java.util.Random;
-
-public class TwitterProfileProcessor implements StreamsProcessor, Runnable {
-
-    private final static String STREAMS_ID = "TwitterProfileProcessor";
-
-    private final static Logger LOGGER = LoggerFactory.getLogger(TwitterProfileProcessor.class);
-
-    private ObjectMapper mapper = StreamsJacksonMapper.getInstance(TwitterDateTimeFormat.TWITTER_FORMAT);
-
-    private Queue<StreamsDatum> inQueue;
-    private Queue<StreamsDatum> outQueue;
-
-    private final static String TERMINATE = "TERMINATE";
-
-    @Override
-    public void run() {
-
-        while(true) {
-            StreamsDatum item;
-            try {
-                item = inQueue.poll();
-                if(item.getDocument() instanceof String && item.equals(TERMINATE)) {
-                    LOGGER.info("Terminating!");
-                    break;
-                }
-
-                Thread.sleep(new Random().nextInt(100));
-
-                for( StreamsDatum entry : process(item)) {
-                    outQueue.offer(entry);
-                }
-
-
-            } catch (Exception e) {
-                e.printStackTrace();
-
-            }
-        }
-    }
-
-    public StreamsDatum createStreamsDatum(User user) {
-        return new StreamsDatum(user, user.getIdStr());
-    }
-
-    @Override
-    public String getId() {
-        return STREAMS_ID;
-    }
-
-    @Override
-    public List<StreamsDatum> process(StreamsDatum entry) {
-
-        List<StreamsDatum> result = new ArrayList<>();
-        String item;
-        try {
-            // first check for valid json
-            // since data is coming from outside provider, we don't know what type the events are
-            if( entry.getDocument() instanceof String) {
-                item = (String) entry.getDocument();
-            } else {
-                item = mapper.writeValueAsString(entry.getDocument());
-            }
-
-            Class inClass = TwitterEventClassifier.detectClass(item);
-
-            User user;
-
-            if ( inClass.equals( Tweet.class )) {
-                LOGGER.debug("TWEET");
-                Tweet tweet = mapper.readValue(item, Tweet.class);
-                user = tweet.getUser();
-                result.add(createStreamsDatum(user));
-            }
-            else if ( inClass.equals( Retweet.class )) {
-                LOGGER.debug("RETWEET");
-                Retweet retweet = mapper.readValue(item, Retweet.class);
-                user = retweet.getRetweetedStatus().getUser();
-                result.add(createStreamsDatum(user));
-            } else if ( inClass.equals( User.class )) {
-                LOGGER.debug("USER");
-                user = mapper.readValue(item, User.class);
-                result.add(createStreamsDatum(user));
-            } else {
-                return new ArrayList<>();
-            }
-
-            return result;
-        } catch (Exception e) {
-            e.printStackTrace();
-            LOGGER.warn("Error processing " + entry.toString());
-            return new ArrayList<>();
-        }
-    }
-
-    @Override
-    public void prepare(Object o) {
-
-    }
-
-    @Override
-    public void cleanUp() {
-
-    }
-};

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java
index cc1ecb1..d51e4e7 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java
@@ -21,13 +21,14 @@ package org.apache.streams.twitter.processor;
 import org.apache.streams.converter.ActivityConverterProcessor;
 
 /**
- * This class performs conversion of a twitter event to a specified outClass
+ * This class performs conversion of a twitter event to a specified outClass.
  *
+ * <p/>
  * Deprecated: use TypeConverterProcessor and ActivityConverterProcessor instead
  */
 @Deprecated
 public class TwitterTypeConverter extends ActivityConverterProcessor {
 
-    public final static String STREAMS_ID = "TwitterTypeConverter";
+  public static final String STREAMS_ID = "TwitterTypeConverter";
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterUrlApiProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterUrlApiProcessor.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterUrlApiProcessor.java
index 30db471..0dd43bb 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterUrlApiProcessor.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterUrlApiProcessor.java
@@ -18,63 +18,71 @@
 
 package org.apache.streams.twitter.processor;
 
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
 import org.apache.streams.components.http.HttpProcessorConfiguration;
 import org.apache.streams.components.http.processor.SimpleHTTPGetProcessor;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProcessor;
 import org.apache.streams.pojo.json.Activity;
 
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 /**
- * Class gets a global share count from Twitter API for links on Activity datums
+ * Gets a global share count from the Twitter API for the first link on each Activity datum.
  */
 public class TwitterUrlApiProcessor extends SimpleHTTPGetProcessor implements StreamsProcessor {
 
-    private final static String STREAMS_ID = "TwitterUrlApiProcessor";
+  private static final String STREAMS_ID = "TwitterUrlApiProcessor";
 
-    public TwitterUrlApiProcessor() {
-        super();
-        this.configuration.setHostname("urls.api.twitter.com");
-        this.configuration.setResourcePath("/1/urls/count.json");
-        this.configuration.setEntity(HttpProcessorConfiguration.Entity.ACTIVITY);
-        this.configuration.setExtension("twitter_url_count");
-    }
+  /**
+   * Creates a processor that requests /1/urls/count.json from urls.api.twitter.com.
+   */
+  public TwitterUrlApiProcessor() {
+    super();
+    this.configuration.setHostname("urls.api.twitter.com");
+    this.configuration.setResourcePath("/1/urls/count.json");
+    this.configuration.setEntity(HttpProcessorConfiguration.Entity.ACTIVITY);
+    this.configuration.setExtension("twitter_url_count");
+  }
 
-    public TwitterUrlApiProcessor(HttpProcessorConfiguration processorConfiguration) {
-        super(processorConfiguration);
-        this.configuration.setHostname("urls.api.twitter.com");
-        this.configuration.setResourcePath("/1/urls/count.json");
-        this.configuration.setEntity(HttpProcessorConfiguration.Entity.ACTIVITY);
-        this.configuration.setExtension("twitter_url_count");
-    }
+  /**
+   * Creates a processor from the supplied configuration, then applies the Twitter URL-count endpoint settings.
+   */
+  public TwitterUrlApiProcessor(HttpProcessorConfiguration processorConfiguration) {
+    super(processorConfiguration);
+    this.configuration.setHostname("urls.api.twitter.com");
+    this.configuration.setResourcePath("/1/urls/count.json");
+    this.configuration.setEntity(HttpProcessorConfiguration.Entity.ACTIVITY);
+    this.configuration.setExtension("twitter_url_count");
+  }
 
-    @Override
-    public String getId() {
-        return STREAMS_ID;
-    }
+  @Override
+  public String getId() {
+    return STREAMS_ID;
+  }
 
-    @Override
-    public List<StreamsDatum> process(StreamsDatum entry) {
-        Preconditions.checkArgument(entry.getDocument() instanceof Activity);
-        Activity activity = mapper.convertValue(entry.getDocument(), Activity.class);
-        if( activity.getLinks() != null && activity.getLinks().size() > 0)
-            return super.process(entry);
-        else
-            return Lists.newArrayList(entry);
+  @Override
+  public List<StreamsDatum> process(StreamsDatum entry) {
+    Preconditions.checkArgument(entry.getDocument() instanceof Activity);
+    Activity activity = mapper.convertValue(entry.getDocument(), Activity.class);
+    if ( activity.getLinks() != null && activity.getLinks().size() > 0) {
+      return super.process(entry);
+    } else {
+      return Lists.newArrayList(entry);
     }
+  }
 
-    @Override
-    protected Map<String, String> prepareParams(StreamsDatum entry) {
+  @Override
+  protected Map<String, String> prepareParams(StreamsDatum entry) {
 
-        Map<String, String> params = new HashMap<>();
+    Map<String, String> params = new HashMap<>();
 
-        params.put("url", mapper.convertValue(entry.getDocument(), Activity.class).getLinks().get(0));
+    params.put("url", mapper.convertValue(entry.getDocument(), Activity.class).getLinks().get(0));
 
-        return params;
-    }
+    return params;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterErrorHandler.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterErrorHandler.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterErrorHandler.java
index 90f6b62..ec43fba 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterErrorHandler.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterErrorHandler.java
@@ -21,120 +21,113 @@ package org.apache.streams.twitter.provider;
 import org.apache.streams.config.ComponentConfigurator;
 import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.twitter.TwitterConfiguration;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import twitter4j.RateLimitStatus;
 import twitter4j.Twitter;
 import twitter4j.TwitterException;
-import twitter4j.RateLimitStatus;
 
 /**
  *  Handle expected and unexpected exceptions.
  */
-public class TwitterErrorHandler
-{
-    private final static Logger LOGGER = LoggerFactory.getLogger(TwitterErrorHandler.class);
-
-    // selected because 3 * 5 + n >= 15 for positive n
-    protected static long retry =
-            new ComponentConfigurator<TwitterConfiguration>(TwitterConfiguration.class).detectConfiguration(
-                    StreamsConfigurator.getConfig().getConfig("twitter")
-            ).getRetrySleepMs();
-    protected static long retryMax =
-            new ComponentConfigurator<TwitterConfiguration>(TwitterConfiguration.class).detectConfiguration(
-                    StreamsConfigurator.getConfig().getConfig("twitter")
-            ).getRetryMax();
-
-    @Deprecated
-    public static int handleTwitterError(Twitter twitter, Exception exception) {
-        return handleTwitterError( twitter, null, exception);
-    }
+public class TwitterErrorHandler {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(TwitterErrorHandler.class);
+
+  // selected because 3 * 5 + n >= 15 for positive n
+  protected static long retry =
+      new ComponentConfigurator<TwitterConfiguration>(TwitterConfiguration.class).detectConfiguration(
+          StreamsConfigurator.getConfig().getConfig("twitter")
+      ).getRetrySleepMs();
+  protected static long retryMax =
+      new ComponentConfigurator<TwitterConfiguration>(TwitterConfiguration.class).detectConfiguration(
+          StreamsConfigurator.getConfig().getConfig("twitter")
+      ).getRetryMax();
+
+  @Deprecated
+  public static int handleTwitterError(Twitter twitter, Exception exception) {
+    return handleTwitterError( twitter, null, exception);
+  }
+
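+  /**
+   * Handles the given exception, sleeping before returning when it is a rate-limit or network TwitterException.
+   * @param twitter Twitter instance, logged only for unrecognized Twitter exceptions
+   * @param id id added to log messages when non-null (may be null)
+   * @param exception exception to handle
+   * @return 1 after a rate-limit or network sleep; otherwise retryMax, retryMax / 3 or retryMax / 10
+   */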
+  public static int handleTwitterError(Twitter twitter, Long id, Exception exception) {
+
+    if (exception instanceof TwitterException) {
+      TwitterException twitterException = (TwitterException)exception;
+
+      if (twitterException.exceededRateLimitation()) {
+
+        long millisUntilReset = retry;
+
+        final RateLimitStatus rateLimitStatus = twitterException.getRateLimitStatus();
+        if (rateLimitStatus != null) {
+          millisUntilReset = rateLimitStatus.getSecondsUntilReset() * 1000;
+        }
+
+        LOGGER.warn("Rate Limit Exceeded. Will retry in {} seconds...", millisUntilReset / 1000);
+
+        try {
+          Thread.sleep(millisUntilReset);
+        } catch (InterruptedException e1) {
+          Thread.currentThread().interrupt();
+        }
 
-    public static int handleTwitterError(Twitter twitter, Long id, Exception exception)
-    {
-        if(exception instanceof TwitterException)
-        {
-            TwitterException e = (TwitterException)exception;
-            if(e.exceededRateLimitation())
-            {
-                long millisUntilReset = retry;
-
-                final RateLimitStatus rateLimitStatus = e.getRateLimitStatus();
-                if (rateLimitStatus != null) {
-                    millisUntilReset = rateLimitStatus.getSecondsUntilReset() * 1000;
-                }
-
-                LOGGER.warn("Rate Limit Exceeded. Will retry in {} seconds...", millisUntilReset / 1000);
-
-                try {
-                    Thread.sleep(millisUntilReset);
-                } catch (InterruptedException e1) {
-                    Thread.currentThread().interrupt();
-                }
-
-                return 1;
-            }
-            else if(e.isCausedByNetworkIssue())
-            {
-                LOGGER.info("Twitter Network Issues Detected. Backing off...");
-                LOGGER.info("{} - {}", e.getExceptionCode(), e.getLocalizedMessage());
-                try {
-                    Thread.sleep(retry);
-                } catch (InterruptedException e1) {
-                    Thread.currentThread().interrupt();
-                }
-                return 1;
-            }
-            else if(e.isErrorMessageAvailable())
-            {
-                if(e.getMessage().toLowerCase().contains("does not exist"))
-                {
-                    if( id != null )
-                        LOGGER.warn("User does not exist: {}", id);
-                    else
-                        LOGGER.warn("User does not exist");
-                    return (int)retryMax;
-                }
-                else
-                {
-                    return (int)retryMax/3;
-                }
-            }
-            else
-            {
-                if(e.getExceptionCode().equals("ced778ef-0c669ac0"))
-                {
-                    // This is a known weird issue, not exactly sure the cause, but you'll never be able to get the data.
-                    return (int)retryMax/3;
-                }
-                else if(e.getExceptionCode().equals("4be80492-0a7bf7c7")) {
-                    // This is a 401 reflecting credentials don't have access to the requested resource.
-                    if( id != null )
-                        LOGGER.warn("Authentication Exception accessing id: {}", id);
-                    else
-                        LOGGER.warn("Authentication Exception");
-                    return (int)retryMax;
-                }
-                else
-                {
-                    LOGGER.warn("Unknown Twitter Exception...");
-                    LOGGER.warn("  Account: {}", twitter);
-                    LOGGER.warn("   Access: {}", e.getAccessLevel());
-                    LOGGER.warn("     Code: {}", e.getExceptionCode());
-                    LOGGER.warn("  Message: {}", e.getLocalizedMessage());
-                    return (int)retryMax/10;
-                }
-            }
+        return 1;
+      } else if (twitterException.isCausedByNetworkIssue()) {
+        LOGGER.info("Twitter Network Issues Detected. Backing off...");
+        LOGGER.info("{} - {}", twitterException.getExceptionCode(), twitterException.getLocalizedMessage());
+        try {
+          Thread.sleep(retry);
+        } catch (InterruptedException e1) {
+          Thread.currentThread().interrupt();
         }
-        else if(exception instanceof RuntimeException)
-        {
-            LOGGER.warn("TwitterGrabber: Unknown Runtime Error", exception.getMessage());
-            return (int)retryMax/3;
+        return 1;
+      } else if (twitterException.isErrorMessageAvailable()) {
+        if (twitterException.getMessage().toLowerCase().contains("does not exist")) {
+          if ( id != null ) {
+            LOGGER.warn("User does not exist: {}", id);
+          } else {
+            LOGGER.warn("User does not exist");
+          }
+          return (int)retryMax;
+        } else {
+          return (int)retryMax / 3;
         }
-        else
-        {
-            LOGGER.info("Completely Unknown Exception: {}", exception);
-            return (int)retryMax/3;
+      } else {
+        if (twitterException.getExceptionCode().equals("ced778ef-0c669ac0")) {
+          // This is a known weird issue; the cause is unclear, but you'll never be able to get the data.
+          return (int)retryMax / 3;
+        } else if (twitterException.getExceptionCode().equals("4be80492-0a7bf7c7")) {
+          // This is a 401, meaning the credentials don't have access to the requested resource.
+          if ( id != null ) {
+            LOGGER.warn("Authentication Exception accessing id: {}", id);
+          } else {
+            LOGGER.warn("Authentication Exception");
+          }
+          return (int)retryMax;
+        } else {
+          LOGGER.warn("Unknown Twitter Exception...");
+          LOGGER.warn("  Account: {}", twitter);
+          LOGGER.warn("   Access: {}", twitterException.getAccessLevel());
+          LOGGER.warn("     Code: {}", twitterException.getExceptionCode());
+          LOGGER.warn("  Message: {}", twitterException.getLocalizedMessage());
+          return (int)retryMax / 10;
         }
+      }
+    } else if (exception instanceof RuntimeException) {
+      LOGGER.warn("TwitterGrabber: Unknown Runtime Error", exception.getMessage());
+      return (int)retryMax / 3;
+    } else {
+      LOGGER.info("Completely Unknown Exception: {}", exception);
+      return (int)retryMax / 3;
     }
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventClassifier.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventClassifier.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventClassifier.java
deleted file mode 100644
index 9466c2e..0000000
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventClassifier.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.twitter.provider;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import org.apache.commons.lang.StringUtils;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.twitter.converter.TwitterDateTimeFormat;
-import org.apache.streams.twitter.pojo.Delete;
-import org.apache.streams.twitter.pojo.FriendList;
-import org.apache.streams.twitter.pojo.Retweet;
-import org.apache.streams.twitter.pojo.Tweet;
-import org.apache.streams.twitter.pojo.User;
-import org.apache.streams.twitter.pojo.UserstreamEvent;
-
-import java.io.IOException;
-import java.io.Serializable;
-
-/**
- * TwitterEventClassifier classifies twitter events
- *
- * @Deprecated - replaced by TwitterDocumentClassifier - use ActivityConverterProcessor
- */
-public class TwitterEventClassifier implements Serializable {
-
-    private static ObjectMapper mapper = new StreamsJacksonMapper(Lists.newArrayList(TwitterDateTimeFormat.TWITTER_FORMAT));
-
-    public static Class detectClass( String json ) {
-        Preconditions.checkNotNull(json);
-        Preconditions.checkArgument(StringUtils.isNotEmpty(json));
-
-        ObjectNode objectNode;
-        try {
-            objectNode = (ObjectNode) mapper.readTree(json);
-        } catch (IOException e) {
-            e.printStackTrace();
-            return null;
-        }
-
-        if( objectNode.findValue("retweeted_status") != null && objectNode.get("retweeted_status") != null)
-            return Retweet.class;
-        else if( objectNode.findValue("delete") != null )
-            return Delete.class;
-        else if( objectNode.findValue("friends") != null ||
-                objectNode.findValue("friends_str") != null )
-            return FriendList.class;
-        else if( objectNode.findValue("target_object") != null )
-            return UserstreamEvent.class;
-        else if ( objectNode.findValue("location") != null && objectNode.findValue("user") == null)
-            return User.class;
-        else
-            return Tweet.class;
-    }
-
-}