You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/04/17 22:28:16 UTC

[43/53] [abbrv] git commit: hdfs-pullarticles working

hdfs-pullarticles working


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/cc9bc046
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/cc9bc046
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/cc9bc046

Branch: refs/heads/master
Commit: cc9bc04683dcfb7d6150223f541e4e82fc40588c
Parents: 0c31066
Author: sblackmon <sb...@w2odigital.com>
Authored: Thu Apr 3 02:38:47 2014 -0500
Committer: sblackmon <sb...@w2odigital.com>
Committed: Thu Apr 3 02:38:47 2014 -0500

----------------------------------------------------------------------
 streams-contrib/pom.xml                         |  1 +
 .../org/apache/streams/tika/LinkExpander.java   | 12 +---
 .../org/apache/streams/tika/TikaProcessor.java  | 69 +++++++++++++-------
 .../apache/streams/tika/BoilerPipeArticle.json  |  7 --
 4 files changed, 48 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/cc9bc046/streams-contrib/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/pom.xml b/streams-contrib/pom.xml
index 2d2d27c..a796dad 100644
--- a/streams-contrib/pom.xml
+++ b/streams-contrib/pom.xml
@@ -44,6 +44,7 @@
         <module>streams-persist-hdfs</module>
         <module>streams-persist-kafka</module>
         <module>streams-persist-mongo</module>
+        <!--<module>streams-processor-lucene</module>-->
         <module>streams-processor-tika</module>
         <module>streams-processor-urls</module>
         <module>streams-provider-datasift</module>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/cc9bc046/streams-contrib/streams-processor-tika/src/main/java/org/apache/streams/tika/LinkExpander.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-tika/src/main/java/org/apache/streams/tika/LinkExpander.java b/streams-contrib/streams-processor-tika/src/main/java/org/apache/streams/tika/LinkExpander.java
index fe0e898..e4c0cef 100644
--- a/streams-contrib/streams-processor-tika/src/main/java/org/apache/streams/tika/LinkExpander.java
+++ b/streams-contrib/streams-processor-tika/src/main/java/org/apache/streams/tika/LinkExpander.java
@@ -50,14 +50,6 @@ public class LinkExpander extends LinkUnwinder
 
     private BoilerPipeArticle article = new BoilerPipeArticle();
 
