You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/04/10 20:44:33 UTC
git commit: additional pom cleanup and processor work
Repository: incubator-streams
Updated Branches:
refs/heads/springcleaning 27e67162f -> 56a395fa3
additional pom cleanup and processor work
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/56a395fa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/56a395fa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/56a395fa
Branch: refs/heads/springcleaning
Commit: 56a395fa36c27477a45fa133a2b49d02da7be5c0
Parents: 27e6716
Author: sblackmon <sb...@w2odigital.com>
Authored: Thu Apr 10 12:44:25 2014 -0600
Committer: sblackmon <sb...@w2odigital.com>
Committed: Thu Apr 10 12:44:25 2014 -0600
----------------------------------------------------------------------
pom.xml | 3 +-
streams-contrib/pom.xml | 2 +-
.../streams-persist-elasticsearch/pom.xml | 16 +-
streams-contrib/streams-persist-hbase/pom.xml | 11 +-
streams-contrib/streams-persist-hdfs/pom.xml | 4 +
streams-contrib/streams-persist-kafka/pom.xml | 9 +-
streams-contrib/streams-persist-mongo/pom.xml | 6 +-
streams-contrib/streams-processor-tika/pom.xml | 3 +-
.../org/apache/streams/tika/LinkExpander.java | 245 ------------
.../org/apache/streams/tika/TikaProcessor.java | 5 +-
streams-contrib/streams-processor-urls/pom.xml | 55 +++
.../main/java/org/apache/streams/urls/Link.java | 57 ---
.../org/apache/streams/urls/LinkResolver.java | 356 ++++++++++++++++++
.../streams/urls/LinkResolverProcessor.java | 108 ++++++
.../org/apache/streams/urls/LinkUnwinder.java | 372 -------------------
.../streams/urls/LinkUnwinderProcessor.java | 123 ------
.../streams/urls/TestLinkUnwinderProcessor.java | 7 +-
.../streams-provider-twitter/pom.xml | 5 +
streams-runtimes/streams-runtime-pig/pom.xml | 34 +-
streams-runtimes/streams-runtime-storm/pom.xml | 35 +-
20 files changed, 599 insertions(+), 857 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/56a395fa/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 1afae06..78f5d6f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -83,8 +83,7 @@
<guava.version>16.0.1</guava.version>
<scala.version>2.8.0</scala.version>
<clojure.version>1.4.0</clojure.version>
- <kafka.version>0.8.1</kafka.version>
- <zookeeper.version>3.4.5-cdh4.5.0</zookeeper.version>
+ <zookeeper.version>3.4.5</zookeeper.version>
<netty.version>3.8.0.Final</netty.version>
<json-path.version>0.9.1</json-path.version>
<build-helper.version>1.8</build-helper.version>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/56a395fa/streams-contrib/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/pom.xml b/streams-contrib/pom.xml
index a796dad..d80fc63 100644
--- a/streams-contrib/pom.xml
+++ b/streams-contrib/pom.xml
@@ -45,7 +45,7 @@
<module>streams-persist-kafka</module>
<module>streams-persist-mongo</module>
<!--<module>streams-processor-lucene</module>-->
- <module>streams-processor-tika</module>
+ <!--<module>streams-processor-tika</module>-->
<module>streams-processor-urls</module>
<module>streams-provider-datasift</module>
<module>streams-provider-facebook</module>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/56a395fa/streams-contrib/streams-persist-elasticsearch/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/pom.xml b/streams-contrib/streams-persist-elasticsearch/pom.xml
index 60932c8..8325243 100644
--- a/streams-contrib/streams-persist-elasticsearch/pom.xml
+++ b/streams-contrib/streams-persist-elasticsearch/pom.xml
@@ -11,6 +11,10 @@
<artifactId>streams-persist-elasticsearch</artifactId>
+ <properties>
+ <elasticsearch.version>1.1.0</elasticsearch.version>
+ </properties>
+
<dependencies>
<dependency>
<groupId>org.apache.streams</groupId>
@@ -35,16 +39,24 @@
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
- <version>1.0.1</version>
<scope>compile</scope>
<type>jar</type>
</dependency>
<dependency>
<groupId>org.json</groupId>
<artifactId>json</artifactId>
- <version>20090211</version>
+ <version>${orgjson.version}</version>
</dependency>
</dependencies>
+ <dependencyManagement>
+ <dependencies>
+ <dependency>
+ <groupId>org.elasticsearch</groupId>
+ <artifactId>elasticsearch</artifactId>
+ <version>${elasticsearch.version}</version>
+ </dependency>
+ </dependencies>
+ </dependencyManagement>
<build>
<plugins>
<plugin>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/56a395fa/streams-contrib/streams-persist-hbase/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hbase/pom.xml b/streams-contrib/streams-persist-hbase/pom.xml
index 5b92591..18ec32e 100644
--- a/streams-contrib/streams-persist-hbase/pom.xml
+++ b/streams-contrib/streams-persist-hbase/pom.xml
@@ -11,6 +11,12 @@
<artifactId>streams-persist-hbase</artifactId>
+ <properties>
+ <hadoop-common.version>2.0.0-cdh4.5.0</hadoop-common.version>
+ <hbase.version>0.94.6-cdh4.5.0</hbase.version>
+ <zookeeper.version>3.4.5-cdh4.5.0.1-SNAPSHOT</zookeeper.version>
+ </properties>
+
<repositories>
<repository>
<id>cloudera</id>
@@ -42,14 +48,14 @@
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
- <version>2.0.0-cdh4.5.0</version>
+ <version>${hadoop-common.version}</version>
<scope>compile</scope>
<type>jar</type>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase</artifactId>
- <version>0.94.6-cdh4.5.0</version>
+ <version>${hbase.version}</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
@@ -66,7 +72,6 @@
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
- <version>1.8</version>
<executions>
<execution>
<id>add-source</id>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/56a395fa/streams-contrib/streams-persist-hdfs/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/pom.xml b/streams-contrib/streams-persist-hdfs/pom.xml
index 6819ada..6b84733 100644
--- a/streams-contrib/streams-persist-hdfs/pom.xml
+++ b/streams-contrib/streams-persist-hdfs/pom.xml
@@ -11,6 +11,10 @@
<artifactId>streams-persist-hdfs</artifactId>
+ <properties>
+ <hdfs.version>1.1.0</hdfs.version>
+ </properties>
+
<repositories>
<repository>
<id>cloudera</id>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/56a395fa/streams-contrib/streams-persist-kafka/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-kafka/pom.xml b/streams-contrib/streams-persist-kafka/pom.xml
index 4cb494e..b0e621e 100644
--- a/streams-contrib/streams-persist-kafka/pom.xml
+++ b/streams-contrib/streams-persist-kafka/pom.xml
@@ -11,6 +11,11 @@
<artifactId>streams-persist-kafka</artifactId>
+ <properties>
+ <scala.version>2.9.2</scala.version>
+ <kafka.version>0.8.0</kafka.version>
+ </properties>
+
<dependencies>
<dependency>
<groupId>org.apache.streams</groupId>
@@ -34,8 +39,8 @@
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
- <artifactId>kafka_2.10</artifactId>
- <version>0.8.0</version>
+ <artifactId>kafka_${scala.version}</artifactId>
+ <version>${kafka.version}</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/56a395fa/streams-contrib/streams-persist-mongo/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-mongo/pom.xml b/streams-contrib/streams-persist-mongo/pom.xml
index 72a8043..6237132 100644
--- a/streams-contrib/streams-persist-mongo/pom.xml
+++ b/streams-contrib/streams-persist-mongo/pom.xml
@@ -11,6 +11,10 @@
<artifactId>streams-persist-mongo</artifactId>
+ <properties>
+ <mongo-driver.version>2.12.0-rc0</mongo-driver.version>
+ </properties>
+
<dependencies>
<dependency>
<groupId>org.apache.streams</groupId>
@@ -35,7 +39,7 @@
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongo-java-driver</artifactId>
- <version>2.12.0-rc0</version>
+ <version>${mongo-driver.version}</version>
<exclusions>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/56a395fa/streams-contrib/streams-processor-tika/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-tika/pom.xml b/streams-contrib/streams-processor-tika/pom.xml
index b320d38..81026a3 100644
--- a/streams-contrib/streams-processor-tika/pom.xml
+++ b/streams-contrib/streams-processor-tika/pom.xml
@@ -84,7 +84,6 @@
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
- <version>1.8</version>
<executions>
<execution>
<id>add-source</id>
@@ -119,7 +118,7 @@
<addCompileSourceRoot>true</addCompileSourceRoot>
<generateBuilders>true</generateBuilders>
<sourcePaths>
- <sourcePath>src/main/jsonschema/org/apache/streams/tika/BoilerPipeArticle.json</sourcePath>
+ <sourcePath>src/main/jsonschema/org/apache/streams/tika/LinkDetails.json</sourcePath>
</sourcePaths>
<outputDirectory>target/generated-sources/jsonschema2pojo</outputDirectory>
<targetPackage>org.apache.streams.tika</targetPackage>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/56a395fa/streams-contrib/streams-processor-tika/src/main/java/org/apache/streams/tika/LinkExpander.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-tika/src/main/java/org/apache/streams/tika/LinkExpander.java b/streams-contrib/streams-processor-tika/src/main/java/org/apache/streams/tika/LinkExpander.java
deleted file mode 100644
index e4c0cef..0000000
--- a/streams-contrib/streams-processor-tika/src/main/java/org/apache/streams/tika/LinkExpander.java
+++ /dev/null
@@ -1,245 +0,0 @@
-package org.apache.streams.tika;
-
-import org.apache.streams.urls.LinkUnwinder;
-import org.apache.streams.util.DateUtil;
-import org.apache.streams.tika.BoilerPipeArticle;
-import org.apache.streams.tika.LanguageDetected;
-import org.apache.tika.exception.TikaException;
-import org.apache.tika.language.LanguageIdentifier;
-import org.apache.tika.metadata.Metadata;
-import org.apache.tika.parser.AutoDetectParser;
-import org.apache.tika.parser.ParseContext;
-import org.apache.tika.parser.html.BoilerpipeContentHandler;
-import org.apache.tika.parser.html.HtmlParser;
-import org.apache.tika.sax.BodyContentHandler;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.xml.sax.SAXException;
-
-import de.l3s.boilerpipe.document.TextBlock;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.StringWriter;
-import java.net.URL;
-import java.net.URLConnection;
-import java.text.ParseException;
-import java.util.*;
-
-
-/**
- * Helpful resources for this class:
- *
- * // TODO: This needs to be rethought.
- *
- * URL:
- * Tika UI: http://www.apache.org/dyn/closer.cgi/tika/tika-app-1.4.jar
- * Tika: http://tika.apache.org/
- * Dublin Core: http://dublincore.org/documents/dces/
- */
-
-public class LinkExpander extends LinkUnwinder
-{
- private final static Logger LOGGER = LoggerFactory.getLogger(LinkExpander.class);
-
- private static final AutoDetectParser AUTO_DETECT_PARSER = new AutoDetectParser();
-
- private final Map<String, String> metaData = new HashMap<String, String>();
-
- private final Set<String> keywords = new HashSet<String>();
-
- private BoilerPipeArticle article = new BoilerPipeArticle();
-
- private static final Collection<String> AUTHOR_SEARCH = new ArrayList<String>() {{
- add("og:author");
- add("dc:author");
- add("author");
- }};
-
- private static final Collection<String> DESCRIPTION_SEARCH = new ArrayList<String>() {{
- add("og:description");
- add("dc:description");
- add("description");
- }};
-
- private static final Collection<String> MEDIUM_SEARCH = new ArrayList<String>() {{
- add("og:medium");
- add("dc:medium");
- add("medium");
- }};
-
- private static final Collection<String> IMAGE_SEARCH = new ArrayList<String>() {{
- add("og:image");
- add("twitter:image");
- add("image");
- }};
-
- private static final Collection<String> KEYWORDS_SEARCH = new ArrayList<String>() {{
- add("keywords");
- add("news_keywords");
- }};
-
- private static final Collection<String> PUB_DATE_SEARCH = new ArrayList<String>() {{
- add("pubdate");
- add("os:pubdate");
- add("dc:pubdate");
- }};
-
- private static final Collection<String> MODIFIED_DATE_SEARCH = new ArrayList<String>() {{
- add("lastmod");
- add("last-modified");
- }};
-
- private static final Collection<String> LOCALE_SEARCH = new ArrayList<String>() {{
- add("locale");
- add("os:locale");
- add("dc:local");
- }};
-
- // Social Searchers
- private static final Collection<String> FACEBOOK_PAGE_SEARCH = new ArrayList<String>() {{
- add("fb:page_id");
- }};
-
- private static final Collection<String> FACEBOOK_APP_SEARCH = new ArrayList<String>() {{
- add("fb:app_id");
- }};
-
- private static final Collection<String> TWITTER_SITE_SEARCH = new ArrayList<String>() {{
- add("twitter:site:id");
- add("twitter:site");
- }};
-
- private static final Collection<String> TWITTER_CREATOR_SEARCH = new ArrayList<String>() {{
- add("twitter:creator:id");
- add("twitter:creator");
- }};
-
-
- public LinkExpander(String url) {
- super(url);
- }
-
- public void run() {
- super.run();
- expandLink();
- }
-
- public BoilerPipeArticle getArticle() {
- return article;
- }
-
- private void expandLink()
- {
- InputStream is = null;
-
- try
- {
- URL url = new URL(this.getFinalURL());
- URLConnection con = url.openConnection();
- con.setConnectTimeout(10000);
- is = con.getInputStream();
-
- parseMainContent(is);
- parsePlainText(is);
- detectLanguage(article.getPlainText());
-
- }
- // Handle all Exceptions by just reporting that the site status was an error.
- catch (IOException e) {
- article.setSiteStatus(BoilerPipeArticle.SiteStatus.ERROR);
- }
- catch (TikaException e) {
- article.setSiteStatus(BoilerPipeArticle.SiteStatus.ERROR);
- }
- catch (SAXException e) {
- article.setSiteStatus(BoilerPipeArticle.SiteStatus.ERROR);
- }
- catch (Exception e) {
- article.setSiteStatus(BoilerPipeArticle.SiteStatus.ERROR);
- }
- finally {
- if (!(is == null)) {
- try {
- is.close();
- }
- catch(IOException e) {
- LOGGER.warn("Problem closing the input stream: {}", e.getMessage());
- }
- }
- }
- }
-
- private void parseMainContent(InputStream is) throws IOException, SAXException, TikaException, ParseException
- {
- Metadata rawMetaData = new Metadata();
- StringWriter stringWriter = new StringWriter();
-
- BoilerpipeContentHandler boilerpipeContentHandler = new BoilerpipeContentHandler(stringWriter);
-
- AUTO_DETECT_PARSER.parse(is,
- boilerpipeContentHandler,
- rawMetaData);
-
- article.setBody(boilerpipeContentHandler.getTextDocument().getContent());
- article.setTitle(boilerpipeContentHandler.getTextDocument().getTitle());
-
- // this map is for ourselves so we convert it to lower-case to make it easier to search.
- // the meta data that is going to be returned will be unmodified meta data.
- for(String name : rawMetaData.names())
- if(rawMetaData.get(name) != null) {
- this.metaData.put(name.toLowerCase(), rawMetaData.get(name));
- article.setAdditionalProperty(name.toLowerCase(), rawMetaData.get(name));
- }
-
- article.setAuthor(metaDataSearcher(LinkExpander.AUTHOR_SEARCH));
- article.setDescription(metaDataSearcher(LinkExpander.DESCRIPTION_SEARCH));
- article.setMedium(metaDataSearcher(LinkExpander.MEDIUM_SEARCH));
- article.setImageURL(metaDataSearcher(LinkExpander.IMAGE_SEARCH));
- article.setLocale(metaDataSearcher(LinkExpander.LOCALE_SEARCH));
-
- article.setFacebookApp(metaDataSearcher(LinkExpander.FACEBOOK_APP_SEARCH));
- article.setFacebookPage(metaDataSearcher(LinkExpander.FACEBOOK_PAGE_SEARCH));
-
- article.setTwitterCreator(metaDataSearcher(LinkExpander.TWITTER_CREATOR_SEARCH));
- article.setTwitterSite(metaDataSearcher(LinkExpander.TWITTER_SITE_SEARCH));
-
- mergeSet(LinkExpander.KEYWORDS_SEARCH, this.keywords);
-
- article.setPublishedDate(DateUtil.determineDate(metaDataSearcher(LinkExpander.PUB_DATE_SEARCH)));
- article.setLastModifiedDate(DateUtil.determineDate(metaDataSearcher(LinkExpander.MODIFIED_DATE_SEARCH)));
-
- if(article.getBody().length() > 50)
- article.setSiteStatus(BoilerPipeArticle.SiteStatus.SUCCESS);
- }
-
- private void parsePlainText(InputStream is) throws Exception {
- BodyContentHandler handler = new BodyContentHandler();
- Metadata metadata = new Metadata();
- new HtmlParser().parse(is, handler, metadata, new ParseContext());
- article.setPlainText(handler.toString());
- }
-
- private void detectLanguage(String plainText) throws Exception {
- LanguageDetected languageDetected = new LanguageDetected();
- LanguageIdentifier languageIdentifier = new LanguageIdentifier(plainText);
- languageDetected.setLanguageCode(languageIdentifier.getLanguage());
- languageDetected.setIsLanguageReasonablyCertain(languageIdentifier.isReasonablyCertain());
- article.setLanguageDetected(languageDetected);
- }
-
- private String metaDataSearcher(Collection<String> itemsToSearch) {
- for(String s : itemsToSearch)
- if(this.metaData.containsKey(s))
- return this.metaData.get(s);
-
- // the meta searcher returned nothing.
- return null;
- }
-
- private void mergeSet(Collection<String> itemsToSearch, Set<String> set) {
- for(String s : itemsToSearch)
- Collections.addAll(set, s == null || s.equals("") ? new String[]{} : s.split(","));
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/56a395fa/streams-contrib/streams-processor-tika/src/main/java/org/apache/streams/tika/TikaProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-tika/src/main/java/org/apache/streams/tika/TikaProcessor.java b/streams-contrib/streams-processor-tika/src/main/java/org/apache/streams/tika/TikaProcessor.java
index 7609635..1e694c5 100644
--- a/streams-contrib/streams-processor-tika/src/main/java/org/apache/streams/tika/TikaProcessor.java
+++ b/streams-contrib/streams-processor-tika/src/main/java/org/apache/streams/tika/TikaProcessor.java
@@ -9,7 +9,6 @@ import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsProcessor;
import org.apache.streams.jackson.StreamsJacksonMapper;
import org.apache.streams.pojo.json.Activity;
-import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -27,7 +26,7 @@ import java.util.List;
public class TikaProcessor implements StreamsProcessor
{
- private final static String STREAMS_ID = "LinkExpanderProcessor";
+ private final static String STREAMS_ID = "LinkCrawlerProcessor";
private final static Logger LOGGER = LoggerFactory.getLogger(TikaProcessor.class);
@@ -85,7 +84,7 @@ public class TikaProcessor implements StreamsProcessor
private StreamsDatum expandLink(String link, StreamsDatum input) {
- LinkExpander expander = new LinkExpander((String)link);
+ LinkCrawler expander = new LinkCrawler((String)link);
expander.run();
StreamsDatum datum = null;
if(input.getId() == null)
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/56a395fa/streams-contrib/streams-processor-urls/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-urls/pom.xml b/streams-contrib/streams-processor-urls/pom.xml
index 966155f..b320ca5 100644
--- a/streams-contrib/streams-processor-urls/pom.xml
+++ b/streams-contrib/streams-processor-urls/pom.xml
@@ -61,5 +61,60 @@
<directory>src/test/resources</directory>
</testResource>
</testResources>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>add-source</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>target/generated-sources/jsonschema2pojo/**/*.java</source>
+ </sources>
+ </configuration>
+ </execution>
+ <execution>
+ <id>add-source-jaxb2</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>target/generated-sources/jaxb2</source>
+ </sources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.jsonschema2pojo</groupId>
+ <artifactId>jsonschema2pojo-maven-plugin</artifactId>
+ <configuration>
+ <addCompileSourceRoot>true</addCompileSourceRoot>
+ <generateBuilders>true</generateBuilders>
+ <sourcePaths>
+ <sourcePath>src/main/jsonschema/org/apache/streams/urls/LinkDetails.json</sourcePath>
+ </sourcePaths>
+ <outputDirectory>target/generated-sources/jsonschema2pojo</outputDirectory>
+ <targetPackage>org.apache.streams.urls</targetPackage>
+ <useLongIntegers>true</useLongIntegers>
+ <useJodaDates>true</useJodaDates>
+ </configuration>
+ <executions>
+ <execution>
+ <goals>
+ <goal>generate</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
</build>
+
</project>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/56a395fa/streams-contrib/streams-processor-urls/src/main/java/org/apache/streams/urls/Link.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-urls/src/main/java/org/apache/streams/urls/Link.java b/streams-contrib/streams-processor-urls/src/main/java/org/apache/streams/urls/Link.java
deleted file mode 100644
index ec282f5..0000000
--- a/streams-contrib/streams-processor-urls/src/main/java/org/apache/streams/urls/Link.java
+++ /dev/null
@@ -1,57 +0,0 @@
-package org.apache.streams.urls;
-
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-import java.util.List;
-
-public interface Link
-{
- @JsonProperty("status")
- public LinkStatus getStatus();
-
- @JsonProperty("originalUrl")
- public String getOriginalURL();
-
- @JsonProperty("wasRedirected")
- public boolean wasRedirected();
-
- @JsonProperty("finalUrl")
- public String getFinalURL();
-
- @JsonProperty("domain")
- public String getDomain();
-
- @JsonProperty("normalizedUrl")
- public String getNormalizedURL();
-
- @JsonProperty("urlParts")
- public List<String> getUrlParts();
-
- @JsonProperty("finalStatusCode")
- public String getFinalResponseCode();
-
- @JsonProperty("isTracked")
- public boolean isTracked();
-
- @JsonProperty("redirects")
- public List<String> getRedirects();
-
- @JsonProperty("tookInMillis")
- public long getTookInMillis();
-
- public void run();
-
- public enum LinkStatus {
- SUCCESS,
- ERROR,
- MALFORMED_URL,
- NOT_FOUND,
- FORBIDDEN,
- REDIRECT_ERROR,
- UNAUTHORIZED,
- LOOP,
- HTTP_ERROR_STATUS,
- EXCEPTION
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/56a395fa/streams-contrib/streams-processor-urls/src/main/java/org/apache/streams/urls/LinkResolver.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-urls/src/main/java/org/apache/streams/urls/LinkResolver.java b/streams-contrib/streams-processor-urls/src/main/java/org/apache/streams/urls/LinkResolver.java
new file mode 100644
index 0000000..7b5b012
--- /dev/null
+++ b/streams-contrib/streams-processor-urls/src/main/java/org/apache/streams/urls/LinkResolver.java
@@ -0,0 +1,356 @@
+package org.apache.streams.urls;
+
+import com.google.common.base.Preconditions;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.HttpURLConnection;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLDecoder;
+import java.util.*;
+
+/**
+ * References:
+ * Some helpful references to help
+ * Purpose URL
+ * ------------- ----------------------------------------------------------------
+ * [Status Codes] http://www.w3.org/Protocols/rfc2616/rfc2616-sec10.html
+ * [Test Cases] http://greenbytes.de/tech/tc/httpredirects/
+ * [t.co behavior] https://dev.twitter.com/docs/tco-redirection-behavior
+ */
+
+public class LinkResolver
+{
+ private final static Logger LOGGER = LoggerFactory.getLogger(LinkResolver.class);
+
+ private static final int MAX_ALLOWED_REDIRECTS = 30;
+ private static final int DEFAULT_HTTP_TIMEOUT = 5000; //originally 30000
+ private static final String LOCATION_IDENTIFIER = "location";
+ private static final String SET_COOKIE_IDENTIFIER = "set-cookie";
+
+ private LinkDetails linkDetails = new LinkDetails();
+
+ private static final Collection<String> BOTS_ARE_OK = new ArrayList<String>() {{
+ add("t.co");
+ }};
+
+ private static final Collection<String> URL_TRACKING_TO_REMOVE = new ArrayList<String>() {{
+ /******************************************************************
+ * Google uses parameters in the URL string to track referrers
+ * on their Google Analytics and promotions. These are the
+ * identified URL patterns.
+ *
+ * URL:
+ * https://support.google.com/analytics/answer/1033867?hl=en
+ *****************************************************************/
+
+ // Required. Use utm_source to identify a search engine, newsletter name, or other source.
+ add("([\\?&])utm_source(=)[^&?]*");
+
+ // Required. Use utm_medium to identify a medium such as email or cost-per-click.
+ add("([\\?&])utm_medium(=)[^&?]*");
+
+ // Used for paid search. Use utm_term to note the keywords for this ad.
+ add("([\\?&])utm_term(=)[^&?]*");
+
+ // Used for A/B testing and content-targeted ads. Use utm_content to differentiate ads or links that point to the same
+ add("([\\?&])utm_content(=)[^&?]*");
+
+ // Used for keyword analysis. Use utm_campaign to identify a specific product promotion or strategic campaign.
+ add("([\\?&])utm_campaign(=)[^&?]*");
+ }};
+
+ public LinkDetails getLinkDetails() { return linkDetails; }
+
+ public LinkResolver(String originalURL) {
+ linkDetails.setOriginalURL(originalURL);
+ }
+
+ public void run() {
+
+ Preconditions.checkNotNull(linkDetails.getOriginalURL());
+
+ linkDetails.setStartTime(DateTime.now());
+ // we are going to try three times just in case we catch the service off-guard
+ // this is mainly to help us with our tests.
+ for(int i = 0; (i < 3) && linkDetails.getFinalURL() == null ; i++) {
+ if(linkDetails.getLinkStatus() != LinkDetails.LinkStatus.SUCCESS)
+ unwindLink(linkDetails.getOriginalURL());
+ }
+
+ linkDetails.setFinalURL(cleanURL(linkDetails.getFinalURL()));
+ linkDetails.setNormalizedURL(normalizeURL(linkDetails.getFinalURL()));
+ linkDetails.setUrlParts(tokenizeURL(linkDetails.getNormalizedURL()));
+
+ this.updateTookInMillis();
+ }
+
+ protected void updateTookInMillis() {
+ Preconditions.checkNotNull(linkDetails.getStartTime());
+ linkDetails.setTookInMills(DateTime.now().minus(linkDetails.getStartTime().getMillis()).getMillis());
+ }
+
+ public void unwindLink(String url)
+ {
+ Preconditions.checkNotNull(linkDetails);
+
+ // Check to see if they wound up in a redirect loop
+ if((linkDetails.getRedirectCount() != null && linkDetails.getRedirectCount().longValue() > 0 && (linkDetails.getOriginalURL().equals(url) || linkDetails.getRedirects().contains(url))) || (linkDetails.getRedirectCount().longValue() > MAX_ALLOWED_REDIRECTS))
+ {
+ linkDetails.setLinkStatus(LinkDetails.LinkStatus.LOOP);
+ return;
+ }
+
+ if(!linkDetails.getOriginalURL().equals(url))
+ linkDetails.getRedirects().add(url);
+
+ HttpURLConnection connection = null;
+
+ try
+ {
+ URL thisURL = new URL(url);
+ connection = (HttpURLConnection)new URL(url).openConnection();
+
+ // now we are going to pretend that we are a browser...
+ // This is the way my mac works.
+ if(!BOTS_ARE_OK.contains(thisURL.getHost()))
+ {
+ connection.addRequestProperty("Host", thisURL.getHost());
+ connection.addRequestProperty("Connection", "Keep-Alive");
+ connection.addRequestProperty("User-Agent", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/31.0.1650.48 Safari/537.36");
+ connection.addRequestProperty("Accept-Language", "en-US,en;q=0.8,zh;q=0.6");
+
+ // the test to seattlemamadoc.com prompted this change.
+ // they auto detect bots by checking the referrer chain and the 'user-agent'
+ // this broke the t.co test. t.co URLs are EXPLICITLY ok with bots
+ // there is a list for URLS that behave this way at the top in BOTS_ARE_OK
+ // smashew 2013-13-2013
+
+ if(linkDetails.getRedirectCount() > 0 && BOTS_ARE_OK.contains(thisURL.getHost()))
+ connection.addRequestProperty("Referrer", linkDetails.getOriginalURL());
+ }
+
+ connection.setReadTimeout(DEFAULT_HTTP_TIMEOUT);
+ connection.setConnectTimeout(DEFAULT_HTTP_TIMEOUT);
+
+ connection.setInstanceFollowRedirects(false);
+
+ if(linkDetails.getCookies() != null)
+ for (String cookie : linkDetails.getCookies())
+ connection.addRequestProperty("Cookie", cookie.split(";", 1)[0]);
+
+ connection.connect();
+
+ linkDetails.setFinalResponseCode((long)connection.getResponseCode());
+
+ /**************
+ *
+ */
+ Map<String,List<String>> headers = createCaseInsensitiveMap(connection.getHeaderFields());
+ /******************************************************************
+ * If they want us to set cookies, well, then we will set cookies
+ * Example URL:
+ * http://nyti.ms/1bCpesx
+ *****************************************************************/
+ if(headers.containsKey(SET_COOKIE_IDENTIFIER))
+ linkDetails.getCookies().add(headers.get(SET_COOKIE_IDENTIFIER).get(0));
+
+ switch (linkDetails.getFinalResponseCode().intValue())
+ {
+ case 200: // HTTP OK
+ linkDetails.setFinalURL(connection.getURL().toString());
+ linkDetails.setDomain(new URL(linkDetails.getFinalURL()).getHost());
+ linkDetails.setLinkStatus(LinkDetails.LinkStatus.SUCCESS);
+ break;
+ case 300: // Multiple choices
+ case 301: // URI has been moved permanently
+ case 302: // Found
+ case 303: // Primarily for a HTTP Post
+ case 304: // Not Modified
+ case 306: // This status code is unused but in the redirect block.
+ case 307: // Temporary re-direct
+ /*******************************************************************
+ * Author:
+ * Smashew
+ *
+ * Date: 2013-11-15
+ *
+ * Note:
+ * It is possible that we have already found our final URL. In
+ * the event that we have found our final URL, we are going to
+ * save this URL as long as it isn't the original URL.
+ * We are still going to ask the browser to re-direct, but in the
+ * case of yet another redirect, seen with the redbull test
+ * this can be followed by a 304, a browser, by W3C standards would
+ * still render the page with its content, but for us to assert
+ * a success, we are really hoping for a 304 message.
+ *******************************************************************/
+ if(!linkDetails.getOriginalURL().toLowerCase().equals(connection.getURL().toString().toLowerCase()))
+ linkDetails.setFinalURL(connection.getURL().toString());
+ if(!headers.containsKey(LOCATION_IDENTIFIER))
+ {
+ LOGGER.info("Headers: {}", headers);
+ linkDetails.setLinkStatus(LinkDetails.LinkStatus.REDIRECT_ERROR);
+ }
+ else
+ {
+ linkDetails.setRedirected(Boolean.TRUE);
+ linkDetails.setRedirectCount(linkDetails.getRedirectCount().longValue()+1);
+ unwindLink(connection.getHeaderField(LOCATION_IDENTIFIER));
+ }
+ break;
+ case 305: // User must use the specified proxy (deprecated by W3C)
+ break;
+ case 401: // Unauthorized (nothing we can do here)
+ linkDetails.setLinkStatus(LinkDetails.LinkStatus.UNAUTHORIZED);
+ break;
+ case 403: // HTTP Forbidden (Nothing we can do here)
+ linkDetails.setLinkStatus(LinkDetails.LinkStatus.FORBIDDEN);
+ break;
+ case 404: // Not Found (Page is not found, nothing we can do with a 404)
+ linkDetails.setLinkStatus(LinkDetails.LinkStatus.NOT_FOUND);
+ break;
+ case 500: // Internal Server Error
+ case 501: // Not Implemented
+ case 502: // Bad Gateway
+ case 503: // Service Unavailable
+ case 504: // Gateway Timeout
+ case 505: // Version not supported
+ linkDetails.setLinkStatus(LinkDetails.LinkStatus.HTTP_ERROR_STATUS);
+ break;
+ default:
+ LOGGER.info("Unrecognized HTTP Response Code: {}", linkDetails.getFinalResponseCode());
+ linkDetails.setLinkStatus(LinkDetails.LinkStatus.NOT_FOUND);
+ break;
+ }
+ }
+ catch (MalformedURLException e)
+ {
+ // the URL is trash, so, it can't load it.
+ linkDetails.setLinkStatus(LinkDetails.LinkStatus.MALFORMED_URL);
+ }
+ catch (IOException ex)
+ {
+ // there was an issue we are going to set to error.
+ linkDetails.setLinkStatus(LinkDetails.LinkStatus.ERROR);
+ }
+ catch (Exception ex)
+ {
+ // there was an unknown issue we are going to set to exception.
+ linkDetails.setLinkStatus(LinkDetails.LinkStatus.EXCEPTION);
+ }
+ finally
+ {
+ if (connection != null)
+ connection.disconnect();
+ }
+ }
+
+ private Map<String,List<String>> createCaseInsensitiveMap(Map<String,List<String>> input) {
+ Map<String,List<String>> toReturn = new HashMap<String, List<String>>();
+ for(String k : input.keySet())
+ if(k != null && input.get(k) != null)
+ toReturn.put(k.toLowerCase(), input.get(k));
+ return toReturn;
+ }
+
+ private String cleanURL(String url)
+ {
+ // If they pass us a null URL then we are going to pass that right back to them.
+ if(url == null)
+ return null;
+
+ // remember how big the URL was at the start
+ int startLength = url.length();
+
+ // Iterate through all the known URL parameters of tracking URLs
+ for(String pattern : URL_TRACKING_TO_REMOVE)
+ url = url.replaceAll(pattern, "");
+
+ // If the URL is smaller than when it came in. Then it had tracking information
+ if(url.length() < startLength)
+ linkDetails.setTracked(Boolean.TRUE);
+
+ // return our url.
+ return url;
+ }
+
+ /**
+ * Removes the protocol, if it exists, from the front and
+ * removes any random encoding characters
+ * Extend this to do other url cleaning/pre-processing
+ * @param url - The String URL to normalize
+ * @return normalizedUrl - The String URL that has no junk or surprises
+ */
+ public static String normalizeURL(String url)
+ {
+ // Decode URL to remove any %20 type stuff
+ String normalizedUrl = url;
+ try {
+ // I've used a URLDecoder that's part of Java here,
+ // but this functionality exists in most modern languages
+ // and is universally called url decoding
+ normalizedUrl = URLDecoder.decode(url, "UTF-8");
+ }
+ catch(UnsupportedEncodingException uee)
+ {
+ System.err.println("Unable to Decode URL. Decoding skipped.");
+ uee.printStackTrace();
+ }
+
+ // Remove the protocol, http:// ftp:// or similar from the front
+ if (normalizedUrl.contains("://"))
+ normalizedUrl = normalizedUrl.split(":/{2}")[1];
+
+ // Room here to do more pre-processing
+
+ return normalizedUrl;
+ }
+
+ /**
+ * Goal is to get the different parts of the URL path. This can be used
+ * in a classifier to help us determine if we are working with
+ *
+ * Reference:
+ * http://stackoverflow.com/questions/10046178/pattern-matching-for-url-classification
+ * @param url - Url to be tokenized
+ * @return tokens - A String array of all the tokens
+ */
+ public static List<String> tokenizeURL(String url)
+ {
+ url = normalizeURL(url);
+ // I assume that we're going to use the whole URL to find tokens in
+ // If you want to just look in the GET parameters, or you want to ignore the domain
+ // or you want to use the domain as a token itself, that would have to be
+ // processed above the next line, and only the remaining parts split
+ List<String> toReturn = new ArrayList<String>();
+
+ // Split the URL by forward slashes. Most modern browsers will accept a URL
+ // this malformed such as http://www.smashew.com/hello//how////are/you
+ // hence the '+' in the regular expression.
+ for(String part: url.split("/+"))
+ toReturn.add(part.toLowerCase());
+
+ // return our object.
+ return toReturn;
+
+ // One could alternatively use a more complex regex to remove more invalid matches
+ // but this is subject to your (?:in)?ability to actually write the regex you want
+
+ // These next two get rid of tokens that are too short, also.
+
+ // Destroys anything that's not alphanumeric and things that are
+ // alphanumeric but only 1 character long
+ //String[] tokens = url.split("(?:[\\W_]+\\w)*[\\W_]+");
+
+ // Destroys anything that's not alphanumeric and things that are
+ // alphanumeric but only 1 or 2 characters long
+ //String[] tokens = url.split("(?:[\\W_]+\\w{1,2})*[\\W_]+");
+ }
+
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/56a395fa/streams-contrib/streams-processor-urls/src/main/java/org/apache/streams/urls/LinkResolverProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-urls/src/main/java/org/apache/streams/urls/LinkResolverProcessor.java b/streams-contrib/streams-processor-urls/src/main/java/org/apache/streams/urls/LinkResolverProcessor.java
new file mode 100644
index 0000000..929d7cd
--- /dev/null
+++ b/streams-contrib/streams-processor-urls/src/main/java/org/apache/streams/urls/LinkResolverProcessor.java
@@ -0,0 +1,108 @@
+package org.apache.streams.urls;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.pojo.json.Activity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+/**
+ * References:
+ * Some helpful references to help
+ * Purpose URL
+ * ------------- ----------------------------------------------------------------
+ * [Status Codes] http://www.w3.org/Protocols/rfc2616/rfc2616-sec10.html
+ * [Test Cases] http://greenbytes.de/tech/tc/httpredirects/
+ * [t.co behavior] https://dev.twitter.com/docs/tco-redirection-behavior
+ */
+
+public class LinkResolverProcessor implements StreamsProcessor
+{
+ private final static String STREAMS_ID = "LinkResolverProcessor";
+
+ private final static Logger LOGGER = LoggerFactory.getLogger(LinkResolverProcessor.class);
+
+ private static ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
+ @Override
+ public List<StreamsDatum> process(StreamsDatum entry) {
+
+ List<StreamsDatum> result = Lists.newArrayList();
+
+ LOGGER.debug("{} processing {}", STREAMS_ID, entry.getDocument().getClass());
+
+ Activity activity;
+
+ // get list of shared urls
+ if( entry.getDocument() instanceof Activity) {
+ activity = (Activity) entry.getDocument();
+
+ activity.setLinks(unwind(activity.getLinks()));
+
+ entry.setDocument(activity);
+
+ result.add(entry);
+
+ return result;
+ } else if( entry.getDocument() instanceof String ) {
+
+ try {
+ activity = mapper.readValue((String) entry.getDocument(), Activity.class);
+ } catch (Exception e) {
+ e.printStackTrace();
+ LOGGER.warn(e.getMessage());
+ return(Lists.newArrayList(entry));
+ }
+
+ activity.setLinks(unwind(activity.getLinks()));
+
+ try {
+ entry.setDocument(mapper.writeValueAsString(activity));
+ } catch (Exception e) {
+ e.printStackTrace();
+ LOGGER.warn(e.getMessage());
+ return(Lists.newArrayList());
+ }
+
+ result.add(entry);
+
+ return result;
+
+ }
+ else {
+ //return(Lists.newArrayList(entry));
+ return( Lists.newArrayList());
+ }
+ }
+
+ @Override
+ public void prepare(Object o) {
+ }
+
+ @Override
+ public void cleanUp() {
+
+ }
+
+ private List<String> unwind(List<String> inputLinks) {
+ List<String> outputLinks = Lists.newArrayList();
+ for( String link : inputLinks ) {
+ try {
+ LinkResolver unwinder = new LinkResolver(link);
+ unwinder.run();
+ outputLinks.add(unwinder.getLinkDetails().getFinalURL());
+ } catch (Exception e) {
+ //if unwindable drop
+ LOGGER.debug("Failed to unwind link : {}", link);
+ LOGGER.debug("Exception unwinding link : {}", e);
+ e.printStackTrace();
+ }
+ }
+ return outputLinks;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/56a395fa/streams-contrib/streams-processor-urls/src/main/java/org/apache/streams/urls/LinkUnwinder.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-urls/src/main/java/org/apache/streams/urls/LinkUnwinder.java b/streams-contrib/streams-processor-urls/src/main/java/org/apache/streams/urls/LinkUnwinder.java
deleted file mode 100644
index a4a28f1..0000000
--- a/streams-contrib/streams-processor-urls/src/main/java/org/apache/streams/urls/LinkUnwinder.java
+++ /dev/null
@@ -1,372 +0,0 @@
-package org.apache.streams.urls;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.net.HttpURLConnection;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.net.URLDecoder;
-import java.util.*;
-
-/**
- * References:
- * Some helpful references to help
- * Purpose URL
- * ------------- ----------------------------------------------------------------
- * [Status Codes] http://www.w3.org/Protocols/rfc2616/rfc2616-sec10.html
- * [Test Cases] http://greenbytes.de/tech/tc/httpredirects/
- * [t.co behavior] https://dev.twitter.com/docs/tco-redirection-behavior
- */
-
-public class LinkUnwinder implements Link
-{
- private final static Logger LOGGER = LoggerFactory.getLogger(LinkUnwinder.class);
-
- private static final int MAX_ALLOWED_REDIRECTS = 30;
- private static final int DEFAULT_HTTP_TIMEOUT = 5000; //originally 30000
- private static final String LOCATION_IDENTIFIER = "location";
- private static final String SET_COOKIE_IDENTIFIER = "set-cookie";
-
- private Date startTime = new Date();
- private String originalURL;
- private LinkStatus status;
- private String finalURL;
- private String domain;
- private boolean wasRedirected;
- private List<String> redirects = new ArrayList<String>();
- private boolean isTracked = false;
- private int finalResponseCode;
- private Collection<String> cookies;
-
- private String normalizedUrl;
- private List<String> urlParts;
-
- private int redirectCount = 0;
- private long tookInMillis = 0;
-
- private static final Collection<String> BOTS_ARE_OK = new ArrayList<String>() {{
- add("t.co");
- }};
-
- private static final Collection<String> URL_TRACKING_TO_REMOVE = new ArrayList<String>() {{
- /******************************************************************
- * Google uses parameters in the URL string to track referrers
- * on their Google Analytics and promotions. These are the
- * identified URL patterns.
- *
- * URL:
- * https://support.google.com/analytics/answer/1033867?hl=en
- *****************************************************************/
-
- // Required. Use utm_source to identify a search engine, newsletter name, or other source.
- add("([\\?&])utm_source(=)[^&?]*");
-
- // Required. Use utm_medium to identify a medium such as email or cost-per- click.
- add("([\\?&])utm_medium(=)[^&?]*");
-
- // Used for paid search. Use utm_term to note the keywords for this ad.
- add("([\\?&])utm_term(=)[^&?]*");
-
- // Used for A/B testing and content-targeted ads. Use utm_content to differentiate ads or links that point to the same
- add("([\\?&])utm_content(=)[^&?]*");
-
- // Used for keyword analysis. Use utm_campaign to identify a specific product promotion or strategic campaign.
- add("([\\?&])utm_campaign(=)[^&?]*");
- }};
-
- public boolean isFailure() { return false; }
- public String getOriginalURL() { return this.originalURL; }
- public LinkStatus getStatus() { return this.status; }
- public String getDomain() { return this.domain; }
- public String getFinalURL() { return this.finalURL; }
- public List<String> getRedirects() { return this.redirects; }
- public boolean wasRedirected() { return this.wasRedirected; }
- public boolean isTracked() { return this.isTracked; }
- public String getFinalResponseCode() { return Integer.toString(this.finalResponseCode); }
- public long getTookInMillis() { return this.tookInMillis; }
- public String getNormalizedURL() { return this.normalizedUrl; }
- public List<String> getUrlParts() { return this.urlParts; }
-
- public LinkUnwinder(String originalURL) {
- this.originalURL = originalURL;
- }
-
- public void run() {
- // we are going to try three times just incase we catch the service off-guard
- // this is mainly to help us with our tests.
- for(int i = 0; (i < 3) && this.finalURL == null ; i++) {
- if(this.status != LinkStatus.SUCCESS)
- unwindLink(this.originalURL);
- }
- this.finalURL = cleanURL(this.finalURL);
- this.normalizedUrl = normalizeURL(this.finalURL);
- this.urlParts = tokenizeURL(this.normalizedUrl);
-
- this.updateTookInMillis();
- }
-
- protected void updateTookInMillis() {
- this.tookInMillis = new Date().getTime() - this.startTime.getTime();
- }
-
- public void unwindLink(String url)
- {
- // Check to see if they wound up in a redirect loop
- if((this.redirectCount > 0 && (this.originalURL.equals(url) || this.redirects.contains(url))) || (this.redirectCount > MAX_ALLOWED_REDIRECTS))
- {
- this.status = LinkStatus.LOOP;
- return;
- }
-
- if(!this.originalURL.equals(url))
- this.redirects.add(url);
-
- HttpURLConnection connection = null;
-
- try
- {
- URL thisURL = new URL(url);
- connection = (HttpURLConnection)new URL(url).openConnection();
-
- // now we are going to pretend that we are a browser...
- // This is the way my mac works.
- if(!BOTS_ARE_OK.contains(thisURL.getHost()))
- {
- connection.addRequestProperty("Host", thisURL.getHost());
- connection.addRequestProperty("Connection", "Keep-Alive");
- connection.addRequestProperty("User-Agent", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/31.0.1650.48 Safari/537.36");
- connection.addRequestProperty("Accept-Language", "en-US,en;q=0.8,zh;q=0.6");
-
- // the test to seattlemamadoc.com prompted this change.
- // they auto detect bots by checking the referrer chain and the 'user-agent'
- // this broke the t.co test. t.co URLs are EXPLICITLY ok with bots
- // there is a list for URLS that behave this way at the top in BOTS_ARE_OK
- // smashew 2013-13-2013
-
- if(this.redirectCount > 0 && BOTS_ARE_OK.contains(thisURL.getHost()))
- connection.addRequestProperty("Referrer", this.originalURL);
- }
-
- connection.setReadTimeout(DEFAULT_HTTP_TIMEOUT);
- connection.setConnectTimeout(DEFAULT_HTTP_TIMEOUT);
-
- connection.setInstanceFollowRedirects(false);
-
- if(this.cookies != null)
- for (String cookie : cookies)
- connection.addRequestProperty("Cookie", cookie.split(";", 1)[0]);
-
- connection.connect();
-
- this.finalResponseCode = connection.getResponseCode();
-
- /**************
- *
- */
- Map<String,List<String>> headers = createCaseInsenitiveMap(connection.getHeaderFields());
- /******************************************************************
- * If they want us to set cookies, well, then we will set cookies
- * Example URL:
- * http://nyti.ms/1bCpesx
- *****************************************************************/
- if(headers.containsKey(SET_COOKIE_IDENTIFIER))
- this.cookies = headers.get(SET_COOKIE_IDENTIFIER);
-
- switch (this.finalResponseCode)
- {
- case 200: // HTTP OK
- this.finalURL = connection.getURL().toString();
- this.domain = new URL(this.finalURL).getHost();
- this.status = LinkStatus.SUCCESS;
- break;
- case 300: // Multiple choices
- case 301: // URI has been moved permanently
- case 302: // Found
- case 303: // Primarily for a HTTP Post
- case 304: // Not Modified
- case 306: // This status code is unused but in the redirect block.
- case 307: // Temporary re-direct
- /*******************************************************************
- * Author:
- * Smashew
- *
- * Date: 2013-11-15
- *
- * Note:
- * It is possible that we have already found our final URL. In
- * the event that we have found our final URL, we are going to
- * save this URL as long as it isn't the original URL.
- * We are still going to ask the browser to re-direct, but in the
- * case of yet another redirect, seen with the redbull test
- * this can be followed by a 304, a browser, by W3C standards would
- * still render the page with it's content, but for us to assert
- * a success, we are really hoping for a 304 message.
- *******************************************************************/
- if(!this.originalURL.toLowerCase().equals(connection.getURL().toString().toLowerCase()))
- this.finalURL = connection.getURL().toString();
- if(!headers.containsKey(LOCATION_IDENTIFIER))
- {
- LOGGER.info("Headers: {}", headers);
- this.status = LinkStatus.REDIRECT_ERROR;
- }
- else
- {
- this.wasRedirected = true;
- this.redirectCount++;
- unwindLink(connection.getHeaderField(LOCATION_IDENTIFIER));
- }
- break;
- case 305: // User must use the specified proxy (deprecated by W3C)
- break;
- case 401: // Unauthorized (nothing we can do here)
- this.status = LinkStatus.UNAUTHORIZED;
- break;
- case 403: // HTTP Forbidden (Nothing we can do here)
- this.status = LinkStatus.FORBIDDEN;
- break;
- case 404: // Not Found (Page is not found, nothing we can do with a 404)
- this.status = LinkStatus.NOT_FOUND;
- break;
- case 500: // Internal Server Error
- case 501: // Not Implemented
- case 502: // Bad Gateway
- case 503: // Service Unavailable
- case 504: // Gateway Timeout
- case 505: // Version not supported
- this.status = LinkStatus.HTTP_ERROR_STATUS;
- break;
- default:
- LOGGER.info("Unrecognized HTTP Response Code: {}", this.finalResponseCode);
- this.status = LinkStatus.NOT_FOUND;
- break;
- }
- }
- catch (MalformedURLException e)
- {
- // the URL is trash, so, it can't load it.
- this.status = LinkStatus.MALFORMED_URL;
- }
- catch (IOException ex)
- {
- // there was an issue we are going to set to error.
- this.status = LinkStatus.ERROR;
- }
- catch (Exception ex)
- {
- // there was an unknown issue we are going to set to exception.
- this.status = LinkStatus.EXCEPTION;
- }
- finally
- {
- if (connection != null)
- connection.disconnect();
- }
- }
-
- private Map<String,List<String>> createCaseInsenitiveMap(Map<String,List<String>> input) {
- Map<String,List<String>> toReturn = new HashMap<String, List<String>>();
- for(String k : input.keySet())
- if(k != null && input.get(k) != null)
- toReturn.put(k.toLowerCase(), input.get(k));
- return toReturn;
- }
-
- private String cleanURL(String url)
- {
- // If they pass us a null URL then we are going to pass that right back to them.
- if(url == null)
- return null;
-
- // remember how big the URL was at the start
- int startLength = url.length();
-
- // Iterate through all the known URL parameters of tracking URLs
- for(String pattern : URL_TRACKING_TO_REMOVE)
- url = url.replaceAll(pattern, "");
-
- // If the URL is smaller than when it came in. Then it had tracking information
- if(url.length() < startLength)
- this.isTracked = true;
-
- // return our url.
- return url;
- }
-
- /**
- * Removes the protocol, if it exists, from the front and
- * removes any random encoding characters
- * Extend this to do other url cleaning/pre-processing
- * @param url - The String URL to normalize
- * @return normalizedUrl - The String URL that has no junk or surprises
- */
- public static String normalizeURL(String url)
- {
- // Decode URL to remove any %20 type stuff
- String normalizedUrl = url;
- try {
- // I've used a URLDecoder that's part of Java here,
- // but this functionality exists in most modern languages
- // and is universally called url decoding
- normalizedUrl = URLDecoder.decode(url, "UTF-8");
- }
- catch(UnsupportedEncodingException uee)
- {
- System.err.println("Unable to Decode URL. Decoding skipped.");
- uee.printStackTrace();
- }
-
- // Remove the protocol, http:// ftp:// or similar from the front
- if (normalizedUrl.contains("://"))
- normalizedUrl = normalizedUrl.split(":/{2}")[1];
-
- // Room here to do more pre-processing
-
- return normalizedUrl;
- }
-
- /**
- * Goal is to get the different parts of the URL path. This can be used
- * in a classifier to help us determine if we are working with
- *
- * Reference:
- * http://stackoverflow.com/questions/10046178/pattern-matching-for-url-classification
- * @param url - Url to be tokenized
- * @return tokens - A String array of all the tokens
- */
- public static List<String> tokenizeURL(String url)
- {
- url = normalizeURL(url);
- // I assume that we're going to use the whole URL to find tokens in
- // If you want to just look in the GET parameters, or you want to ignore the domain
- // or you want to use the domain as a token itself, that would have to be
- // processed above the next line, and only the remaining parts split
- List<String> toReturn = new ArrayList<String>();
-
- // Split the URL by forward slashes. Most modern browsers will accept a URL
- // this malformed such as http://www.smashew.com/hello//how////are/you
- // hence the '+' in the regular expression.
- for(String part: url.split("/+"))
- toReturn.add(part.toLowerCase());
-
- // return our object.
- return toReturn;
-
- // One could alternatively use a more complex regex to remove more invalid matches
- // but this is subject to your (?:in)?ability to actually write the regex you want
-
- // These next two get rid of tokens that are too short, also.
-
- // Destroys anything that's not alphanumeric and things that are
- // alphanumeric but only 1 character long
- //String[] tokens = url.split("(?:[\\W_]+\\w)*[\\W_]+");
-
- // Destroys anything that's not alphanumeric and things that are
- // alphanumeric but only 1 or 2 characters long
- //String[] tokens = url.split("(?:[\\W_]+\\w{1,2})*[\\W_]+");
- }
-
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/56a395fa/streams-contrib/streams-processor-urls/src/main/java/org/apache/streams/urls/LinkUnwinderProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-urls/src/main/java/org/apache/streams/urls/LinkUnwinderProcessor.java b/streams-contrib/streams-processor-urls/src/main/java/org/apache/streams/urls/LinkUnwinderProcessor.java
deleted file mode 100644
index 77134b9..0000000
--- a/streams-contrib/streams-processor-urls/src/main/java/org/apache/streams/urls/LinkUnwinderProcessor.java
+++ /dev/null
@@ -1,123 +0,0 @@
-package org.apache.streams.urls;
-
-import com.fasterxml.jackson.annotation.JsonAutoDetect;
-import com.fasterxml.jackson.annotation.PropertyAccessor;
-import com.fasterxml.jackson.core.JsonParseException;
-import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.DeserializationContext;
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.JsonMappingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
-import com.google.common.collect.Lists;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.jackson.StreamsJacksonModule;
-import org.apache.streams.urls.Link;
-import org.apache.streams.urls.LinkUnwinder;
-import org.apache.commons.lang.NotImplementedException;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsProcessor;
-import org.apache.streams.pojo.json.Activity;
-import org.joda.time.DateTime;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.*;
-
-/**
- * References:
- * Some helpful references to help
- * Purpose URL
- * ------------- ----------------------------------------------------------------
- * [Status Codes] http://www.w3.org/Protocols/rfc2616/rfc2616-sec10.html
- * [Test Cases] http://greenbytes.de/tech/tc/httpredirects/
- * [t.co behavior] https://dev.twitter.com/docs/tco-redirection-behavior
- */
-
-public class LinkUnwinderProcessor implements StreamsProcessor
-{
- private final static String STREAMS_ID = "LinkUnwinderProcessor";
-
- private final static Logger LOGGER = LoggerFactory.getLogger(LinkUnwinderProcessor.class);
-
- private static ObjectMapper mapper = StreamsJacksonMapper.getInstance();
-
- @Override
- public List<StreamsDatum> process(StreamsDatum entry) {
-
- List<StreamsDatum> result = Lists.newArrayList();
-
- LOGGER.debug("{} processing {}", STREAMS_ID, entry.getDocument().getClass());
-
- Activity activity;
-
- // get list of shared urls
- if( entry.getDocument() instanceof Activity) {
- activity = (Activity) entry.getDocument();
-
- activity.setLinks(unwind(activity.getLinks()));
-
- entry.setDocument(activity);
-
- result.add(entry);
-
- return result;
- } else if( entry.getDocument() instanceof String ) {
-
- try {
- activity = mapper.readValue((String) entry.getDocument(), Activity.class);
- } catch (Exception e) {
- e.printStackTrace();
- LOGGER.warn(e.getMessage());
- return(Lists.newArrayList(entry));
- }
-
- activity.setLinks(unwind(activity.getLinks()));
-
- try {
- entry.setDocument(mapper.writeValueAsString(activity));
- } catch (Exception e) {
- e.printStackTrace();
- LOGGER.warn(e.getMessage());
- return(Lists.newArrayList());
- }
-
- result.add(entry);
-
- return result;
-
- }
- else {
- //return(Lists.newArrayList(entry));
- return( Lists.newArrayList());
- }
- }
-
- @Override
- public void prepare(Object o) {
- }
-
- @Override
- public void cleanUp() {
-
- }
-
- private List<String> unwind(List<String> inputLinks) {
- List<String> outputLinks = Lists.newArrayList();
- for( String link : inputLinks ) {
- try {
- LinkUnwinder unwinder = new LinkUnwinder(link);
- unwinder.run();
- outputLinks.add(unwinder.getFinalURL());
- } catch (Exception e) {
- //if unwindable drop
- LOGGER.debug("Failed to unwind link : {}", link);
- LOGGER.debug("Exception unwinding link : {}", e);
- e.printStackTrace();
- }
- }
- return outputLinks;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/56a395fa/streams-contrib/streams-processor-urls/src/test/java/org/apache/streams/urls/TestLinkUnwinderProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-urls/src/test/java/org/apache/streams/urls/TestLinkUnwinderProcessor.java b/streams-contrib/streams-processor-urls/src/test/java/org/apache/streams/urls/TestLinkUnwinderProcessor.java
index c6ccf24..d659af1 100644
--- a/streams-contrib/streams-processor-urls/src/test/java/org/apache/streams/urls/TestLinkUnwinderProcessor.java
+++ b/streams-contrib/streams-processor-urls/src/test/java/org/apache/streams/urls/TestLinkUnwinderProcessor.java
@@ -6,13 +6,10 @@ import com.google.common.collect.Sets;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.jackson.StreamsJacksonModule;
import org.apache.streams.pojo.json.Activity;
-import org.junit.BeforeClass;
-import org.junit.Ignore;
import org.junit.Test;
import static org.junit.Assert.*;
import java.util.List;
-import java.util.Scanner;
/**
* Created by rebanks on 2/27/14.
@@ -64,7 +61,7 @@ public class TestLinkUnwinderProcessor {
Activity activity = new Activity();
activity.setLinks(input);
StreamsDatum datum = new StreamsDatum(activity);
- LinkUnwinderProcessor processor = new LinkUnwinderProcessor();
+ LinkResolverProcessor processor = new LinkResolverProcessor();
processor.prepare(null);
List<StreamsDatum> result = processor.process(datum);
assertNotNull(result);
@@ -87,7 +84,7 @@ public class TestLinkUnwinderProcessor {
activity.setLinks(input);
String str = mapper.writeValueAsString(activity);
StreamsDatum datum = new StreamsDatum(str);
- LinkUnwinderProcessor processor = new LinkUnwinderProcessor();
+ LinkResolverProcessor processor = new LinkResolverProcessor();
processor.prepare(null);
List<StreamsDatum> result = processor.process(datum);
assertNotNull(result);
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/56a395fa/streams-contrib/streams-provider-twitter/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/pom.xml b/streams-contrib/streams-provider-twitter/pom.xml
index 93dc38b..3c27b8c 100644
--- a/streams-contrib/streams-provider-twitter/pom.xml
+++ b/streams-contrib/streams-provider-twitter/pom.xml
@@ -49,6 +49,11 @@
<artifactId>streams-config</artifactId>
</dependency>
<dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-util</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/56a395fa/streams-runtimes/streams-runtime-pig/pom.xml
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-pig/pom.xml b/streams-runtimes/streams-runtime-pig/pom.xml
index 4cbf488..e62a7c8 100644
--- a/streams-runtimes/streams-runtime-pig/pom.xml
+++ b/streams-runtimes/streams-runtime-pig/pom.xml
@@ -28,6 +28,11 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>streams-runtime-pig</artifactId>
+ <properties>
+ <hadoop-client.version>2.0.0-cdh4.5.0.1-SNAPSHOT</hadoop-client.version>
+ <pig.version>0.11.0-cdh4.5.0.1-SNAPSHOT</pig.version>
+ </properties>
+
<dependencies>
<dependency>
<groupId>org.apache.streams</groupId>
@@ -74,20 +79,14 @@
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
- <version>2.0.0-cdh4.5.0.1-SNAPSHOT</version>
- <scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.pig</groupId>
<artifactId>pig</artifactId>
- <version>0.11.0-cdh4.5.0.1-SNAPSHOT</version>
- <scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.pig</groupId>
<artifactId>pigunit</artifactId>
- <version>0.11.0-cdh4.5.0.1-SNAPSHOT</version>
- <scope>test</scope>
</dependency>
<dependency>
<groupId>jline</groupId>
@@ -103,6 +102,29 @@
</dependency>
</dependencies>
+ <dependencyManagement>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ <version>${hadoop-client.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.pig</groupId>
+ <artifactId>pig</artifactId>
+ <version>${pig.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.pig</groupId>
+ <artifactId>pigunit</artifactId>
+ <version>${pig.version}</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+ </dependencyManagement>
+
<build>
<sourceDirectory>src/main/java</sourceDirectory>
<testSourceDirectory>src/test/java</testSourceDirectory>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/56a395fa/streams-runtimes/streams-runtime-storm/pom.xml
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-storm/pom.xml b/streams-runtimes/streams-runtime-storm/pom.xml
index dd0105a..edaa760 100644
--- a/streams-runtimes/streams-runtime-storm/pom.xml
+++ b/streams-runtimes/streams-runtime-storm/pom.xml
@@ -30,7 +30,7 @@
<properties>
<storm.version>0.9.1-incubating</storm.version>
- <scala.version>0.9.1</scala.version>
+ <scala.version>2.9.2</scala.version>
<zkclient.version>0.4</zkclient.version>
</properties>
@@ -68,36 +68,6 @@
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
- <scope>compile</scope>
- <type>jar</type>
- </dependency>
- <dependency>
- <groupId>com.101tec</groupId>
- <artifactId>zkclient</artifactId>
- <scope>compile</scope>
- <exclusions>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- </dependencies>
-
- <dependencyManagement>
- <dependencies>
- <dependency>
- <groupId>org.apache.storm</groupId>
- <artifactId>storm-core</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.scala-lang</groupId>
- <artifactId>scala-library</artifactId>
<version>${scala.version}</version>
<scope>compile</scope>
<type>jar</type>
@@ -118,8 +88,7 @@
</exclusion>
</exclusions>
</dependency>
- </dependencies>
- </dependencyManagement>
+ </dependencies>
<build>
<sourceDirectory>src/main/java</sourceDirectory>