You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/05/05 20:53:50 UTC
[03/52] [abbrv] git commit: making activity.links an array of strings,
so it's useful adding processor-urls, still debugging
making activity.links an array of strings, so it's useful
adding processor-urls, still debugging
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/9e757aed
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/9e757aed
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/9e757aed
Branch: refs/heads/sblackmon
Commit: 9e757aed94d61c93391af2da1f7f3789dbcc1d93
Parents: b59bcd2
Author: sblackmon <sb...@w2odigital.com>
Authored: Fri Mar 28 21:26:49 2014 -0400
Committer: sblackmon <sb...@w2odigital.com>
Committed: Fri Mar 28 21:26:49 2014 -0400
----------------------------------------------------------------------
streams-contrib/pom.xml | 2 +-
streams-contrib/streams-processor-urls/pom.xml | 65 ++
.../main/java/org/apache/streams/urls/Link.java | 57 ++
.../org/apache/streams/urls/LinkUnwinder.java | 372 ++++++++++
.../streams/urls/LinkUnwinderProcessor.java | 76 ++
.../streams/urls/TestLinkUnwinderProcessor.java | 76 ++
.../serializer/DatasiftActivitySerializer.java | 4 +-
.../TwitterJsonActivitySerializer.java | 124 ++++
.../TwitterJsonTweetActivitySerializer.java | 6 +-
.../twitter/test/TweetActivitySerDeTest.java | 118 ++++
.../streams/twitter/test/TweetSerDeTest.java | 14 +
.../src/test/resources/testtweets.txt | 695 +++++++++++++++++++
.../jackson/StreamsDateTimeDeserializer.java | 32 +
.../jackson/StreamsDateTimeSerializer.java | 28 +
.../streams/jackson/StreamsJacksonModule.java | 16 +
.../org/apache/streams/pojo/json/activity.json | 4 +-
16 files changed, 1681 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9e757aed/streams-contrib/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/pom.xml b/streams-contrib/pom.xml
index 4f7a22f..67948c2 100644
--- a/streams-contrib/pom.xml
+++ b/streams-contrib/pom.xml
@@ -43,7 +43,7 @@
<module>streams-persist-hdfs</module>
<module>streams-persist-kafka</module>
<module>streams-persist-mongo</module>
- <module>streams-processor-urlredirect</module>
+ <module>streams-processor-urls</module>
<module>streams-provider-datasift</module>
<module>streams-provider-facebook</module>
<module>streams-provider-google</module>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9e757aed/streams-contrib/streams-processor-urls/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-urls/pom.xml b/streams-contrib/streams-processor-urls/pom.xml
new file mode 100644
index 0000000..966155f
--- /dev/null
+++ b/streams-contrib/streams-processor-urls/pom.xml
@@ -0,0 +1,65 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+    <!-- Processor that resolves shortened / redirecting links found on activities. -->
+    <modelVersion>4.0.0</modelVersion>
+    <artifactId>streams-processor-urls</artifactId>
+    <version>0.1-SNAPSHOT</version>
+
+    <parent>
+        <groupId>org.apache.streams</groupId>
+        <artifactId>streams-contrib</artifactId>
+        <version>0.1-SNAPSHOT</version>
+    </parent>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-config</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-pojo</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-annotations</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.jsonschema2pojo</groupId>
+            <artifactId>jsonschema2pojo-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <!-- Only the unit tests use JUnit; keep it off the module's runtime classpath. -->
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>commons-io</groupId>
+            <artifactId>commons-io</artifactId>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <sourceDirectory>src/main/java</sourceDirectory>
+        <testSourceDirectory>src/test/java</testSourceDirectory>
+        <resources>
+            <resource>
+                <directory>src/main/resources</directory>
+            </resource>
+        </resources>
+        <testResources>
+            <testResource>
+                <directory>src/test/resources</directory>
+            </testResource>
+        </testResources>
+    </build>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9e757aed/streams-contrib/streams-processor-urls/src/main/java/org/apache/streams/urls/Link.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-urls/src/main/java/org/apache/streams/urls/Link.java b/streams-contrib/streams-processor-urls/src/main/java/org/apache/streams/urls/Link.java
new file mode 100644
index 0000000..ec282f5
--- /dev/null
+++ b/streams-contrib/streams-processor-urls/src/main/java/org/apache/streams/urls/Link.java
@@ -0,0 +1,57 @@
+package org.apache.streams.urls;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.List;
+
+/**
+ * The outcome of resolving a link: where it started, where it ended up and how it got there.
+ * Each getter is annotated with the JSON property name it is serialized under.
+ */
+public interface Link
+{
+    /** Outcome of resolving the link. */
+    @JsonProperty("status")
+    LinkStatus getStatus();
+
+    /** The URL exactly as it was supplied. */
+    @JsonProperty("originalUrl")
+    String getOriginalURL();
+
+    /** Whether at least one redirect was followed. */
+    @JsonProperty("wasRedirected")
+    boolean wasRedirected();
+
+    /** The URL the link resolved to. */
+    @JsonProperty("finalUrl")
+    String getFinalURL();
+
+    /** Host part of the final URL. */
+    @JsonProperty("domain")
+    String getDomain();
+
+    /** The final URL in normalized form. */
+    @JsonProperty("normalizedUrl")
+    String getNormalizedURL();
+
+    /** Path-level tokens of the normalized URL. */
+    @JsonProperty("urlParts")
+    List<String> getUrlParts();
+
+    /** HTTP status code of the last response, as text; serialized as "finalStatusCode". */
+    @JsonProperty("finalStatusCode")
+    String getFinalResponseCode();
+
+    /** Whether tracking information was found in (and removed from) the URL. */
+    @JsonProperty("isTracked")
+    boolean isTracked();
+
+    /** The URLs visited while following redirects. */
+    @JsonProperty("redirects")
+    List<String> getRedirects();
+
+    /** Elapsed time in milliseconds. */
+    @JsonProperty("tookInMillis")
+    long getTookInMillis();
+
+    /**
+     * Resolves the link. Implementations (e.g. LinkUnwinder) fill in the values returned by
+     * the getters above.
+     */
+    void run();
+
+    /** Possible outcomes of resolving a link. */
+    enum LinkStatus {
+        SUCCESS,
+        ERROR,
+        MALFORMED_URL,
+        NOT_FOUND,
+        FORBIDDEN,
+        REDIRECT_ERROR,
+        UNAUTHORIZED,
+        LOOP,
+        HTTP_ERROR_STATUS,
+        EXCEPTION
+    }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9e757aed/streams-contrib/streams-processor-urls/src/main/java/org/apache/streams/urls/LinkUnwinder.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-urls/src/main/java/org/apache/streams/urls/LinkUnwinder.java b/streams-contrib/streams-processor-urls/src/main/java/org/apache/streams/urls/LinkUnwinder.java
new file mode 100644
index 0000000..a4a28f1
--- /dev/null
+++ b/streams-contrib/streams-processor-urls/src/main/java/org/apache/streams/urls/LinkUnwinder.java
@@ -0,0 +1,372 @@
+package org.apache.streams.urls;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.HttpURLConnection;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLDecoder;
+import java.util.*;
+
+/**
+ * References:
+ * Some helpful references to help
+ * Purpose URL
+ * ------------- ----------------------------------------------------------------
+ * [Status Codes] http://www.w3.org/Protocols/rfc2616/rfc2616-sec10.html
+ * [Test Cases] http://greenbytes.de/tech/tc/httpredirects/
+ * [t.co behavior] https://dev.twitter.com/docs/tco-redirection-behavior
+ */
+
+public class LinkUnwinder implements Link
+{
+ private final static Logger LOGGER = LoggerFactory.getLogger(LinkUnwinder.class);
+
+ private static final int MAX_ALLOWED_REDIRECTS = 30;
+ private static final int DEFAULT_HTTP_TIMEOUT = 5000; //originally 30000
+ private static final String LOCATION_IDENTIFIER = "location";
+ private static final String SET_COOKIE_IDENTIFIER = "set-cookie";
+
+ private Date startTime = new Date();
+ private String originalURL;
+ private LinkStatus status;
+ private String finalURL;
+ private String domain;
+ private boolean wasRedirected;
+ private List<String> redirects = new ArrayList<String>();
+ private boolean isTracked = false;
+ private int finalResponseCode;
+ private Collection<String> cookies;
+
+ private String normalizedUrl;
+ private List<String> urlParts;
+
+ private int redirectCount = 0;
+ private long tookInMillis = 0;
+
+ private static final Collection<String> BOTS_ARE_OK = new ArrayList<String>() {{
+ add("t.co");
+ }};
+
+ private static final Collection<String> URL_TRACKING_TO_REMOVE = new ArrayList<String>() {{
+ /******************************************************************
+ * Google uses parameters in the URL string to track referrers
+ * on their Google Analytics and promotions. These are the
+ * identified URL patterns.
+ *
+ * URL:
+ * https://support.google.com/analytics/answer/1033867?hl=en
+ *****************************************************************/
+
+ // Required. Use utm_source to identify a search engine, newsletter name, or other source.
+ add("([\\?&])utm_source(=)[^&?]*");
+
+ // Required. Use utm_medium to identify a medium such as email or cost-per- click.
+ add("([\\?&])utm_medium(=)[^&?]*");
+
+ // Used for paid search. Use utm_term to note the keywords for this ad.
+ add("([\\?&])utm_term(=)[^&?]*");
+
+ // Used for A/B testing and content-targeted ads. Use utm_content to differentiate ads or links that point to the same
+ add("([\\?&])utm_content(=)[^&?]*");
+
+ // Used for keyword analysis. Use utm_campaign to identify a specific product promotion or strategic campaign.
+ add("([\\?&])utm_campaign(=)[^&?]*");
+ }};
+
+ public boolean isFailure() { return false; }
+ public String getOriginalURL() { return this.originalURL; }
+ public LinkStatus getStatus() { return this.status; }
+ public String getDomain() { return this.domain; }
+ public String getFinalURL() { return this.finalURL; }
+ public List<String> getRedirects() { return this.redirects; }
+ public boolean wasRedirected() { return this.wasRedirected; }
+ public boolean isTracked() { return this.isTracked; }
+ public String getFinalResponseCode() { return Integer.toString(this.finalResponseCode); }
+ public long getTookInMillis() { return this.tookInMillis; }
+ public String getNormalizedURL() { return this.normalizedUrl; }
+ public List<String> getUrlParts() { return this.urlParts; }
+
+ public LinkUnwinder(String originalURL) {
+ this.originalURL = originalURL;
+ }
+
+ public void run() {
+ // we are going to try three times just incase we catch the service off-guard
+ // this is mainly to help us with our tests.
+ for(int i = 0; (i < 3) && this.finalURL == null ; i++) {
+ if(this.status != LinkStatus.SUCCESS)
+ unwindLink(this.originalURL);
+ }
+ this.finalURL = cleanURL(this.finalURL);
+ this.normalizedUrl = normalizeURL(this.finalURL);
+ this.urlParts = tokenizeURL(this.normalizedUrl);
+
+ this.updateTookInMillis();
+ }
+
+ protected void updateTookInMillis() {
+ this.tookInMillis = new Date().getTime() - this.startTime.getTime();
+ }
+
+ public void unwindLink(String url)
+ {
+ // Check to see if they wound up in a redirect loop
+ if((this.redirectCount > 0 && (this.originalURL.equals(url) || this.redirects.contains(url))) || (this.redirectCount > MAX_ALLOWED_REDIRECTS))
+ {
+ this.status = LinkStatus.LOOP;
+ return;
+ }
+
+ if(!this.originalURL.equals(url))
+ this.redirects.add(url);
+
+ HttpURLConnection connection = null;
+
+ try
+ {
+ URL thisURL = new URL(url);
+ connection = (HttpURLConnection)new URL(url).openConnection();
+
+ // now we are going to pretend that we are a browser...
+ // This is the way my mac works.
+ if(!BOTS_ARE_OK.contains(thisURL.getHost()))
+ {
+ connection.addRequestProperty("Host", thisURL.getHost());
+ connection.addRequestProperty("Connection", "Keep-Alive");
+ connection.addRequestProperty("User-Agent", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/31.0.1650.48 Safari/537.36");
+ connection.addRequestProperty("Accept-Language", "en-US,en;q=0.8,zh;q=0.6");
+
+ // the test to seattlemamadoc.com prompted this change.
+ // they auto detect bots by checking the referrer chain and the 'user-agent'
+ // this broke the t.co test. t.co URLs are EXPLICITLY ok with bots
+ // there is a list for URLS that behave this way at the top in BOTS_ARE_OK
+ // smashew 2013-13-2013
+
+ if(this.redirectCount > 0 && BOTS_ARE_OK.contains(thisURL.getHost()))
+ connection.addRequestProperty("Referrer", this.originalURL);
+ }
+
+ connection.setReadTimeout(DEFAULT_HTTP_TIMEOUT);
+ connection.setConnectTimeout(DEFAULT_HTTP_TIMEOUT);
+
+ connection.setInstanceFollowRedirects(false);
+
+ if(this.cookies != null)
+ for (String cookie : cookies)
+ connection.addRequestProperty("Cookie", cookie.split(";", 1)[0]);
+
+ connection.connect();
+
+ this.finalResponseCode = connection.getResponseCode();
+
+ /**************
+ *
+ */
+ Map<String,List<String>> headers = createCaseInsenitiveMap(connection.getHeaderFields());
+ /******************************************************************
+ * If they want us to set cookies, well, then we will set cookies
+ * Example URL:
+ * http://nyti.ms/1bCpesx
+ *****************************************************************/
+ if(headers.containsKey(SET_COOKIE_IDENTIFIER))
+ this.cookies = headers.get(SET_COOKIE_IDENTIFIER);
+
+ switch (this.finalResponseCode)
+ {
+ case 200: // HTTP OK
+ this.finalURL = connection.getURL().toString();
+ this.domain = new URL(this.finalURL).getHost();
+ this.status = LinkStatus.SUCCESS;
+ break;
+ case 300: // Multiple choices
+ case 301: // URI has been moved permanently
+ case 302: // Found
+ case 303: // Primarily for a HTTP Post
+ case 304: // Not Modified
+ case 306: // This status code is unused but in the redirect block.
+ case 307: // Temporary re-direct
+ /*******************************************************************
+ * Author:
+ * Smashew
+ *
+ * Date: 2013-11-15
+ *
+ * Note:
+ * It is possible that we have already found our final URL. In
+ * the event that we have found our final URL, we are going to
+ * save this URL as long as it isn't the original URL.
+ * We are still going to ask the browser to re-direct, but in the
+ * case of yet another redirect, seen with the redbull test
+ * this can be followed by a 304, a browser, by W3C standards would
+ * still render the page with it's content, but for us to assert
+ * a success, we are really hoping for a 304 message.
+ *******************************************************************/
+ if(!this.originalURL.toLowerCase().equals(connection.getURL().toString().toLowerCase()))
+ this.finalURL = connection.getURL().toString();
+ if(!headers.containsKey(LOCATION_IDENTIFIER))
+ {
+ LOGGER.info("Headers: {}", headers);
+ this.status = LinkStatus.REDIRECT_ERROR;
+ }
+ else
+ {
+ this.wasRedirected = true;
+ this.redirectCount++;
+ unwindLink(connection.getHeaderField(LOCATION_IDENTIFIER));
+ }
+ break;
+ case 305: // User must use the specified proxy (deprecated by W3C)
+ break;
+ case 401: // Unauthorized (nothing we can do here)
+ this.status = LinkStatus.UNAUTHORIZED;
+ break;
+ case 403: // HTTP Forbidden (Nothing we can do here)
+ this.status = LinkStatus.FORBIDDEN;
+ break;
+ case 404: // Not Found (Page is not found, nothing we can do with a 404)
+ this.status = LinkStatus.NOT_FOUND;
+ break;
+ case 500: // Internal Server Error
+ case 501: // Not Implemented
+ case 502: // Bad Gateway
+ case 503: // Service Unavailable
+ case 504: // Gateway Timeout
+ case 505: // Version not supported
+ this.status = LinkStatus.HTTP_ERROR_STATUS;
+ break;
+ default:
+ LOGGER.info("Unrecognized HTTP Response Code: {}", this.finalResponseCode);
+ this.status = LinkStatus.NOT_FOUND;
+ break;
+ }
+ }
+ catch (MalformedURLException e)
+ {
+ // the URL is trash, so, it can't load it.
+ this.status = LinkStatus.MALFORMED_URL;
+ }
+ catch (IOException ex)
+ {
+ // there was an issue we are going to set to error.
+ this.status = LinkStatus.ERROR;
+ }
+ catch (Exception ex)
+ {
+ // there was an unknown issue we are going to set to exception.
+ this.status = LinkStatus.EXCEPTION;
+ }
+ finally
+ {
+ if (connection != null)
+ connection.disconnect();
+ }
+ }
+
+ private Map<String,List<String>> createCaseInsenitiveMap(Map<String,List<String>> input) {
+ Map<String,List<String>> toReturn = new HashMap<String, List<String>>();
+ for(String k : input.keySet())
+ if(k != null && input.get(k) != null)
+ toReturn.put(k.toLowerCase(), input.get(k));
+ return toReturn;
+ }
+
+ private String cleanURL(String url)
+ {
+ // If they pass us a null URL then we are going to pass that right back to them.
+ if(url == null)
+ return null;
+
+ // remember how big the URL was at the start
+ int startLength = url.length();
+
+ // Iterate through all the known URL parameters of tracking URLs
+ for(String pattern : URL_TRACKING_TO_REMOVE)
+ url = url.replaceAll(pattern, "");
+
+ // If the URL is smaller than when it came in. Then it had tracking information
+ if(url.length() < startLength)
+ this.isTracked = true;
+
+ // return our url.
+ return url;
+ }
+
+ /**
+ * Removes the protocol, if it exists, from the front and
+ * removes any random encoding characters
+ * Extend this to do other url cleaning/pre-processing
+ * @param url - The String URL to normalize
+ * @return normalizedUrl - The String URL that has no junk or surprises
+ */
+ public static String normalizeURL(String url)
+ {
+ // Decode URL to remove any %20 type stuff
+ String normalizedUrl = url;
+ try {
+ // I've used a URLDecoder that's part of Java here,
+ // but this functionality exists in most modern languages
+ // and is universally called url decoding
+ normalizedUrl = URLDecoder.decode(url, "UTF-8");
+ }
+ catch(UnsupportedEncodingException uee)
+ {
+ System.err.println("Unable to Decode URL. Decoding skipped.");
+ uee.printStackTrace();
+ }
+
+ // Remove the protocol, http:// ftp:// or similar from the front
+ if (normalizedUrl.contains("://"))
+ normalizedUrl = normalizedUrl.split(":/{2}")[1];
+
+ // Room here to do more pre-processing
+
+ return normalizedUrl;
+ }
+
+ /**
+ * Goal is to get the different parts of the URL path. This can be used
+ * in a classifier to help us determine if we are working with
+ *
+ * Reference:
+ * http://stackoverflow.com/questions/10046178/pattern-matching-for-url-classification
+ * @param url - Url to be tokenized
+ * @return tokens - A String array of all the tokens
+ */
+ public static List<String> tokenizeURL(String url)
+ {
+ url = normalizeURL(url);
+ // I assume that we're going to use the whole URL to find tokens in
+ // If you want to just look in the GET parameters, or you want to ignore the domain
+ // or you want to use the domain as a token itself, that would have to be
+ // processed above the next line, and only the remaining parts split
+ List<String> toReturn = new ArrayList<String>();
+
+ // Split the URL by forward slashes. Most modern browsers will accept a URL
+ // this malformed such as http://www.smashew.com/hello//how////are/you
+ // hence the '+' in the regular expression.
+ for(String part: url.split("/+"))
+ toReturn.add(part.toLowerCase());
+
+ // return our object.
+ return toReturn;
+
+ // One could alternatively use a more complex regex to remove more invalid matches
+ // but this is subject to your (?:in)?ability to actually write the regex you want
+
+ // These next two get rid of tokens that are too short, also.
+
+ // Destroys anything that's not alphanumeric and things that are
+ // alphanumeric but only 1 character long
+ //String[] tokens = url.split("(?:[\\W_]+\\w)*[\\W_]+");
+
+ // Destroys anything that's not alphanumeric and things that are
+ // alphanumeric but only 1 or 2 characters long
+ //String[] tokens = url.split("(?:[\\W_]+\\w{1,2})*[\\W_]+");
+ }
+
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9e757aed/streams-contrib/streams-processor-urls/src/main/java/org/apache/streams/urls/LinkUnwinderProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-urls/src/main/java/org/apache/streams/urls/LinkUnwinderProcessor.java b/streams-contrib/streams-processor-urls/src/main/java/org/apache/streams/urls/LinkUnwinderProcessor.java
new file mode 100644
index 0000000..45ec04d
--- /dev/null
+++ b/streams-contrib/streams-processor-urls/src/main/java/org/apache/streams/urls/LinkUnwinderProcessor.java
@@ -0,0 +1,76 @@
+package org.apache.streams.urls;
+
+import com.google.common.collect.Lists;
+import org.apache.streams.urls.Link;
+import org.apache.streams.urls.LinkUnwinder;
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.pojo.json.Activity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+/**
+ * StreamsProcessor that replaces the links of an Activity with the URLs they finally resolve
+ * to, using LinkUnwinder. Links that cannot be resolved are dropped. Documents that are not
+ * Activities are rejected with NotImplementedException.
+ *
+ * References:
+ * Purpose          URL
+ * -------------    ----------------------------------------------------------------
+ * [Status Codes]   http://www.w3.org/Protocols/rfc2616/rfc2616-sec10.html
+ * [Test Cases]     http://greenbytes.de/tech/tc/httpredirects/
+ * [t.co behavior]  https://dev.twitter.com/docs/tco-redirection-behavior
+ */
+public class LinkUnwinderProcessor implements StreamsProcessor
+{
+    private final static String STREAMS_ID = "LinkUnwinderProcessor";
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(LinkUnwinderProcessor.class);
+
+    /**
+     * Unwinds every link on the Activity in {@code entry} and returns the same datum with the
+     * links replaced. An Activity without a links list is passed through unchanged.
+     *
+     * @throws NotImplementedException if the document is not an Activity
+     */
+    @Override
+    public List<StreamsDatum> process(StreamsDatum entry) {
+
+        Object document = entry.getDocument();
+        LOGGER.debug("{} processing {}", STREAMS_ID, document == null ? null : document.getClass());
+
+        if (!(document instanceof Activity))
+            throw new NotImplementedException();
+
+        Activity activity = (Activity) document;
+        List<String> inputLinks = activity.getLinks();
+        if (inputLinks != null) {
+            List<String> outputLinks = Lists.newArrayList();
+            for (String link : inputLinks) {
+                String unwound = unwind(link);
+                // if unwindable, drop
+                if (unwound != null)
+                    outputLinks.add(unwound);
+            }
+            activity.setLinks(outputLinks);
+        }
+        entry.setDocument(activity);
+
+        List<StreamsDatum> result = Lists.newArrayList();
+        result.add(entry);
+        return result;
+    }
+
+    /** Resolves one link, returning its final URL or null when it could not be resolved. */
+    private String unwind(String link) {
+        if (link == null)
+            return null;
+        try {
+            LinkUnwinder unwinder = new LinkUnwinder(link);
+            unwinder.run();
+            if (!unwinder.isFailure() && unwinder.getFinalURL() != null)
+                return unwinder.getFinalURL();
+            LOGGER.debug("Failed to unwind link : {} (status {})", link, unwinder.getStatus());
+        } catch (Exception e) {
+            LOGGER.debug("Failed to unwind link : {}", link, e);
+        }
+        return null;
+    }
+
+    @Override
+    public void prepare(Object o) {
+    }
+
+    @Override
+    public void cleanUp() {
+
+    }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9e757aed/streams-contrib/streams-processor-urls/src/test/java/org/apache/streams/urls/TestLinkUnwinderProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-urls/src/test/java/org/apache/streams/urls/TestLinkUnwinderProcessor.java b/streams-contrib/streams-processor-urls/src/test/java/org/apache/streams/urls/TestLinkUnwinderProcessor.java
new file mode 100644
index 0000000..94ae2d2
--- /dev/null
+++ b/streams-contrib/streams-processor-urls/src/test/java/org/apache/streams/urls/TestLinkUnwinderProcessor.java
@@ -0,0 +1,76 @@
+package org.apache.streams.urls;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.jackson.StreamsJacksonModule;
+import org.apache.streams.pojo.json.Activity;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+import java.util.List;
+import java.util.Scanner;
+
+/**
+ * Created by rebanks on 2/27/14.
+ */
+/**
+ * Tests for LinkUnwinderProcessor.
+ *
+ * NOTE(review): these resolve real short links over the network, so they need internet access
+ * and rely on the shorteners still pointing at http://www.wcgworld.com/.
+ */
+public class TestLinkUnwinderProcessor {
+
+    @Test
+    public void testActivityLinkUnwinderProcessorBitly() throws Exception{
+        testActivityUnwinderHelper(Lists.newArrayList("http://bit.ly/1cX5Rh4"), Lists.newArrayList("http://www.wcgworld.com/"));
+    }
+
+    @Test
+    public void testActivityLinkUnwinderProcessorGoogle() throws Exception{
+        testActivityUnwinderHelper(Lists.newArrayList("http://goo.gl/wSrHDA"), Lists.newArrayList("http://www.wcgworld.com/"));
+    }
+
+    @Test
+    public void testActivityLinkUnwinderProcessorOwly() throws Exception{
+        testActivityUnwinderHelper(Lists.newArrayList("http://ow.ly/u4Kte"), Lists.newArrayList("http://www.wcgworld.com/"));
+    }
+
+    @Test
+    public void testActivityLinkUnwinderProcessorGoDaddy() throws Exception{
+        testActivityUnwinderHelper(Lists.newArrayList("http://x.co/3yapt"), Lists.newArrayList("http://www.wcgworld.com/"));
+    }
+
+    @Test
+    public void testActivityLinkUnwinderProcessorMulti() throws Exception{
+        testActivityUnwinderHelper(Lists.newArrayList("http://x.co/3yapt", "http://ow.ly/u4Kte", "http://goo.gl/wSrHDA"), Lists.newArrayList("http://www.wcgworld.com/", "http://www.wcgworld.com/", "http://www.wcgworld.com/"));
+    }
+
+    @Test
+    public void testActivityLinkUnwinderProcessorUnwindable() throws Exception{
+        testActivityUnwinderHelper(Lists.newArrayList("http://bit.ly/1cX5Rh4", "http://nope@#$%"), Lists.newArrayList("http://www.wcgworld.com/"));
+    }
+
+    /**
+     * Runs an Activity carrying the {@code input} links through the processor. Checks that
+     * exactly one datum comes back and that its links match {@code expected} both in count and
+     * as a set.
+     */
+    public void testActivityUnwinderHelper(List<String> input, List<String> expected) throws Exception{
+        Activity activity = new Activity();
+        activity.setLinks(input);
+        StreamsDatum datum = new StreamsDatum(activity);
+
+        LinkUnwinderProcessor processor = new LinkUnwinderProcessor();
+        processor.prepare(null);
+        List<StreamsDatum> result = processor.process(datum);
+
+        assertNotNull(result);
+        assertEquals(1, result.size());
+        StreamsDatum resultDatum = result.get(0);
+        assertNotNull(resultDatum);
+        assertTrue(resultDatum.getDocument() instanceof Activity);
+        Activity resultActivity = (Activity) resultDatum.getDocument();
+        List<String> resultLinks = resultActivity.getLinks();
+        assertNotNull(resultLinks);
+        assertEquals(expected.size(), resultLinks.size());
+        assertEquals(Sets.newHashSet(expected), Sets.newHashSet(resultLinks));
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9e757aed/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftActivitySerializer.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftActivitySerializer.java
index 455a579..de65636 100644
--- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftActivitySerializer.java
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftActivitySerializer.java
@@ -159,8 +159,8 @@ public class DatasiftActivitySerializer implements ActivitySerializer<Datasift>,
return actObj;
}
- public static List<Object> getLinks(Interaction interaction) {
- List<Object> links = Lists.newArrayList();
+ /** Returns the links shared in an interaction. TODO(review): stub — the interaction is not inspected yet, so this always returns a new, empty, mutable list. */
+ public static List<String> getLinks(Interaction interaction) {
+ List<String> links = Lists.newArrayList();
return links;
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9e757aed/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonActivitySerializer.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonActivitySerializer.java
new file mode 100644
index 0000000..fceff2c
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonActivitySerializer.java
@@ -0,0 +1,124 @@
+package org.apache.streams.twitter.serializer;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.PropertyAccessor;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.AnnotationIntrospector;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.module.jaxb.JaxbAnnotationIntrospector;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.streams.data.ActivitySerializer;
+import org.apache.streams.exceptions.ActivitySerializerException;
+import org.apache.streams.jackson.StreamsJacksonModule;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.pojo.json.Provider;
+import org.apache.streams.twitter.pojo.Delete;
+import org.apache.streams.twitter.pojo.Retweet;
+import org.apache.streams.twitter.pojo.Tweet;
+import org.apache.streams.twitter.provider.TwitterEventClassifier;
+import org.joda.time.DateTime;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+import org.joda.time.format.ISODateTimeFormat;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Created by sblackmon on 3/26/14.
+ */
+public class TwitterJsonActivitySerializer implements ActivitySerializer<String>
+{
+
+ public TwitterJsonActivitySerializer() {
+
+ }
+
+ public static final DateTimeFormatter TWITTER_FORMAT = DateTimeFormat.forPattern("EEE MMM dd HH:mm:ss Z yyyy");
+ public static final DateTimeFormatter ACTIVITY_FORMAT = ISODateTimeFormat.basicDateTime();
+
+ public static ObjectMapper mapper;
+ static {
+ mapper = new ObjectMapper();
+ mapper.disable(com.fasterxml.jackson.databind.SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
+ mapper.registerModule(new StreamsJacksonModule() {
+ {
+ addDeserializer(DateTime.class, new StdDeserializer<DateTime>(DateTime.class) {
+ @Override
+ public DateTime deserialize(JsonParser jpar, DeserializationContext context) throws IOException, JsonProcessingException {
+ return TWITTER_FORMAT.parseDateTime(jpar.getValueAsString());
+ }
+ });
+ }
+ });
+ //AnnotationIntrospector introspector = new JaxbAnnotationIntrospector(mapper.getTypeFactory());
+ //mapper.setAnnotationIntrospector(introspector);
+ mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, Boolean.TRUE);
+ mapper.configure(DeserializationFeature.FAIL_ON_INVALID_SUBTYPE, Boolean.TRUE);
+ mapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, Boolean.TRUE);
+ mapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, Boolean.TRUE);
+ mapper.configure(DeserializationFeature.WRAP_EXCEPTIONS, Boolean.FALSE);
+ mapper.configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, Boolean.TRUE);
+ mapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);
+
+ }
+
+ TwitterJsonTweetActivitySerializer tweetActivitySerializer = new TwitterJsonTweetActivitySerializer();
+ TwitterJsonRetweetActivitySerializer retweetActivitySerializer = new TwitterJsonRetweetActivitySerializer();
+ TwitterJsonDeleteActivitySerializer deleteActivitySerializer = new TwitterJsonDeleteActivitySerializer();
+
+ @Override
+ public String serializationFormat() {
+ return null;
+ }
+
+ @Override
+ public String serialize(Activity deserialized) throws ActivitySerializerException {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public Activity deserialize(String serialized) throws ActivitySerializerException {
+
+ Class documentSubType = TwitterEventClassifier.detectClass(serialized);
+
+ Activity activity;
+ if( documentSubType == Tweet.class )
+ activity = tweetActivitySerializer.deserialize(serialized);
+ else if( documentSubType == Retweet.class )
+ activity = retweetActivitySerializer.deserialize(serialized);
+ else if( documentSubType == Delete.class )
+ activity = deleteActivitySerializer.deserialize(serialized);
+ else throw new ActivitySerializerException("unrecognized type");
+
+ return activity;
+ }
+
+ @Override
+ public List<Activity> deserializeAll(List<String> serializedList) {
+ throw new NotImplementedException();
+ }
+
+ public static Provider getProvider() {
+ Provider provider = new Provider();
+ provider.setId("id:providers:twitter");
+ return provider;
+ }
+
+ public static void addTwitterExtension(Activity activity, ObjectNode event) {
+ Map<String, Object> extensions = org.apache.streams.data.util.ActivityUtil.ensureExtensions(activity);
+ extensions.put("twitter", event);
+ }
+
+ public static String formatId(String... idparts) {
+ return Joiner.on(":").join(Lists.asList("id:twitter", idparts));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9e757aed/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java
index 8bcb60b..a038792 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java
@@ -81,8 +81,6 @@ public class TwitterJsonTweetActivitySerializer implements ActivitySerializer<St
activity.setUrl("http://twitter.com/" + tweet.getIdStr());
activity.setLinks(getLinks(tweet));
- System.out.println("12");
-
addTwitterExtension(activity, TwitterJsonActivitySerializer.mapper.convertValue(tweet, ObjectNode.class));
addLocationExtension(activity, tweet);
return activity;
@@ -109,8 +107,8 @@ public class TwitterJsonTweetActivitySerializer implements ActivitySerializer<St
return actor;
}
- public static List<Object> getLinks(Tweet tweet) {
- List<Object> links = Lists.newArrayList();
+ public static List<String> getLinks(Tweet tweet) {
+ List<String> links = Lists.newArrayList();
if( tweet.getEntities().getUrls() != null ) {
for (Url url : tweet.getEntities().getUrls()) {
links.add(url.getExpandedUrl());
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9e757aed/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TweetActivitySerDeTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TweetActivitySerDeTest.java b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TweetActivitySerDeTest.java
new file mode 100644
index 0000000..494f698
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TweetActivitySerDeTest.java
@@ -0,0 +1,118 @@
+package org.apache.streams.twitter.test;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.commons.lang.StringUtils;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.twitter.pojo.Retweet;
+import org.apache.streams.twitter.pojo.Tweet;
+import org.apache.streams.twitter.provider.TwitterEventClassifier;
+import org.apache.streams.twitter.serializer.TwitterJsonActivitySerializer;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+
+import static java.util.regex.Pattern.matches;
+import static org.hamcrest.CoreMatchers.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+/**
+* Created with IntelliJ IDEA.
+* User: sblackmon
+* Date: 8/20/13
+* Time: 5:57 PM
+* To change this template use File | Settings | File Templates.
+*/
+public class TweetActivitySerDeTest {
+
+ private final static Logger LOGGER = LoggerFactory.getLogger(TweetActivitySerDeTest.class);
+
+ private TwitterJsonActivitySerializer twitterJsonActivitySerializer = new TwitterJsonActivitySerializer();
+
+ // @Ignore
+ @Test
+ public void Tests()
+ {
+ InputStream is = TweetActivitySerDeTest.class.getResourceAsStream("/testtweets.txt");
+ InputStreamReader isr = new InputStreamReader(is);
+ BufferedReader br = new BufferedReader(isr);
+
+ try {
+ while (br.ready()) {
+ String line = br.readLine();
+ if(!StringUtils.isEmpty(line))
+ {
+ LOGGER.info("raw: {}", line);
+
+ Class detected = TwitterEventClassifier.detectClass(line);
+
+ Activity activity = twitterJsonActivitySerializer.deserialize(line);
+
+ String activitystring = TwitterJsonActivitySerializer.mapper.writeValueAsString(activity);
+
+ LOGGER.info("activity: {}", activitystring);
+
+ assertThat(activity, is(not(nullValue())));
+
+ assertThat(activity.getId(), is(not(nullValue())));
+ assertThat(activity.getActor(), is(not(nullValue())));
+ assertThat(activity.getActor().getId(), is(not(nullValue())));
+ assertThat(activity.getVerb(), is(not(nullValue())));
+ assertThat(activity.getProvider(), is(not(nullValue())));
+
+ if( detected == Tweet.class ) {
+ assertThat(activity.getObject(), is(nullValue()));
+
+ assertEquals(activity.getVerb(), "post");
+
+ Tweet tweet = TwitterJsonActivitySerializer.mapper.readValue(line, Tweet.class);
+
+ if( tweet.getEntities() != null &&
+ tweet.getEntities().getUrls() != null &&
+ tweet.getEntities().getUrls().size() > 0 ) {
+
+
+ assertThat(activity.getLinks(), is(not(nullValue())));
+ assertEquals(tweet.getEntities().getUrls().size(), activity.getLinks().size());
+ }
+
+ } else if( detected == Retweet.class ) {
+
+ Retweet retweet = TwitterJsonActivitySerializer.mapper.readValue(line, Retweet.class);
+
+ assertThat(retweet.getRetweetedStatus(), is(not(nullValue())));
+
+ assertEquals(activity.getVerb(), "share");
+
+ assertThat(activity.getObject(), is(not(nullValue())));
+ assertThat(activity.getObject().getObjectType(), is(not(nullValue())));
+ assertThat(activity.getObject().getObjectType(), is(not(nullValue())));
+
+ if( retweet.getRetweetedStatus().getEntities() != null &&
+ retweet.getRetweetedStatus().getEntities().getUrls() != null &&
+ retweet.getRetweetedStatus().getEntities().getUrls().size() > 0 ) {
+
+ assertThat(activity.getLinks(), is(not(nullValue())));
+ assertEquals(retweet.getRetweetedStatus().getEntities().getUrls().size(), activity.getLinks().size());
+ }
+
+ }
+
+
+
+ }
+ }
+ } catch( Exception e ) {
+ System.out.println(e);
+ e.printStackTrace();
+ Assert.fail();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9e757aed/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TweetSerDeTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TweetSerDeTest.java b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TweetSerDeTest.java
index c6dc0ad..bc7bcf7 100644
--- a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TweetSerDeTest.java
+++ b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TweetSerDeTest.java
@@ -3,6 +3,7 @@ package org.apache.streams.twitter.test;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Optional;
import org.apache.commons.lang.StringUtils;
import org.apache.streams.pojo.json.Activity;
import org.apache.streams.twitter.pojo.Delete;
@@ -21,6 +22,7 @@ import java.io.InputStreamReader;
import static java.util.regex.Pattern.matches;
import static org.hamcrest.CoreMatchers.*;
+import static org.hamcrest.Matchers.greaterThan;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
@@ -50,6 +52,9 @@ public class TweetSerDeTest {
InputStreamReader isr = new InputStreamReader(is);
BufferedReader br = new BufferedReader(isr);
+ int tweetlinks = 0;
+ int retweetlinks = 0;
+
try {
while (br.ready()) {
String line = br.readLine();
@@ -72,6 +77,8 @@ public class TweetSerDeTest {
assertThat(tweet.getText(), is(not(nullValue())));
assertThat(tweet.getUser(), is(not(nullValue())));
+ tweetlinks += Optional.fromNullable(tweet.getEntities().getUrls().size()).or(0);
+
} else if( detected == Retweet.class ) {
Retweet retweet = mapper.convertValue(event, Retweet.class);
@@ -83,6 +90,8 @@ public class TweetSerDeTest {
assertThat(retweet.getRetweetedStatus().getUser().getId(), is(not(nullValue())));
assertThat(retweet.getRetweetedStatus().getUser().getCreatedAt(), is(not(nullValue())));
+ retweetlinks += Optional.fromNullable(retweet.getRetweetedStatus().getEntities().getUrls().size()).or(0);
+
} else if( detected == Delete.class ) {
Delete delete = mapper.convertValue(event, Delete.class);
@@ -95,6 +104,7 @@ public class TweetSerDeTest {
} else {
Assert.fail();
}
+
}
}
} catch( Exception e ) {
@@ -102,5 +112,9 @@ public class TweetSerDeTest {
e.printStackTrace();
Assert.fail();
}
+
+ assertThat(tweetlinks, is(greaterThan(0)));
+ assertThat(retweetlinks, is(greaterThan(0)));
+
}
}