You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/05/05 20:53:50 UTC

[03/52] [abbrv] git commit: making activity.links an array of strings, so it's useful adding processor-urls, still debugging

making activity.links an array of strings, so it's useful
adding processor-urls, still debugging


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/9e757aed
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/9e757aed
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/9e757aed

Branch: refs/heads/sblackmon
Commit: 9e757aed94d61c93391af2da1f7f3789dbcc1d93
Parents: b59bcd2
Author: sblackmon <sb...@w2odigital.com>
Authored: Fri Mar 28 21:26:49 2014 -0400
Committer: sblackmon <sb...@w2odigital.com>
Committed: Fri Mar 28 21:26:49 2014 -0400

----------------------------------------------------------------------
 streams-contrib/pom.xml                         |   2 +-
 streams-contrib/streams-processor-urls/pom.xml  |  65 ++
 .../main/java/org/apache/streams/urls/Link.java |  57 ++
 .../org/apache/streams/urls/LinkUnwinder.java   | 372 ++++++++++
 .../streams/urls/LinkUnwinderProcessor.java     |  76 ++
 .../streams/urls/TestLinkUnwinderProcessor.java |  76 ++
 .../serializer/DatasiftActivitySerializer.java  |   4 +-
 .../TwitterJsonActivitySerializer.java          | 124 ++++
 .../TwitterJsonTweetActivitySerializer.java     |   6 +-
 .../twitter/test/TweetActivitySerDeTest.java    | 118 ++++
 .../streams/twitter/test/TweetSerDeTest.java    |  14 +
 .../src/test/resources/testtweets.txt           | 695 +++++++++++++++++++
 .../jackson/StreamsDateTimeDeserializer.java    |  32 +
 .../jackson/StreamsDateTimeSerializer.java      |  28 +
 .../streams/jackson/StreamsJacksonModule.java   |  16 +
 .../org/apache/streams/pojo/json/activity.json  |   4 +-
 16 files changed, 1681 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9e757aed/streams-contrib/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/pom.xml b/streams-contrib/pom.xml
index 4f7a22f..67948c2 100644
--- a/streams-contrib/pom.xml
+++ b/streams-contrib/pom.xml
@@ -43,7 +43,7 @@
         <module>streams-persist-hdfs</module>
         <module>streams-persist-kafka</module>
         <module>streams-persist-mongo</module>
-        <module>streams-processor-urlredirect</module>
+        <module>streams-processor-urls</module>
         <module>streams-provider-datasift</module>
         <module>streams-provider-facebook</module>
         <module>streams-provider-google</module>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9e757aed/streams-contrib/streams-processor-urls/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-urls/pom.xml b/streams-contrib/streams-processor-urls/pom.xml