-    // sblackmon: I put this here so I wouldn't get NullPointerExceptions when serializing results
-    public TextBlock getContentTextBlock() {
-        for(TextBlock textBlock : article.getTextBlocks())
-            if(textBlock.isContent())
-                return textBlock;
-        return null;
-    }
-
     private static final Collection<String> AUTHOR_SEARCH = new ArrayList<String>() {{
         add("og:author");
         add("dc:author");
@@ -133,6 +125,9 @@ public class LinkExpander extends LinkUnwinder
         expandLink();
     }
 
+    public BoilerPipeArticle getArticle() {
+        return article;
+    }
 
     private void expandLink()
     {
@@ -186,7 +181,6 @@ public class LinkExpander extends LinkUnwinder
                 boilerpipeContentHandler,
                 rawMetaData);
 
-        article.setTextBlocks(boilerpipeContentHandler.getTextDocument().getTextBlocks());
         article.setBody(boilerpipeContentHandler.getTextDocument().getContent());
         article.setTitle(boilerpipeContentHandler.getTextDocument().getTitle());
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/cc9bc046/streams-contrib/streams-processor-tika/src/main/java/org/apache/streams/tika/TikaProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-tika/src/main/java/org/apache/streams/tika/TikaProcessor.java b/streams-contrib/streams-processor-tika/src/main/java/org/apache/streams/tika/TikaProcessor.java
index b2f337d..7609635 100644
--- a/streams-contrib/streams-processor-tika/src/main/java/org/apache/streams/tika/TikaProcessor.java
+++ b/streams-contrib/streams-processor-tika/src/main/java/org/apache/streams/tika/TikaProcessor.java
@@ -1,5 +1,6 @@
 package org.apache.streams.tika;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.datatype.jsonorg.JsonOrgModule;
 import com.google.common.collect.Lists;
@@ -39,38 +40,46 @@ public class TikaProcessor implements StreamsProcessor
 
         LOGGER.debug("{} processing {}", STREAMS_ID, entry.getDocument().getClass());
 
+        Activity activity;
+
+        System.out.println( STREAMS_ID + " processing " + entry.getDocument().getClass());
         // get list of shared urls
         if( entry.getDocument() instanceof Activity) {
 
-            Activity input = (Activity) entry.getDocument();
-
-            List<String> outputLinks = input.getLinks();
-            // for each
-            for( String link : outputLinks ) {
-                if( link instanceof String ) {
-                    // expand
-                    try {
-                        StreamsDatum outputDatum = expandLink((String) link, entry);
-                        result.add(outputDatum);
-                    } catch (Exception e) {
-                        //drop unexpandable links
-                        LOGGER.debug("Failed to expand link : {}", link);
-                        LOGGER.debug("Excpetion expanding link : {}", e);
-                    }
-                }
-                else {
-                    LOGGER.warn("Expected Links to be of type java.lang.String, but received {}", link.getClass().toString());
-                }
-            }
-
+            activity = (Activity) entry.getDocument();
 
         }
         else if(entry.getDocument() instanceof String) {
-            StreamsDatum outputDatum = expandLink((String) entry.getDocument(), entry);
-            result.add(outputDatum);
+
+            try {
+                activity = mapper.readValue((String) entry.getDocument(), Activity.class);
+            } catch (Exception e) {
+                e.printStackTrace();
+                LOGGER.warn(e.getMessage());
+                return(Lists.newArrayList(entry));
+            }
+
         }
         else throw new NotImplementedException();
 
+        List<String> outputLinks = activity.getLinks();
+        // for each
+        for( String link : outputLinks ) {
+
+            System.out.println( "pulling " + link);
+
+            try {
+                StreamsDatum outputDatum = expandLink(link, entry);
+                if( outputDatum != null )
+                    result.add(outputDatum);
+            } catch (Exception e) {
+                //drop unexpandable links
+                LOGGER.debug("Failed to expand link : {}", link);
+                LOGGER.debug("Excpetion expanding link : {}", e);
+            }
+
+        }
+
         return result;
     }
 
@@ -80,9 +89,19 @@ public class TikaProcessor implements StreamsProcessor
         expander.run();
         StreamsDatum datum = null;
         if(input.getId() == null)
-            datum = new StreamsDatum(this.mapper.convertValue(expander, JSONObject.class).toString(), expander.getFinalURL());
+            try {
+                datum = new StreamsDatum(this.mapper.writeValueAsString(expander.getArticle()), expander.getFinalURL());
+            } catch (JsonProcessingException e) {
+                e.printStackTrace();
+                return null;
+            }
         else
-            datum = new StreamsDatum(this.mapper.convertValue(expander, JSONObject.class).toString(), input.getId());
+            try {
+                datum = new StreamsDatum(this.mapper.writeValueAsString(expander.getArticle()), input.getId());
+            } catch (JsonProcessingException e) {
+                e.printStackTrace();
+                return null;
+            }
         datum.setSequenceid(input.getSequenceid());
         datum.setMetadata(input.getMetadata());
         datum.setTimestamp(input.getTimestamp());

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/cc9bc046/streams-contrib/streams-processor-tika/src/main/jsonschema/org/apache/streams/tika/BoilerPipeArticle.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-tika/src/main/jsonschema/org/apache/streams/tika/BoilerPipeArticle.json b/streams-contrib/streams-processor-tika/src/main/jsonschema/org/apache/streams/tika/BoilerPipeArticle.json
index a23b13e..137a4be 100644
--- a/streams-contrib/streams-processor-tika/src/main/jsonschema/org/apache/streams/tika/BoilerPipeArticle.json
+++ b/streams-contrib/streams-processor-tika/src/main/jsonschema/org/apache/streams/tika/BoilerPipeArticle.json
@@ -50,13 +50,6 @@
                 }
             }
         },
-        "textBlocks": {
-            "type": "array",
-            "items": {
-                "javaType": "de.l3s.boilerpipe.document.TextBlock",
-                "type": "object"
-            }
-        },
         "keywords": {
             "type": "array",
             "uniqueItems": true,