You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/04/17 22:28:16 UTC
[43/53] [abbrv] git commit: hdfs-pullarticles working
hdfs-pullarticles working
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/cc9bc046
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/cc9bc046
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/cc9bc046
Branch: refs/heads/master
Commit: cc9bc04683dcfb7d6150223f541e4e82fc40588c
Parents: 0c31066
Author: sblackmon <sb...@w2odigital.com>
Authored: Thu Apr 3 02:38:47 2014 -0500
Committer: sblackmon <sb...@w2odigital.com>
Committed: Thu Apr 3 02:38:47 2014 -0500
----------------------------------------------------------------------
streams-contrib/pom.xml | 1 +
.../org/apache/streams/tika/LinkExpander.java | 12 +---
.../org/apache/streams/tika/TikaProcessor.java | 69 +++++++++++++-------
.../apache/streams/tika/BoilerPipeArticle.json | 7 --
4 files changed, 48 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/cc9bc046/streams-contrib/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/pom.xml b/streams-contrib/pom.xml
index 2d2d27c..a796dad 100644
--- a/streams-contrib/pom.xml
+++ b/streams-contrib/pom.xml
@@ -44,6 +44,7 @@
<module>streams-persist-hdfs</module>
<module>streams-persist-kafka</module>
<module>streams-persist-mongo</module>
+ <!--<module>streams-processor-lucene</module>-->
<module>streams-processor-tika</module>
<module>streams-processor-urls</module>
<module>streams-provider-datasift</module>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/cc9bc046/streams-contrib/streams-processor-tika/src/main/java/org/apache/streams/tika/LinkExpander.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-tika/src/main/java/org/apache/streams/tika/LinkExpander.java b/streams-contrib/streams-processor-tika/src/main/java/org/apache/streams/tika/LinkExpander.java
index fe0e898..e4c0cef 100644
--- a/streams-contrib/streams-processor-tika/src/main/java/org/apache/streams/tika/LinkExpander.java
+++ b/streams-contrib/streams-processor-tika/src/main/java/org/apache/streams/tika/LinkExpander.java
@@ -50,14 +50,6 @@ public class LinkExpander extends LinkUnwinder
private BoilerPipeArticle article = new BoilerPipeArticle();
- // sblackmon: I put this here so I wouldn't get NullPointerExceptions when serializing results
- public TextBlock getContentTextBlock() {
- for(TextBlock textBlock : article.getTextBlocks())
- if(textBlock.isContent())
- return textBlock;
- return null;
- }
-
private static final Collection<String> AUTHOR_SEARCH = new ArrayList<String>() {{
add("og:author");
add("dc:author");
@@ -133,6 +125,9 @@ public class LinkExpander extends LinkUnwinder
expandLink();
}
+ public BoilerPipeArticle getArticle() {
+ return article;
+ }
private void expandLink()
{
@@ -186,7 +181,6 @@ public class LinkExpander extends LinkUnwinder
boilerpipeContentHandler,
rawMetaData);
- article.setTextBlocks(boilerpipeContentHandler.getTextDocument().getTextBlocks());
article.setBody(boilerpipeContentHandler.getTextDocument().getContent());
article.setTitle(boilerpipeContentHandler.getTextDocument().getTitle());
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/cc9bc046/streams-contrib/streams-processor-tika/src/main/java/org/apache/streams/tika/TikaProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-tika/src/main/java/org/apache/streams/tika/TikaProcessor.java b/streams-contrib/streams-processor-tika/src/main/java/org/apache/streams/tika/TikaProcessor.java
index b2f337d..7609635 100644
--- a/streams-contrib/streams-processor-tika/src/main/java/org/apache/streams/tika/TikaProcessor.java
+++ b/streams-contrib/streams-processor-tika/src/main/java/org/apache/streams/tika/TikaProcessor.java
@@ -1,5 +1,6 @@
package org.apache.streams.tika;
+import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsonorg.JsonOrgModule;
import com.google.common.collect.Lists;
@@ -39,38 +40,46 @@ public class TikaProcessor implements StreamsProcessor
LOGGER.debug("{} processing {}", STREAMS_ID, entry.getDocument().getClass());
+ Activity activity;
+
+ System.out.println( STREAMS_ID + " processing " + entry.getDocument().getClass());
// get list of shared urls
if( entry.getDocument() instanceof Activity) {
- Activity input = (Activity) entry.getDocument();
-
- List<String> outputLinks = input.getLinks();
- // for each
- for( String link : outputLinks ) {
- if( link instanceof String ) {
- // expand
- try {
- StreamsDatum outputDatum = expandLink((String) link, entry);
- result.add(outputDatum);
- } catch (Exception e) {
- //drop unexpandable links
- LOGGER.debug("Failed to expand link : {}", link);
- LOGGER.debug("Excpetion expanding link : {}", e);
- }
- }
- else {
- LOGGER.warn("Expected Links to be of type java.lang.String, but received {}", link.getClass().toString());
- }
- }
-
+ activity = (Activity) entry.getDocument();
}
else if(entry.getDocument() instanceof String) {
- StreamsDatum outputDatum = expandLink((String) entry.getDocument(), entry);
- result.add(outputDatum);
+
+ try {
+ activity = mapper.readValue((String) entry.getDocument(), Activity.class);
+ } catch (Exception e) {
+ e.printStackTrace();
+ LOGGER.warn(e.getMessage());
+ return(Lists.newArrayList(entry));
+ }
+
}
else throw new NotImplementedException();
+ List<String> outputLinks = activity.getLinks();
+ // for each
+ for( String link : outputLinks ) {
+
+ System.out.println( "pulling " + link);
+
+ try {
+ StreamsDatum outputDatum = expandLink(link, entry);
+ if( outputDatum != null )
+ result.add(outputDatum);
+ } catch (Exception e) {
+ //drop unexpandable links
+ LOGGER.debug("Failed to expand link : {}", link);
+ LOGGER.debug("Excpetion expanding link : {}", e);
+ }
+
+ }
+
return result;
}
@@ -80,9 +89,19 @@ public class TikaProcessor implements StreamsProcessor
expander.run();
StreamsDatum datum = null;
if(input.getId() == null)
- datum = new StreamsDatum(this.mapper.convertValue(expander, JSONObject.class).toString(), expander.getFinalURL());
+ try {
+ datum = new StreamsDatum(this.mapper.writeValueAsString(expander.getArticle()), expander.getFinalURL());
+ } catch (JsonProcessingException e) {
+ e.printStackTrace();
+ return null;
+ }
else
- datum = new StreamsDatum(this.mapper.convertValue(expander, JSONObject.class).toString(), input.getId());
+ try {
+ datum = new StreamsDatum(this.mapper.writeValueAsString(expander.getArticle()), input.getId());
+ } catch (JsonProcessingException e) {
+ e.printStackTrace();
+ return null;
+ }
datum.setSequenceid(input.getSequenceid());
datum.setMetadata(input.getMetadata());
datum.setTimestamp(input.getTimestamp());
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/cc9bc046/streams-contrib/streams-processor-tika/src/main/jsonschema/org/apache/streams/tika/BoilerPipeArticle.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-tika/src/main/jsonschema/org/apache/streams/tika/BoilerPipeArticle.json b/streams-contrib/streams-processor-tika/src/main/jsonschema/org/apache/streams/tika/BoilerPipeArticle.json
index a23b13e..137a4be 100644
--- a/streams-contrib/streams-processor-tika/src/main/jsonschema/org/apache/streams/tika/BoilerPipeArticle.json
+++ b/streams-contrib/streams-processor-tika/src/main/jsonschema/org/apache/streams/tika/BoilerPipeArticle.json
@@ -50,13 +50,6 @@
}
}
},
- "textBlocks": {
- "type": "array",
- "items": {
- "javaType": "de.l3s.boilerpipe.document.TextBlock",
- "type": "object"
- }
- },
"keywords": {
"type": "array",
"uniqueItems": true,