new file mode 100644
index 0000000..966155f
--- /dev/null
+++ b/streams-contrib/streams-processor-urls/pom.xml
@@ -0,0 +1,65 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+    <modelVersion>4.0.0</modelVersion>
+    <artifactId>streams-processor-urls</artifactId>
+    <version>0.1-SNAPSHOT</version>
+
+    <parent>
+        <groupId>org.apache.streams</groupId>
+        <artifactId>streams-contrib</artifactId>
+        <version>0.1-SNAPSHOT</version>
+    </parent>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-config</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-pojo</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-annotations</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.jsonschema2pojo</groupId>
+            <artifactId>jsonschema2pojo-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>commons-io</groupId>
+            <artifactId>commons-io</artifactId>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <sourceDirectory>src/main/java</sourceDirectory>
+        <testSourceDirectory>src/test/java</testSourceDirectory>
+        <resources>
+            <resource>
+                <directory>src/main/resources</directory>
+            </resource>
+        </resources>
+        <testResources>
+            <testResource>
+                <directory>src/test/resources</directory>
+            </testResource>
+        </testResources>
+    </build>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9e757aed/streams-contrib/streams-processor-urls/src/main/java/org/apache/streams/urls/Link.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-urls/src/main/java/org/apache/streams/urls/Link.java b/streams-contrib/streams-processor-urls/src/main/java/org/apache/streams/urls/Link.java
new file mode 100644
index 0000000..ec282f5
--- /dev/null
+++ b/streams-contrib/streams-processor-urls/src/main/java/org/apache/streams/urls/Link.java
@@ -0,0 +1,74 @@
+package org.apache.streams.urls;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.List;
+
+/**
+ * The outcome of resolving ("unwinding") one URL through its redirect chain.
+ * Each getter is annotated with the property name Jackson serializes it under.
+ */
+public interface Link
+{
+    /** Outcome of the resolution. */
+    @JsonProperty("status")
+    LinkStatus getStatus();
+
+    /** The URL as it was originally supplied. */
+    @JsonProperty("originalUrl")
+    String getOriginalURL();
+
+    /** Whether at least one redirect was followed. */
+    @JsonProperty("wasRedirected")
+    boolean wasRedirected();
+
+    /** The URL at the end of the redirect chain; may be null if none was reached. */
+    @JsonProperty("finalUrl")
+    String getFinalURL();
+
+    /** Host name of the final URL. */
+    @JsonProperty("domain")
+    String getDomain();
+
+    /** The final URL in normalized form. */
+    @JsonProperty("normalizedUrl")
+    String getNormalizedURL();
+
+    /** Lower-cased, slash-separated parts of the normalized URL. */
+    @JsonProperty("urlParts")
+    List<String> getUrlParts();
+
+    /** Last HTTP status code received, as a string. */
+    @JsonProperty("finalStatusCode")
+    String getFinalResponseCode();
+
+    /** Whether tracking parameters were removed from the final URL. */
+    @JsonProperty("isTracked")
+    boolean isTracked();
+
+    /** URLs visited while following redirects. */
+    @JsonProperty("redirects")
+    List<String> getRedirects();
+
+    /** Elapsed time reported for the resolution, in milliseconds. */
+    @JsonProperty("tookInMillis")
+    long getTookInMillis();
+
+    /** Performs the resolution and populates the values returned by the getters. */
+    void run();
+
+    /** Possible outcomes of resolving a link. */
+    enum LinkStatus {
+        SUCCESS,
+        ERROR,
+        MALFORMED_URL,
+        NOT_FOUND,
+        FORBIDDEN,
+        REDIRECT_ERROR,
+        UNAUTHORIZED,
+        LOOP,
+        HTTP_ERROR_STATUS,
+        EXCEPTION
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9e757aed/streams-contrib/streams-processor-urls/src/main/java/org/apache/streams/urls/LinkUnwinder.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-urls/src/main/java/org/apache/streams/urls/LinkUnwinder.java b/streams-contrib/streams-processor-urls/src/main/java/org/apache/streams/urls/LinkUnwinder.java
new file mode 100644
index 0000000..a4a28f1
--- /dev/null
+++ b/streams-contrib/streams-processor-urls/src/main/java/org/apache/streams/urls/LinkUnwinder.java
@@ -0,0 +1,400 @@
+package org.apache.streams.urls;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.HttpURLConnection;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLDecoder;
+import java.util.*;
+
+/**
+ * Resolves ("unwinds") a URL by following its HTTP redirect chain one hop at a time, then
+ * strips known tracking parameters from the result and derives a normalized form and path
+ * tokens from it. Each instance holds the mutable state for a single URL.
+ *
+ * References:
+ * Purpose              URL
+ * -------------        ----------------------------------------------------------------
+ * [Status Codes]       http://www.w3.org/Protocols/rfc2616/rfc2616-sec10.html
+ * [Test Cases]         http://greenbytes.de/tech/tc/httpredirects/
+ * [t.co behavior]      https://dev.twitter.com/docs/tco-redirection-behavior
+ */
+public class LinkUnwinder implements Link
+{
+    private final static Logger LOGGER = LoggerFactory.getLogger(LinkUnwinder.class);
+
+    private static final int MAX_ALLOWED_REDIRECTS = 30;
+    private static final int DEFAULT_HTTP_TIMEOUT = 5000; //originally 30000
+    // number of times run() tries to resolve the link before giving up
+    private static final int MAX_ATTEMPTS = 3;
+    private static final String LOCATION_IDENTIFIER = "location";
+    private static final String SET_COOKIE_IDENTIFIER = "set-cookie";
+
+    private Date startTime = new Date();
+    private String originalURL;
+    private LinkStatus status;
+    private String finalURL;
+    private String domain;
+    private boolean wasRedirected;
+    private List<String> redirects = new ArrayList<String>();
+    private boolean isTracked = false;
+    private int finalResponseCode;
+    private Collection<String> cookies;
+
+    private String normalizedUrl;
+    private List<String> urlParts;
+
+    private int redirectCount = 0;
+    private long tookInMillis = 0;
+
+    // Hosts that explicitly accept automated clients; they are not sent browser-like headers.
+    private static final Collection<String> BOTS_ARE_OK = Arrays.asList("t.co");
+
+    /******************************************************************
+     * Google uses parameters in the URL string to track referrers
+     * on their Google Analytics and promotions. These are the
+     * identified URL patterns; each is a regular expression that
+     * cleanURL removes from the final URL.
+     *
+     * URL:
+     * https://support.google.com/analytics/answer/1033867?hl=en
+     *****************************************************************/
+    private static final Collection<String> URL_TRACKING_TO_REMOVE = Arrays.asList(
+        // Required. Use utm_source to identify a search engine, newsletter name, or other source.
+        "([\\?&])utm_source(=)[^&?]*",
+        // Required. Use utm_medium to identify a medium such as email or cost-per-click.
+        "([\\?&])utm_medium(=)[^&?]*",
+        // Used for paid search. Use utm_term to note the keywords for this ad.
+        "([\\?&])utm_term(=)[^&?]*",
+        // Used for A/B testing and content-targeted ads. Use utm_content to differentiate ads or links that point to the same URL.
+        "([\\?&])utm_content(=)[^&?]*",
+        // Used for keyword analysis. Use utm_campaign to identify a specific product promotion or strategic campaign.
+        "([\\?&])utm_campaign(=)[^&?]*");
+
+    /**
+     * @return true unless the latest resolution attempt ended in {@link LinkStatus#SUCCESS}
+     *         (including before {@link #run()} has been called), i.e. whenever
+     *         {@link #getFinalURL()} should not be treated as the resolved link
+     */
+    public boolean isFailure()              { return this.status != LinkStatus.SUCCESS; }
+    public String getOriginalURL()          { return this.originalURL; }
+    public LinkStatus getStatus()           { return this.status; }
+    public String getDomain()               { return this.domain; }
+    public String getFinalURL()             { return this.finalURL; }
+    public List<String> getRedirects()      { return this.redirects; }
+    public boolean wasRedirected()          { return this.wasRedirected; }
+    public boolean isTracked()              { return this.isTracked; }
+    public String getFinalResponseCode()    { return Integer.toString(this.finalResponseCode); }
+    public long getTookInMillis()           { return this.tookInMillis; }
+    public String getNormalizedURL()        { return this.normalizedUrl; }
+    public List<String> getUrlParts()       { return this.urlParts; }
+
+    /**
+     * @param originalURL the URL to resolve; no request is made until {@link #run()} is called
+     */
+    public LinkUnwinder(String originalURL) {
+        this.originalURL = originalURL;
+    }
+
+    /**
+     * Resolves the original URL, making up to MAX_ATTEMPTS (three) attempts while no final URL
+     * has been found (to ride out services that fail transiently), then strips tracking
+     * parameters from the final URL, normalizes and tokenizes it, and records the elapsed time.
+     */
+    public void run() {
+        for (int attempt = 0; attempt < MAX_ATTEMPTS && this.finalURL == null; attempt++) {
+            if (this.status == LinkStatus.SUCCESS)
+                break;
+            // Every attempt starts from a clean redirect history; otherwise unwindLink's loop
+            // detection would mistake the retried original URL for a redirect loop.
+            this.redirects.clear();
+            this.redirectCount = 0;
+            unwindLink(this.originalURL);
+        }
+        this.finalURL = cleanURL(this.finalURL);
+        this.normalizedUrl = normalizeURL(this.finalURL);
+        // tokenizeURL normalizes its argument itself; passing normalizedUrl would decode it twice.
+        this.urlParts = tokenizeURL(this.finalURL);
+
+        this.updateTookInMillis();
+    }
+
+    /** Sets tookInMillis to the time elapsed since this instance was created. */
+    protected void updateTookInMillis() {
+        this.tookInMillis = new Date().getTime() - this.startTime.getTime();
+    }
+
+    /**
+     * Requests {@code url} without letting HttpURLConnection follow redirects, records the
+     * outcome (status, response code, final URL, domain, cookies) in this object, and recurses
+     * on the Location header of 3xx responses.
+     *
+     * @param url the URL to request: the original URL first, then each redirect target
+     */
+    public void unwindLink(String url)
+    {
+        // Stop on a redirect loop, or once too many redirects have been followed.
+        if((this.redirectCount > 0 && (this.originalURL.equals(url) || this.redirects.contains(url))) || (this.redirectCount > MAX_ALLOWED_REDIRECTS))
+        {
+            this.status = LinkStatus.LOOP;
+            return;
+        }
+
+        if(!this.originalURL.equals(url))
+            this.redirects.add(url);
+
+        HttpURLConnection connection = null;
+
+        try
+        {
+            URL thisURL = new URL(url);
+            connection = (HttpURLConnection) thisURL.openConnection();
+
+            // Pretend to be a desktop browser, except for hosts that are fine with bots.
+            if(!BOTS_ARE_OK.contains(thisURL.getHost()))
+            {
+                connection.addRequestProperty("Host", thisURL.getHost());
+                connection.addRequestProperty("Connection", "Keep-Alive");
+                connection.addRequestProperty("User-Agent", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/31.0.1650.48 Safari/537.36");
+                connection.addRequestProperty("Accept-Language", "en-US,en;q=0.8,zh;q=0.6");
+
+                // Some sites (seattlemamadoc.com prompted this) detect bots by checking the
+                // referrer chain and the User-Agent, so on redirect hops we say where we came
+                // from. t.co is EXPLICITLY ok with bots and broke under this treatment, which
+                // is why hosts in BOTS_ARE_OK never reach this branch. Note the HTTP header
+                // name is spelled "Referer".
+                if(this.redirectCount > 0)
+                    connection.addRequestProperty("Referer", this.originalURL);
+            }
+
+            connection.setReadTimeout(DEFAULT_HTTP_TIMEOUT);
+            connection.setConnectTimeout(DEFAULT_HTTP_TIMEOUT);
+
+            // Redirects are followed by hand so every hop is recorded and loops are detected.
+            connection.setInstanceFollowRedirects(false);
+
+            // Replay cookies set by the previous hop, sending only the name=value pair that
+            // precedes the first ';' of each Set-Cookie value.
+            if(this.cookies != null)
+                for (String cookie : this.cookies)
+                    connection.addRequestProperty("Cookie", cookie.split(";", 2)[0]);
+
+            connection.connect();
+
+            this.finalResponseCode = connection.getResponseCode();
+
+            Map<String,List<String>> headers = createCaseInsensitiveMap(connection.getHeaderFields());
+
+            /******************************************************************
+             * If they want us to set cookies, well, then we will set cookies
+             * Example URL:
+             * http://nyti.ms/1bCpesx
+             *****************************************************************/
+            if(headers.containsKey(SET_COOKIE_IDENTIFIER))
+                this.cookies = headers.get(SET_COOKIE_IDENTIFIER);
+
+            switch (this.finalResponseCode)
+            {
+                case 200: // HTTP OK
+                    this.finalURL = connection.getURL().toString();
+                    this.domain = new URL(this.finalURL).getHost();
+                    this.status = LinkStatus.SUCCESS;
+                    break;
+                case 300: // Multiple choices
+                case 301: // URI has been moved permanently
+                case 302: // Found
+                case 303: // Primarily for a HTTP Post
+                case 304: // Not Modified
+                case 306: // This status code is unused but in the redirect block.
+                case 307: // Temporary re-direct
+                    /*******************************************************************
+                     * Author:
+                     * Smashew
+                     *
+                     * Date: 2013-11-15
+                     *
+                     * Note:
+                     * It is possible that we have already found our final URL. In
+                     * the event that we have found our final URL, we are going to
+                     * save this URL as long as it isn't the original URL.
+                     * We are still going to ask the browser to re-direct, but in the
+                     * case of yet another redirect, seen with the redbull test
+                     * this can be followed by a 304, a browser, by W3C standards would
+                     * still render the page with its content, but for us to assert
+                     * a success, we are really hoping for a 304 message.
+                     *******************************************************************/
+                    if(!this.originalURL.equalsIgnoreCase(connection.getURL().toString()))
+                        this.finalURL = connection.getURL().toString();
+                    if(!headers.containsKey(LOCATION_IDENTIFIER))
+                    {
+                        LOGGER.info("Headers: {}", headers);
+                        this.status = LinkStatus.REDIRECT_ERROR;
+                    }
+                    else
+                    {
+                        this.wasRedirected = true;
+                        this.redirectCount++;
+                        // Location may be relative, so resolve it against the URL just requested.
+                        URL target = new URL(thisURL, connection.getHeaderField(LOCATION_IDENTIFIER));
+                        unwindLink(target.toString());
+                    }
+                    break;
+                case 305: // User must use the specified proxy (deprecated by W3C)
+                    break;
+                case 401: // Unauthorized (nothing we can do here)
+                    this.status = LinkStatus.UNAUTHORIZED;
+                    break;
+                case 403: // HTTP Forbidden (Nothing we can do here)
+                    this.status = LinkStatus.FORBIDDEN;
+                    break;
+                case 404: // Not Found (Page is not found, nothing we can do with a 404)
+                    this.status = LinkStatus.NOT_FOUND;
+                    break;
+                case 500: // Internal Server Error
+                case 501: // Not Implemented
+                case 502: // Bad Gateway
+                case 503: // Service Unavailable
+                case 504: // Gateway Timeout
+                case 505: // Version not supported
+                    this.status = LinkStatus.HTTP_ERROR_STATUS;
+                    break;
+                default:
+                    LOGGER.info("Unrecognized HTTP Response Code: {}", this.finalResponseCode);
+                    this.status = LinkStatus.NOT_FOUND;
+                    break;
+            }
+        }
+        catch (MalformedURLException e)
+        {
+            // the URL is trash, so, it can't load it.
+            this.status = LinkStatus.MALFORMED_URL;
+        }
+        catch (IOException ex)
+        {
+            // there was an I/O issue (timeout, unknown host, ...) so we set ERROR.
+            LOGGER.debug("I/O error while unwinding {}", url, ex);
+            this.status = LinkStatus.ERROR;
+        }
+        catch (Exception ex)
+        {
+            // there was an unknown issue (e.g. a non-HTTP URL fails the cast) so we set EXCEPTION.
+            LOGGER.debug("Unexpected error while unwinding {}", url, ex);
+            this.status = LinkStatus.EXCEPTION;
+        }
+        finally
+        {
+            if (connection != null)
+                connection.disconnect();
+        }
+    }
+
+    /**
+     * Copies a header map with every key lower-cased (Locale.ROOT) so lookups can ignore case.
+     * Entries whose key or value is null are dropped.
+     */
+    private Map<String,List<String>> createCaseInsensitiveMap(Map<String,List<String>> input) {
+        Map<String,List<String>> toReturn = new HashMap<String, List<String>>();
+        for(Map.Entry<String,List<String>> entry : input.entrySet())
+            if(entry.getKey() != null && entry.getValue() != null)
+                toReturn.put(entry.getKey().toLowerCase(Locale.ROOT), entry.getValue());
+        return toReturn;
+    }
+
+    /**
+     * Removes the tracking parameters listed in URL_TRACKING_TO_REMOVE from {@code url} and sets
+     * isTracked if anything was removed.
+     *
+     * @param url the URL to clean; may be null
+     * @return the cleaned URL, or null if {@code url} was null
+     */
+    private String cleanURL(String url)
+    {
+        // If they pass us a null URL then we are going to pass that right back to them.
+        if(url == null)
+            return null;
+
+        // remember how big the URL was at the start
+        int startLength = url.length();
+
+        // Iterate through all the known URL parameters of tracking URLs
+        for(String pattern : URL_TRACKING_TO_REMOVE)
+            url = url.replaceAll(pattern, "");
+
+        // If the URL is smaller than when it came in. Then it had tracking information
+        if(url.length() < startLength)
+            this.isTracked = true;
+
+        // return our url.
+        return url;
+    }
+
+    /**
+     * Removes the protocol, if it exists, from the front and
+     * decodes percent-encoded characters (e.g. %20).
+     * Extend this to do other url cleaning/pre-processing.
+     *
+     * @param url - The String URL to normalize; may be null
+     * @return normalizedUrl - The normalized URL, or null if {@code url} was null. If the URL
+     *         cannot be decoded it is returned with only the protocol removed.
+     */
+    public static String normalizeURL(String url)
+    {
+        if(url == null)
+            return null;
+
+        // Decode URL to remove any %20 type stuff
+        String normalizedUrl = url;
+        try {
+            normalizedUrl = URLDecoder.decode(url, "UTF-8");
+        }
+        catch(UnsupportedEncodingException uee)
+        {
+            // Not expected: UTF-8 is a charset every Java platform is required to support.
+            LOGGER.warn("Unable to Decode URL. Decoding skipped.", uee);
+        }
+        catch(IllegalArgumentException iae)
+        {
+            // URLDecoder rejects malformed escapes such as a lone '%' or "%zz".
+            LOGGER.debug("Unable to Decode URL. Decoding skipped.", iae);
+        }
+
+        // Remove the protocol (http://, ftp:// or similar) from the front. Only the first "://"
+        // counts; later ones, e.g. inside a query parameter, are part of the URL.
+        int protocolEnd = normalizedUrl.indexOf("://");
+        if (protocolEnd >= 0)
+            normalizedUrl = normalizedUrl.substring(protocolEnd + 3);
+
+        // Room here to do more pre-processing
+
+        return normalizedUrl;
+    }
+
+    /**
+     * Splits a URL into its lower-cased, slash-separated parts. This can be used
+     * in a classifier to help classify URLs (see the reference below).
+     *
+     * Reference:
+     * http://stackoverflow.com/questions/10046178/pattern-matching-for-url-classification
+     * @param url - Url to be tokenized; it is passed through {@link #normalizeURL(String)} first
+     * @return tokens - A list of all the tokens; empty if {@code url} is null
+     */
+    public static List<String> tokenizeURL(String url)
+    {
+        List<String> toReturn = new ArrayList<String>();
+        url = normalizeURL(url);
+        if(url == null)
+            return toReturn;
+
+        // Split the URL by forward slashes. Most modern browsers will accept a URL
+        // this malformed such as http://www.smashew.com/hello//how////are/you
+        // hence the '+' in the regular expression.
+        for(String part: url.split("/+"))
+            toReturn.add(part.toLowerCase(Locale.ROOT));
+
+        return toReturn;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9e757aed/streams-contrib/streams-processor-urls/src/main/java/org/apache/streams/urls/LinkUnwinderProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-urls/src/main/java/org/apache/streams/urls/LinkUnwinderProcessor.java b/streams-contrib/streams-processor-urls/src/main/java/org/apache/streams/urls/LinkUnwinderProcessor.java
new file mode 100644
index 0000000..45ec04d
--- /dev/null
+++ b/streams-contrib/streams-processor-urls/src/main/java/org/apache/streams/urls/LinkUnwinderProcessor.java
@@ -0,0 +1,82 @@
+package org.apache.streams.urls;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.pojo.json.Activity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+/**
+ * StreamsProcessor that replaces each link of an {@link Activity} with the URL it finally
+ * resolves to (see {@link LinkUnwinder}). Links that cannot be resolved are dropped.
+ *
+ * References:
+ * Purpose              URL
+ * -------------        ----------------------------------------------------------------
+ * [Status Codes]       http://www.w3.org/Protocols/rfc2616/rfc2616-sec10.html
+ * [Test Cases]         http://greenbytes.de/tech/tc/httpredirects/
+ * [t.co behavior]      https://dev.twitter.com/docs/tco-redirection-behavior
+ */
+public class LinkUnwinderProcessor implements StreamsProcessor
+{
+    private final static String STREAMS_ID = "LinkUnwinderProcessor";
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(LinkUnwinderProcessor.class);
+
+    /**
+     * Unwinds every link of an Activity document, preserving their order.
+     *
+     * @param entry datum whose document must be an {@link Activity}
+     * @return a single-element list holding {@code entry}, whose activity's links have been
+     *         replaced by their resolved URLs (an activity without links is passed through)
+     * @throws NotImplementedException if the document is not an Activity
+     */
+    @Override
+    public List<StreamsDatum> process(StreamsDatum entry) {
+
+        List<StreamsDatum> result = Lists.newArrayList();
+
+        LOGGER.debug("{} processing {}", STREAMS_ID, entry.getDocument().getClass());
+
+        if( !(entry.getDocument() instanceof Activity) )
+            throw new NotImplementedException();
+
+        Activity activity = (Activity) entry.getDocument();
+        List<String> inputLinks = activity.getLinks();
+        if( inputLinks != null ) {
+            List<String> outputLinks = Lists.newArrayList();
+            for( String link : inputLinks ) {
+                try {
+                    LinkUnwinder unwinder = new LinkUnwinder(link);
+                    unwinder.run();
+                    // Keep only links that resolved; a failed unwind may leave no final URL.
+                    if( !unwinder.isFailure() && unwinder.getFinalURL() != null ) {
+                        outputLinks.add(unwinder.getFinalURL());
+                    }
+                } catch (Exception e) {
+                    // a link that cannot be unwound is dropped rather than failing the datum
+                    LOGGER.debug("Failed to unwind link : {}", link, e);
+                }
+            }
+            activity.setLinks(outputLinks);
+        }
+        entry.setDocument(activity);
+        result.add(entry);
+
+        return result;
+    }
+
+    @Override
+    public void prepare(Object o) {
+    }
+
+    @Override
+    public void cleanUp() {
+
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9e757aed/streams-contrib/streams-processor-urls/src/test/java/org/apache/streams/urls/TestLinkUnwinderProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-urls/src/test/java/org/apache/streams/urls/TestLinkUnwinderProcessor.java b/streams-contrib/streams-processor-urls/src/test/java/org/apache/streams/urls/TestLinkUnwinderProcessor.java
new file mode 100644
index 0000000..94ae2d2
--- /dev/null
+++ b/streams-contrib/streams-processor-urls/src/test/java/org/apache/streams/urls/TestLinkUnwinderProcessor.java
@@ -0,0 +1,74 @@
+package org.apache.streams.urls;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.pojo.json.Activity;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+import java.util.List;
+
+/**
+ * Tests for {@link LinkUnwinderProcessor}. These tests resolve real short links over HTTP, so
+ * they need network access and depend on the short links still pointing where they did.
+ *
+ * Created by rebanks on 2/27/14.
+ */
+public class TestLinkUnwinderProcessor {
+
+    @Test
+    public void testActivityLinkUnwinderProcessorBitly() throws Exception{
+        testActivityUnwinderHelper(Lists.newArrayList("http://bit.ly/1cX5Rh4"), Lists.newArrayList("http://www.wcgworld.com/"));
+    }
+
+    @Test
+    public void testActivityLinkUnwinderProcessorGoogle() throws Exception{
+        testActivityUnwinderHelper(Lists.newArrayList("http://goo.gl/wSrHDA"), Lists.newArrayList("http://www.wcgworld.com/"));
+    }
+
+    @Test
+    public void testActivityLinkUnwinderProcessorOwly() throws Exception{
+        testActivityUnwinderHelper(Lists.newArrayList("http://ow.ly/u4Kte"), Lists.newArrayList("http://www.wcgworld.com/"));
+    }
+
+    @Test
+    public void testActivityLinkUnwinderProcessorGoDaddy() throws Exception{
+        testActivityUnwinderHelper(Lists.newArrayList("http://x.co/3yapt"), Lists.newArrayList("http://www.wcgworld.com/"));
+    }
+
+    @Test
+    public void testActivityLinkUnwinderProcessorMulti() throws Exception{
+        testActivityUnwinderHelper(Lists.newArrayList("http://x.co/3yapt", "http://ow.ly/u4Kte", "http://goo.gl/wSrHDA"), Lists.newArrayList("http://www.wcgworld.com/", "http://www.wcgworld.com/", "http://www.wcgworld.com/"));
+    }
+
+    @Test
+    public void testActivityLinkUnwinderProcessorUnwindable() throws Exception{
+        testActivityUnwinderHelper(Lists.newArrayList("http://bit.ly/1cX5Rh4", "http://nope@#$%"), Lists.newArrayList("http://www.wcgworld.com/"));
+    }
+
+    /**
+     * Runs {@code input} links through a fresh processor and checks that the single returned
+     * activity carries {@code expected}. Links are compared by count and as a set, so their
+     * order is not checked.
+     */
+    public void testActivityUnwinderHelper(List<String> input, List<String> expected) throws Exception{
+        Activity activity = new Activity();
+        activity.setLinks(input);
+        StreamsDatum datum = new StreamsDatum(activity);
+        LinkUnwinderProcessor processor = new LinkUnwinderProcessor();
+        processor.prepare(null);
+        List<StreamsDatum> result = processor.process(datum);
+        assertNotNull(result);
+        assertEquals(1, result.size());
+        StreamsDatum resultDatum = result.get(0);
+        assertNotNull(resultDatum);
+        assertTrue(resultDatum.getDocument() instanceof Activity);
+        Activity resultActivity = (Activity) resultDatum.getDocument();
+        assertNotNull(resultActivity.getLinks());
+        List<String> resultLinks = resultActivity.getLinks();
+        assertEquals(expected.size(), resultLinks.size());
+        assertEquals(Sets.newHashSet(expected), Sets.newHashSet(resultLinks));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9e757aed/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftActivitySerializer.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftActivitySerializer.java
index 455a579..de65636 100644
--- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftActivitySerializer.java
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftActivitySerializer.java
@@ -159,8 +159,8 @@ public class DatasiftActivitySerializer implements ActivitySerializer<Datasift>,
         return actObj;
     }
 
-    public static List<Object> getLinks(Interaction interaction) {
-        List<Object> links = Lists.newArrayList();
+    public static List<String> getLinks(Interaction interaction) {
+        List<String> links = Lists.newArrayList();
         return links;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9e757aed/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonActivitySerializer.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonActivitySerializer.java
new file mode 100644
index 0000000..fceff2c
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonActivitySerializer.java
@@ -0,0 +1,124 @@
+package org.apache.streams.twitter.serializer;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.PropertyAccessor;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.AnnotationIntrospector;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.module.jaxb.JaxbAnnotationIntrospector;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.streams.data.ActivitySerializer;
+import org.apache.streams.exceptions.ActivitySerializerException;
+import org.apache.streams.jackson.StreamsJacksonModule;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.pojo.json.Provider;
+import org.apache.streams.twitter.pojo.Delete;
+import org.apache.streams.twitter.pojo.Retweet;
+import org.apache.streams.twitter.pojo.Tweet;
+import org.apache.streams.twitter.provider.TwitterEventClassifier;
+import org.joda.time.DateTime;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+import org.joda.time.format.ISODateTimeFormat;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Created by sblackmon on 3/26/14.
+ */
+public class TwitterJsonActivitySerializer implements ActivitySerializer<String>
+{
+
+    public TwitterJsonActivitySerializer() {
+
+    }
+
+    public static final DateTimeFormatter TWITTER_FORMAT = DateTimeFormat.forPattern("EEE MMM dd HH:mm:ss Z yyyy");
+    public static final DateTimeFormatter ACTIVITY_FORMAT = ISODateTimeFormat.basicDateTime();
+
+    public static ObjectMapper mapper;
+    static {
+        mapper = new ObjectMapper();
+        mapper.disable(com.fasterxml.jackson.databind.SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
+        mapper.registerModule(new StreamsJacksonModule() {
+            {
+                addDeserializer(DateTime.class, new StdDeserializer<DateTime>(DateTime.class) {
+                    @Override
+                    public DateTime deserialize(JsonParser jpar, DeserializationContext context) throws IOException, JsonProcessingException {
+                        return TWITTER_FORMAT.parseDateTime(jpar.getValueAsString());
+                    }
+                });
+            }
+        });
+        //AnnotationIntrospector introspector = new JaxbAnnotationIntrospector(mapper.getTypeFactory());
+        //mapper.setAnnotationIntrospector(introspector);
+        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, Boolean.TRUE);
+        mapper.configure(DeserializationFeature.FAIL_ON_INVALID_SUBTYPE, Boolean.TRUE);
+        mapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, Boolean.TRUE);
+        mapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, Boolean.TRUE);
+        mapper.configure(DeserializationFeature.WRAP_EXCEPTIONS, Boolean.FALSE);
+        mapper.configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, Boolean.TRUE);
+        mapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);
+
+    }
+
+    TwitterJsonTweetActivitySerializer tweetActivitySerializer = new TwitterJsonTweetActivitySerializer();
+    TwitterJsonRetweetActivitySerializer retweetActivitySerializer = new TwitterJsonRetweetActivitySerializer();
+    TwitterJsonDeleteActivitySerializer deleteActivitySerializer = new TwitterJsonDeleteActivitySerializer();
+
+    @Override
+    public String serializationFormat() {
+        return null;
+    }
+
+    @Override
+    public String serialize(Activity deserialized) throws ActivitySerializerException {
+        throw new NotImplementedException();
+    }
+
+    @Override
+    public Activity deserialize(String serialized) throws ActivitySerializerException {
+
+        Class documentSubType = TwitterEventClassifier.detectClass(serialized);
+
+        Activity activity;
+        if( documentSubType == Tweet.class )
+            activity = tweetActivitySerializer.deserialize(serialized);
+        else if( documentSubType == Retweet.class )
+            activity = retweetActivitySerializer.deserialize(serialized);
+        else if( documentSubType == Delete.class )
+            activity = deleteActivitySerializer.deserialize(serialized);
+        else throw new ActivitySerializerException("unrecognized type");
+
+        return activity;
+    }
+
+    @Override
+    public List<Activity> deserializeAll(List<String> serializedList) {
+        throw new NotImplementedException();
+    }
+
+    public static Provider getProvider() {
+        Provider provider = new Provider();
+        provider.setId("id:providers:twitter");
+        return provider;
+    }
+
+    public static void addTwitterExtension(Activity activity, ObjectNode event) {
+        Map<String, Object> extensions = org.apache.streams.data.util.ActivityUtil.ensureExtensions(activity);
+        extensions.put("twitter", event);
+    }
+
+    public static String formatId(String... idparts) {
+        return Joiner.on(":").join(Lists.asList("id:twitter", idparts));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9e757aed/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java
index 8bcb60b..a038792 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java
@@ -81,8 +81,6 @@ public class TwitterJsonTweetActivitySerializer implements ActivitySerializer<St
         activity.setUrl("http://twitter.com/" + tweet.getIdStr());
         activity.setLinks(getLinks(tweet));
 
-        System.out.println("12");
-
         addTwitterExtension(activity, TwitterJsonActivitySerializer.mapper.convertValue(tweet, ObjectNode.class));
         addLocationExtension(activity, tweet);
         return activity;
@@ -109,8 +107,8 @@ public class TwitterJsonTweetActivitySerializer implements ActivitySerializer<St
         return actor;
     }
 
-    public static List<Object> getLinks(Tweet tweet) {
-        List<Object> links = Lists.newArrayList();
+    public static List<String> getLinks(Tweet tweet) {
+        List<String> links = Lists.newArrayList();
         if( tweet.getEntities().getUrls() != null ) {
             for (Url url : tweet.getEntities().getUrls()) {
                 links.add(url.getExpandedUrl());

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9e757aed/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TweetActivitySerDeTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TweetActivitySerDeTest.java b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TweetActivitySerDeTest.java
new file mode 100644
index 0000000..494f698
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TweetActivitySerDeTest.java
@@ -0,0 +1,118 @@
+package org.apache.streams.twitter.test;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.commons.lang.StringUtils;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.twitter.pojo.Retweet;
+import org.apache.streams.twitter.pojo.Tweet;
+import org.apache.streams.twitter.provider.TwitterEventClassifier;
+import org.apache.streams.twitter.serializer.TwitterJsonActivitySerializer;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+
+import static java.util.regex.Pattern.matches;
+import static org.hamcrest.CoreMatchers.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+/**
+* Created with IntelliJ IDEA.
+* User: sblackmon
+* Date: 8/20/13
+* Time: 5:57 PM
+* To change this template use File | Settings | File Templates.
+*/
+public class TweetActivitySerDeTest {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(TweetActivitySerDeTest.class);
+
+    private TwitterJsonActivitySerializer twitterJsonActivitySerializer = new TwitterJsonActivitySerializer();
+
+    //    @Ignore
+    @Test
+    public void Tests()
+    {
+        InputStream is = TweetActivitySerDeTest.class.getResourceAsStream("/testtweets.txt");
+        InputStreamReader isr = new InputStreamReader(is);
+        BufferedReader br = new BufferedReader(isr);
+
+        try {
+            while (br.ready()) {
+                String line = br.readLine();
+                if(!StringUtils.isEmpty(line))
+                {
+                    LOGGER.info("raw: {}", line);
+
+                    Class detected = TwitterEventClassifier.detectClass(line);
+
+                    Activity activity = twitterJsonActivitySerializer.deserialize(line);
+
+                    String activitystring = TwitterJsonActivitySerializer.mapper.writeValueAsString(activity);
+
+                    LOGGER.info("activity: {}", activitystring);
+
+                    assertThat(activity, is(not(nullValue())));
+
+                    assertThat(activity.getId(), is(not(nullValue())));
+                    assertThat(activity.getActor(), is(not(nullValue())));
+                    assertThat(activity.getActor().getId(), is(not(nullValue())));
+                    assertThat(activity.getVerb(), is(not(nullValue())));
+                    assertThat(activity.getProvider(), is(not(nullValue())));
+
+                    if( detected == Tweet.class ) {
+                        assertThat(activity.getObject(), is(nullValue()));
+
+                        assertEquals(activity.getVerb(), "post");
+
+                        Tweet tweet = TwitterJsonActivitySerializer.mapper.readValue(line, Tweet.class);
+
+                        if( tweet.getEntities() != null &&
+                            tweet.getEntities().getUrls() != null &&
+                            tweet.getEntities().getUrls().size() > 0 ) {
+
+
+                            assertThat(activity.getLinks(), is(not(nullValue())));
+                            assertEquals(tweet.getEntities().getUrls().size(), activity.getLinks().size());
+                        }
+
+                    } else if( detected == Retweet.class ) {
+
+                        Retweet retweet = TwitterJsonActivitySerializer.mapper.readValue(line, Retweet.class);
+
+                        assertThat(retweet.getRetweetedStatus(), is(not(nullValue())));
+
+                        assertEquals(activity.getVerb(), "share");
+
+                        assertThat(activity.getObject(), is(not(nullValue())));
+                        assertThat(activity.getObject().getObjectType(), is(not(nullValue())));
+                        assertThat(activity.getObject().getObjectType(), is(not(nullValue())));
+
+                        if( retweet.getRetweetedStatus().getEntities() != null &&
+                            retweet.getRetweetedStatus().getEntities().getUrls() != null &&
+                            retweet.getRetweetedStatus().getEntities().getUrls().size() > 0 ) {
+
+                            assertThat(activity.getLinks(), is(not(nullValue())));
+                            assertEquals(retweet.getRetweetedStatus().getEntities().getUrls().size(), activity.getLinks().size());
+                        }
+
+                    }
+
+
+
+                }
+            }
+        } catch( Exception e ) {
+            System.out.println(e);
+            e.printStackTrace();
+            Assert.fail();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9e757aed/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TweetSerDeTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TweetSerDeTest.java b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TweetSerDeTest.java
index c6dc0ad..bc7bcf7 100644
--- a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TweetSerDeTest.java
+++ b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TweetSerDeTest.java
@@ -3,6 +3,7 @@ package org.apache.streams.twitter.test;
 import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Optional;
 import org.apache.commons.lang.StringUtils;
 import org.apache.streams.pojo.json.Activity;
 import org.apache.streams.twitter.pojo.Delete;
@@ -21,6 +22,7 @@ import java.io.InputStreamReader;
 
 import static java.util.regex.Pattern.matches;
 import static org.hamcrest.CoreMatchers.*;
+import static org.hamcrest.Matchers.greaterThan;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 
@@ -50,6 +52,9 @@ public class TweetSerDeTest {
         InputStreamReader isr = new InputStreamReader(is);
         BufferedReader br = new BufferedReader(isr);
 
+        int tweetlinks = 0;
+        int retweetlinks = 0;
+
         try {
             while (br.ready()) {
                 String line = br.readLine();
@@ -72,6 +77,8 @@ public class TweetSerDeTest {
                         assertThat(tweet.getText(), is(not(nullValue())));
                         assertThat(tweet.getUser(), is(not(nullValue())));
 
+                        tweetlinks += Optional.fromNullable(tweet.getEntities().getUrls().size()).or(0);
+
                     } else if( detected == Retweet.class ) {
 
                         Retweet retweet = mapper.convertValue(event, Retweet.class);
@@ -83,6 +90,8 @@ public class TweetSerDeTest {
                         assertThat(retweet.getRetweetedStatus().getUser().getId(), is(not(nullValue())));
                         assertThat(retweet.getRetweetedStatus().getUser().getCreatedAt(), is(not(nullValue())));
 
+                        retweetlinks += Optional.fromNullable(retweet.getRetweetedStatus().getEntities().getUrls().size()).or(0);
+
                     } else if( detected == Delete.class ) {
 
                         Delete delete = mapper.convertValue(event, Delete.class);
@@ -95,6 +104,7 @@ public class TweetSerDeTest {
                     } else {
                         Assert.fail();
                     }
+
                 }
             }
         } catch( Exception e ) {
@@ -102,5 +112,9 @@ public class TweetSerDeTest {
             e.printStackTrace();
             Assert.fail();
         }
+
+        assertThat(tweetlinks, is(greaterThan(0)));
+        assertThat(retweetlinks, is(greaterThan(0)));
+
     }
 }