You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/04/07 18:57:28 UTC

git commit: pushing progress on lucene and storm modules

Repository: incubator-streams
Updated Branches:
  refs/heads/springcleaning 10923617e -> 27e67162f


pushing progress on lucene and storm modules


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/27e67162
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/27e67162
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/27e67162

Branch: refs/heads/springcleaning
Commit: 27e67162f260926125b70a4b92ce5644923a6138
Parents: 1092361
Author: sblackmon <sb...@w2odigital.com>
Authored: Mon Apr 7 10:57:04 2014 -0600
Committer: sblackmon <sb...@w2odigital.com>
Committed: Mon Apr 7 10:57:04 2014 -0600

----------------------------------------------------------------------
 pom.xml                                         |   1 -
 .../streams-processor-lucene/pom.xml            | 139 ++++++++
 .../lucene/LuceneSimpleTaggingProcessor.java    | 336 +++++++++++++++++++
 .../lucene/LuceneTaggerConfiguration.json       |  26 ++
 .../lucene/TestLucenSimpleTaggingProcessor.java | 148 ++++++++
 .../src/test/resources/TestTags.tsv             |   2 +
 streams-runtimes/streams-runtime-storm/pom.xml  |  47 ++-
 .../streams/storm/PipelineGraphLauncher.java    | 196 -----------
 .../storm/trident/StreamsProcessorFunction.java |  54 +++
 .../storm/trident/StreamsProviderSpout.java     |   2 +-
 10 files changed, 746 insertions(+), 205 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/27e67162/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 01f2c2d..1afae06 100644
--- a/pom.xml
+++ b/pom.xml
@@ -83,7 +83,6 @@
         <guava.version>16.0.1</guava.version>
         <scala.version>2.8.0</scala.version>
         <clojure.version>1.4.0</clojure.version>
-        <storm.version>0.9.1-incubating</storm.version>
         <kafka.version>0.8.1</kafka.version>
         <zookeeper.version>3.4.5-cdh4.5.0</zookeeper.version>
         <netty.version>3.8.0.Final</netty.version>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/27e67162/streams-contrib/streams-processor-lucene/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-lucene/pom.xml b/streams-contrib/streams-processor-lucene/pom.xml
new file mode 100644
index 0000000..896744a
--- /dev/null
+++ b/streams-contrib/streams-processor-lucene/pom.xml
@@ -0,0 +1,139 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+    <modelVersion>4.0.0</modelVersion>
+    <artifactId>streams-processor-lucene</artifactId>
+    <version>0.1-SNAPSHOT</version>
+
+    <parent>
+        <groupId>org.apache.streams</groupId>
+        <artifactId>streams-contrib</artifactId>
+        <version>0.1-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+
+    <properties>
+        <!-- This is the Version Number for the Apache Streams Project -->
+        <streams.version>0.1-SNAPSHOT</streams.version>
+    </properties>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-config</artifactId>
+            <version>${streams.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-core</artifactId>
+            <version>${streams.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-pojo</artifactId>
+            <version>${streams.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-annotations</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.jsonschema2pojo</groupId>
+            <artifactId>jsonschema2pojo-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>commons-io</groupId>
+            <artifactId>commons-io</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.jayway.jsonpath</groupId>
+            <artifactId>json-path</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.jayway.jsonpath</groupId>
+            <artifactId>json-path-assert</artifactId>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <sourceDirectory>src/main/java</sourceDirectory>
+        <testSourceDirectory>src/test/java</testSourceDirectory>
+        <resources>
+            <resource>
+                <directory>src/main/resources</directory>
+            </resource>
+        </resources>
+        <testResources>
+            <testResource>
+                <directory>src/test/resources</directory>
+            </testResource>
+        </testResources>
+        <plugins>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>build-helper-maven-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>add-source</id>
+                        <phase>generate-sources</phase>
+                        <goals>
+                            <goal>add-source</goal>
+                        </goals>
+                        <configuration>
+                            <sources>
+                                <source>target/generated-sources/jsonschema2pojo</source><!-- a directory: add-source does not expand globs -->
+                            </sources>
+                        </configuration>
+                    </execution>
+                    <execution>
+                        <id>add-source-jaxb2</id><!-- NOTE(review): this pom declares no jaxb2 generator; confirm this execution is needed -->
+                        <phase>generate-sources</phase>
+                        <goals>
+                            <goal>add-source</goal>
+                        </goals>
+                        <configuration>
+                            <sources>
+                                <source>target/generated-sources/jaxb2</source>
+                            </sources>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.jsonschema2pojo</groupId>
+                <artifactId>jsonschema2pojo-maven-plugin</artifactId>
+                <configuration>
+                    <addCompileSourceRoot>true</addCompileSourceRoot>
+                    <generateBuilders>true</generateBuilders>
+                    <sourcePaths>
+                        <sourcePath>src/main/jsonschema/org/apache/streams/lucene/LuceneTaggerConfiguration.json</sourcePath>
+                    </sourcePaths>
+                    <outputDirectory>target/generated-sources/jsonschema2pojo</outputDirectory>
+                    <targetPackage>org.apache.streams.lucene</targetPackage>
+                    <useLongIntegers>true</useLongIntegers>
+                    <useJodaDates>true</useJodaDates>
+                </configuration>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>generate</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/27e67162/streams-contrib/streams-processor-lucene/src/main/java/org/apache/streams/lucene/LuceneSimpleTaggingProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-lucene/src/main/java/org/apache/streams/lucene/LuceneSimpleTaggingProcessor.java b/streams-contrib/streams-processor-lucene/src/main/java/org/apache/streams/lucene/LuceneSimpleTaggingProcessor.java
new file mode 100644
index 0000000..d6d9cd5
--- /dev/null
+++ b/streams-contrib/streams-processor-lucene/src/main/java/org/apache/streams/lucene/LuceneSimpleTaggingProcessor.java
@@ -0,0 +1,389 @@
+package org.apache.streams.lucene;
+
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.google.inject.Injector;
+import com.jayway.jsonpath.InvalidPathException;
+import com.jayway.jsonpath.JsonPath;
+import com.w2olabs.core.graph.CommunityRepository;
+import com.w2olabs.core.graph.CommunityRepositoryResolver;
+import com.w2olabs.core.graph.entities.Entity;
+import com.w2olabs.streams.pojo.W2OActivity;
+import com.w2olabs.util.guice.GuiceInjector;
+import com.w2olabs.util.tagging.SimpleVerbatim;
+import com.w2olabs.util.tagging.engines.international.LanguageTag;
+import com.w2olabs.util.tagging.engines.international.TaggingEngineLanguage;
+import org.apache.avro.data.Json;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * A {@link StreamsProcessor} that tags JSON documents with the identifiers of the tag queries
+ * (evaluated by {@link TaggingEngineLanguage}) that match the text found at configured JsonPaths.
+ *
+ * Matching tag identifiers are de-duplicated and written to {@code extensions.w2o.tags} and
+ * {@code extensions.w2o.contentTags}. Tags come either from the constructor (tests) or, in
+ * {@link #prepare(Object)}, from the community repository resolved through the {@link Injector}
+ * supplied with {@link #setInjector(Injector)}.
+ */
+public class LuceneSimpleTaggingProcessor implements StreamsProcessor
+{
+    private final static String STREAMS_ID = "LuceneSimpleTaggingProcessor";
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(LuceneSimpleTaggingProcessor.class);
+
+    // Dotted JsonPaths such as "$.object.content" or "$.links[0]". Note "A-Z", not "A-z" (which
+    // also spans '[', '\', ']', '^', '_' and '`'); '_', '[' and ']' are listed explicitly instead.
+    private final static java.util.regex.Pattern JSON_PATH_PATTERN =
+            java.util.regex.Pattern.compile("[$][.][a-zA-Z0-9_.\\[\\]]+");
+
+    private ObjectMapper mapper;
+
+    private String community;
+    private JsonPath[] textPaths;
+    private String[] jsonPathsToText;
+    private List<LanguageTag> tags;
+    private String metaDataKey;
+
+    // Resolves the community repository in prepare() when no tags were passed in. Transient, so
+    // it must be set on the instance that is actually prepared.
+    private transient Injector injector;
+
+    // Per instance, not static, so preparing one processor cannot replace another's tags;
+    // transient because prepare() rebuilds it.
+    private transient TaggingEngineLanguage<LanguageTag, SimpleVerbatim> taggingEngine;
+
+    /**
+     * Creates a processor that tags the text at the given JsonPaths of each datum's document,
+     * loading the tags for {@code community} in {@link #prepare(Object)}.
+     * @param community community whose taggable entities supply the tag queries
+     * @param jsonPathsToText JsonPath expressions (e.g. "$.content") selecting the text to tag
+     */
+    public LuceneSimpleTaggingProcessor(String community, String[] jsonPathsToText) {
+        this(community, jsonPathsToText, null, null);
+    }
+
+    /**
+     * Creates a processor that tags the metadata value stored under {@code metaDataKey} instead of
+     * the document. The value must be a JSON String, an object the Jackson mapper can serialize,
+     * or a List of such values.
+     * @param community community whose taggable entities supply the tag queries
+     * @param jsonPathsToText JsonPath expressions selecting the text to tag
+     * @param metaDataKey metadata key to read
+     */
+    public LuceneSimpleTaggingProcessor(String community, String[] jsonPathsToText, String metaDataKey) {
+        this(community, jsonPathsToText, metaDataKey, null);
+    }
+
+    /**
+     * Creates a processor with an explicit tag list; intended for tests, since LanguageTag is not
+     * serializable and production tags are loaded from the community repository.
+     * @param community community name (tags are only loaded for it when {@code tags} is null)
+     * @param jsonPathsToText JsonPath expressions selecting the text to tag
+     * @param metaDataKey metadata key to read, or null to tag the document
+     * @param tags tags to match, or null to load them in {@link #prepare(Object)}
+     * @throws IllegalArgumentException if jsonPathsToText is null or contains an invalid path
+     */
+    public LuceneSimpleTaggingProcessor(String community, String[] jsonPathsToText, String metaDataKey, List<LanguageTag> tags) {
+        this.community = community;
+        this.jsonPathsToText = jsonPathsToText;
+        this.tags = tags;
+        this.metaDataKey = metaDataKey;
+        verifyJsonPathstoText(this.jsonPathsToText);
+    }
+
+    public String getCommunity() {
+        return community;
+    }
+
+    public void setCommunity(String community) {
+        this.community = community;
+    }
+
+    /**
+     * Sets the Guice injector that {@link #prepare(Object)} uses to obtain a
+     * {@link CommunityRepositoryResolver} when no tags were given to the constructor.
+     */
+    public void setInjector(Injector injector) {
+        this.injector = injector;
+    }
+
+    /**
+     * Tags one datum. Each JSON candidate (the document, or the metadata value(s) under
+     * {@code metaDataKey}) is parsed, its text at the configured paths is matched by the tag engine,
+     * and the matching tag ids are written to extensions.w2o.tags and extensions.w2o.contentTags.
+     * The tagged JSON then replaces the document, keeping the document's type when it is a
+     * W2OActivity, Activity or String and as an ObjectNode otherwise. Candidates that are not JSON
+     * objects, or whose extensions/w2o fields are not objects, are logged and skipped.
+     * @param entry datum to tag; its document is replaced in place
+     * @return the datum once per tagged candidate; empty if no candidate could be tagged
+     */
+    @Override
+    public List<StreamsDatum> process(StreamsDatum entry) {
+        Object document = entry.getDocument();
+        LOGGER.debug("{} processing {}", STREAMS_ID, document == null ? null : document.getClass());
+
+        List<String> jsons = Lists.newArrayList();
+        if (this.metaDataKey == null) {
+            String json = getJson(document);
+            if (json != null) {
+                jsons.add(json);
+            }
+        } else {
+            getMetaDataJsons(entry, jsons);
+        }
+
+        // NOTE(review): with several metadata candidates, each overwrites the same datum's document
+        // and the same datum is returned once per candidate; confirm the intended output.
+        List<StreamsDatum> result = Lists.newArrayList();
+        for (String json : jsons) {
+            ObjectNode node = parseObject(json);
+            ObjectNode w2o = getOrCreateObject(getOrCreateObject(node, "extensions"), "w2o");
+            if (w2o == null) {
+                continue;
+            }
+
+            ArrayNode tagArray = JsonNodeFactory.instance.arrayNode();
+            for (String tag : findTags(json)) {
+                tagArray.add(tag);
+            }
+            w2o.put("tags", tagArray);
+            // A separate copy, so the two fields do not share one mutable node.
+            w2o.put("contentTags", tagArray.deepCopy());
+
+            setTaggedDocument(entry, node);
+            result.add(entry);
+        }
+        return result;
+    }
+
+    // Parses json into an ObjectNode; logs and returns null for invalid JSON or a non-object root.
+    private ObjectNode parseObject(String json) {
+        Object parsed;
+        try {
+            parsed = mapper.readTree(json);
+        } catch (IOException e) {
+            LOGGER.warn("Skipping candidate that is not valid JSON", e);
+            return null;
+        }
+        if (!(parsed instanceof ObjectNode)) {
+            LOGGER.warn("Skipping candidate whose JSON root is not an object");
+            return null;
+        }
+        return (ObjectNode) parsed;
+    }
+
+    // Returns parent.field as an object node, creating it when absent or JSON null. Returns null
+    // when parent is null or the field holds another kind of value, which is left untouched.
+    private static ObjectNode getOrCreateObject(ObjectNode parent, String field) {
+        if (parent == null) {
+            return null;
+        }
+        if (parent.get(field) instanceof ObjectNode) {
+            return (ObjectNode) parent.get(field);
+        }
+        if (parent.get(field) != null && !parent.get(field).isNull()) {
+            LOGGER.warn("Skipping candidate: field '{}' is not a JSON object", field);
+            return null;
+        }
+        ObjectNode child = JsonNodeFactory.instance.objectNode();
+        parent.put(field, child);
+        return child;
+    }
+
+    // Runs the tag engine over the text at the configured paths; returns the distinct tag ids.
+    private Set<String> findTags(String json) {
+        Set<String> tagSet = Sets.newHashSet();
+        List<SimpleVerbatim> verbatimList = convertEntryToWorkUnit(json);
+        if (verbatimList.isEmpty()) {
+            return tagSet;
+        }
+        Map<SimpleVerbatim, List<LanguageTag>> objectTags = taggingEngine.findMatches(verbatimList);
+        for (List<LanguageTag> fieldTags : objectTags.values()) {
+            for (LanguageTag tag : fieldTags) {
+                tagSet.add(tag.getTag());
+            }
+        }
+        return tagSet;
+    }
+
+    // Replaces the datum's document with the tagged node, converted to the current document's type.
+    private void setTaggedDocument(StreamsDatum entry, ObjectNode node) {
+        Object current = entry.getDocument();
+        if (current instanceof W2OActivity) {
+            entry.setDocument(mapper.convertValue(node, W2OActivity.class));
+        } else if (current instanceof Activity) {
+            entry.setDocument(mapper.convertValue(node, Activity.class));
+        } else if (current instanceof String) {
+            try {
+                entry.setDocument(mapper.writeValueAsString(node));
+            } catch (JsonProcessingException jpe) {
+                LOGGER.error("Exception while converting ObjectNode to string. Outputting as ObjectNode.", jpe);
+                entry.setDocument(node);
+            }
+        } else {
+            entry.setDocument(node);
+        }
+    }
+
+    // Adds the JSON of the metadata value under metaDataKey (of each element, if it is a List).
+    private void getMetaDataJsons(StreamsDatum datum, List<String> jsons) {
+        if (datum.getMetadata() == null) {
+            return;
+        }
+        Object obj = datum.getMetadata().get(this.metaDataKey);
+        if (obj == null) {
+            LOGGER.debug("Object at key={} was NULL.", this.metaDataKey);
+            return;
+        }
+        List<?> candidates = obj instanceof List ? (List<?>) obj : Collections.singletonList(obj);
+        for (Object candidate : candidates) {
+            String json = getJson(candidate);
+            if (json != null) {
+                jsons.add(json);
+            }
+        }
+    }
+
+    // Returns a candidate's JSON text: Strings as-is, anything else serialized by the Jackson
+    // mapper. Returns null for a null candidate or when serialization fails.
+    private String getJson(Object object) {
+        if (object == null) {
+            return null;
+        }
+        if (object instanceof String) {
+            return (String) object;
+        }
+        try {
+            // Also used for ObjectNode: asText() is "" for object nodes, not their JSON.
+            return mapper.writeValueAsString(object);
+        } catch (JsonProcessingException jpe) {
+            LOGGER.error("Failed to convert {} to String", object.getClass(), jpe);
+            return null;
+        }
+    }
+
+    /**
+     * Builds the tagging engine and compiles the JsonPaths. Uses the constructor's tags when given,
+     * otherwise loads the community's taggable entities through the injector.
+     * @param o ignored
+     * @throws IllegalStateException if no tags were given and no injector has been set
+     */
+    @Override
+    public void prepare(Object o) {
+        mapper = StreamsJacksonMapper.getInstance();
+        List<LanguageTag> languageTags = this.tags;
+        if (languageTags == null) {
+            if (injector == null) {
+                // NOTE(review): GuiceInjector is imported but its API is not visible here; confirm
+                // how production code should obtain the Injector.
+                throw new IllegalStateException("No tags and no Injector for community " + community);
+            }
+            CommunityRepositoryResolver resolver = injector.getInstance(CommunityRepositoryResolver.class);
+            CommunityRepository repo = resolver.get(community); // assumes get() returns a CommunityRepository
+            List<Entity> entities = repo.getTaggableEntities();
+            languageTags = createLanguageTagsFromRexsterTags(entities);
+        }
+        taggingEngine = new TaggingEngineLanguage<LanguageTag, SimpleVerbatim>(languageTags);
+        compileTextJsonPaths();
+    }
+
+    @Override
+    public void cleanUp() {
+
+    }
+
+    // Converts graph entities to LanguageTags (language defaults to "en"), skipping entities whose
+    // language is unsupported or whose LanguageTag cannot be constructed.
+    private static List<LanguageTag> createLanguageTagsFromRexsterTags(List<Entity> graphTags) {
+        List<LanguageTag> result = new ArrayList<LanguageTag>(graphTags.size());
+        LOGGER.info("Attempting to convert {} Graph tags into Language tags.", graphTags.size());
+        for (Entity tag : graphTags) {
+            String tagLang;
+            try {
+                Object language = tag.getAdditionalProperties().get("language");
+                tagLang = language == null ? "en" : language.toString();
+            } catch (Exception e) {
+                tagLang = "en";
+            }
+            LOGGER.debug("Tag : {}\n{}\n{}", new Object[] {tag.getIdentifier(), tag.getQuery(), tagLang});
+            if (!TaggingEngineLanguage.SUPPORTED.contains(tagLang)) {
+                LOGGER.warn("Attempted to load a tag for Language, {}, but that language is not currently supported. SimpleTag is being ignored!", tagLang);
+                continue;
+            }
+            try {
+                result.add(new LanguageTag(tag.getIdentifier(), tag.getQuery(), tagLang));
+            } catch (Exception e) {
+                LOGGER.warn("Failed to create LanguageTag for {}", tag.getIdentifier(), e);
+            }
+        }
+        LOGGER.info("Loaded {} Language Tags. {} tags were not supported.", result.size(), graphTags.size() - result.size());
+        return result;
+    }
+
+    // Throws if any path is empty or not matched by JSON_PATH_PATTERN, after logging every bad one.
+    private String[] verifyJsonPathstoText(String[] jsonPathsToText) {
+        if (jsonPathsToText == null) {
+            throw new IllegalArgumentException("jsonPathsToText must not be null");
+        }
+        boolean valid = true;
+        for (String path : jsonPathsToText) {
+            if (StringUtils.isEmpty(path) || !JSON_PATH_PATTERN.matcher(path).matches()) {
+                LOGGER.error("Invalid JsonPath path : {}", path);
+                valid = false;
+            }
+        }
+        if (!valid) {
+            throw new IllegalArgumentException("Invalid JsonPath paths!");
+        }
+        return jsonPathsToText;
+    }
+
+    // Compiles jsonPathsToText into textPaths.
+    private void compileTextJsonPaths() {
+        this.textPaths = new JsonPath[this.jsonPathsToText.length];
+        for (int i = 0; i < this.jsonPathsToText.length; ++i) {
+            this.textPaths[i] = JsonPath.compile(this.jsonPathsToText[i]);
+        }
+    }
+
+    // Collects the String values at each compiled path (a String, or a List's String elements);
+    // paths whose read throws InvalidPathException are skipped. Never returns null.
+    private List<SimpleVerbatim> convertEntryToWorkUnit(String json) {
+        List<SimpleVerbatim> textFields = new ArrayList<SimpleVerbatim>();
+        for (JsonPath path : this.textPaths) {
+            Object pathObject;
+            try {
+                pathObject = path.read(json);
+            } catch (InvalidPathException x) {
+                LOGGER.debug("{}: {}", x.getMessage(), path.getPath());
+                continue;
+            }
+            if (pathObject instanceof String) {
+                textFields.add(new SimpleVerbatim((String) pathObject));
+            } else if (pathObject instanceof List) {
+                for (Object item : (List<?>) pathObject) {
+                    if (item instanceof String) {
+                        textFields.add(new SimpleVerbatim((String) item));
+                    }
+                }
+            }
+        }
+        return textFields;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/27e67162/streams-contrib/streams-processor-lucene/src/main/jsonschema/org/apache/streams/lucene/LuceneTaggerConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-lucene/src/main/jsonschema/org/apache/streams/lucene/LuceneTaggerConfiguration.json b/streams-contrib/streams-processor-lucene/src/main/jsonschema/org/apache/streams/lucene/LuceneTaggerConfiguration.json
new file mode 100644
index 0000000..19fcd76
--- /dev/null
+++ b/streams-contrib/streams-processor-lucene/src/main/jsonschema/org/apache/streams/lucene/LuceneTaggerConfiguration.json
@@ -0,0 +1,26 @@
+{
+    "type": "object",
+    "$schema": "http://json-schema.org/draft-03/schema",
+    "id": "#",
+    "javaType" : "org.apache.streams.lucene.LuceneTaggerConfiguration",
+    "javaInterfaces": ["java.io.Serializable"],
+    "properties": {
+        "tags": {
+            "type": "array",
+            "items": {
+                "type": "object",
+                "properties": {
+                    "tag": {
+                        "type": "string",
+                        "description": "Tag Identifier"
+                    },
+                    "query": {
+                        "type": "string",
+                        "description": "Lucene Query"
+                    },
+                    "language": { "type": "string", "description": "Language" }
+                }
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/27e67162/streams-contrib/streams-processor-lucene/src/test/java/org/apache/streams/lucene/TestLucenSimpleTaggingProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-lucene/src/test/java/org/apache/streams/lucene/TestLucenSimpleTaggingProcessor.java b/streams-contrib/streams-processor-lucene/src/test/java/org/apache/streams/lucene/TestLucenSimpleTaggingProcessor.java
new file mode 100644
index 0000000..937f4c4
--- /dev/null
+++ b/streams-contrib/streams-processor-lucene/src/test/java/org/apache/streams/lucene/TestLucenSimpleTaggingProcessor.java
@@ -0,0 +1,148 @@
+package org.apache.streams.lucene;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
+import com.jayway.jsonpath.JsonPath;
+import com.w2olabs.streams.pojo.W2OActivity;
+import com.w2olabs.util.tagging.engines.international.LanguageTag;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.pojo.json.Activity;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+import java.util.List;
+import java.util.Scanner;
+
+/**
+ * Created by rebanks on 3/18/14.
+ */
+public class TestLucenSimpleTaggingProcessor {
+
+    private static final String ACTIVITY_JSON = "{\"id\":\"id:twitter:post:410898682381615105\",\"actor\":{\"id\":\"91407775\",\"displayName\":\"winchester_ky\",\"attachments\":[],\"upstreamDuplicates\":[],\"downstreamDuplicates\":[]},\"verb\":\"post\",\"object\":{\"id\":\"id:twitter:410898682381615105\",\"objectType\":\"tweet\",\"attachments\":[],\"upstreamDuplicates\":[],\"downstreamDuplicates\":[]},\"published\":1386800854000,\"provider\":{\"id\":\"id:providers:twitter\",\"attachments\":[],\"upstreamDuplicates\":[],\"downstreamDuplicates\":[]},\"title\":\"\",\"content\":\"Men's Basketball baseball soccer Single-Game Tickets Available - A limited number of tickets remain for Kentucky's upcoming men's ... http://t.co/SH5YZGpdRx\",\"links\":[\"http://ow.ly/2C2XL1\"],\"extensions\":{\"twitter\":{\"created_at\":\"Wed Dec 11 22:27:34 +0000 2013\",\"id\":410898682381615105,\"id_str\":\"410898682381615105\",\"text\":\"Men's Basketball Single-Game Tickets Available - A limited number of t
 ickets remain for Kentucky's upcoming men's ... http://t.co/SH5YZGpdRx\",\"source\":\"<a href=\\\"http://www.hootsuite.com\\\" rel=\\\"nofollow\\\">HootSuite</a>\",\"truncated\":false,\"in_reply_to_status_id\":null,\"in_reply_to_status_id_str\":null,\"in_reply_to_user_id\":null,\"in_reply_to_user_id_str\":null,\"in_reply_to_screen_name\":null,\"user\":{\"id\":91407775,\"id_str\":\"91407775\",\"name\":\"Winchester, KY\",\"screen_name\":\"winchester_ky\",\"location\":\"\",\"url\":null,\"description\":null,\"protected\":false,\"followers_count\":136,\"friends_count\":0,\"listed_count\":1,\"created_at\":\"Fri Nov 20 19:29:02 +0000 2009\",\"favourites_count\":0,\"utc_offset\":null,\"time_zone\":null,\"geo_enabled\":false,\"verified\":false,\"statuses_count\":1793,\"lang\":\"en\",\"contributors_enabled\":false,\"is_translator\":false,\"profile_background_color\":\"C0DEED\",\"profile_background_image_url\":\"http://abs.twimg.com/images/themes/theme1/bg.png\",\"profile_background_image_url_
 https\":\"https://abs.twimg.com/images/themes/theme1/bg.png\",\"profile_background_tile\":false,\"profile_image_url\":\"http://pbs.twimg.com/profile_images/613854495/winchester_sociallogo_normal.jpg\",\"profile_image_url_https\":\"https://pbs.twimg.com/profile_images/613854495/winchester_sociallogo_normal.jpg\",\"profile_link_color\":\"0084B4\",\"profile_sidebar_border_color\":\"C0DEED\",\"profile_sidebar_fill_color\":\"DDEEF6\",\"profile_text_color\":\"333333\",\"profile_use_background_image\":true,\"default_profile\":true,\"default_profile_image\":false,\"following\":null,\"follow_request_sent\":null,\"notifications\":null},\"geo\":null,\"coordinates\":null,\"place\":null,\"contributors\":null,\"retweet_count\":0,\"favorite_count\":0,\"entities\":{\"hashtags\":[],\"symbols\":[],\"urls\":[{\"url\":\"http://t.co/SH5YZGpdRx\",\"expanded_url\":\"http://ow.ly/2C2XL1\",\"display_url\":\"ow.ly/2C2XL1\",\"indices\":[118,140]}],\"user_mentions\":[]},\"favorited\":false,\"retweeted\":false,
 \"possibly_sensitive\":false,\"filter_level\":\"medium\",\"lang\":\"en\"},\"location\":{\"id\":\"id:twitter:410898682381615105\",\"coordinates\":null}}}\n";
+    private static final String W2O_ACTIVITY_JSON = "{\"extensions\":{\"w2o\":{\"provider\":\"twitter\",\"analyzer\":\"romance_analyzer\",\"tags\":[\"brand-cascade\",\"language-en\",\"country-ca\"],\"contentTags\":[],\"linkTags\":[],\"lang\":{\"primaryLanguage\":\"en\"}},\"twitter\":{\"retweetCount\":0,\"entities\":{\"symbols\":[],\"urls\":[{\"expanded_url\":\"http://bit.ly/hUmoRz\",\"indices\":[77,99],\"display_url\":\"bit.ly/hUmoRz\",\"url\":\"http://t.co/Ytn45Pbttk\"}],\"hashtags\":[{\"text\":\"SmurfsVillage\",\"indices\":[100,114]}],\"user_mentions\":[{\"id\":188075479,\"name\":\"Beeline Interactive\",\"indices\":[115,128],\"screen_name\":\"BeelineGames\",\"id_str\":\"188075479\"}]}},\"gnip\":{\"matching_rules\":[{\"tag\":\"cascade_CA_CA_en\"}],\"klout_score\":10,\"urls\":[{\"expanded_url\":\"https://itunes.apple.com/us/app/smurfs-village/id399648212?mt=8\",\"url\":\"http://t.co/Ytn45Pbttk\"}],\"klout_profile\":{\"topics\":[],\"klout_user_id\":\"257268143479895040\",\"link\":\"h
 ttp://klout.com/user/id/257268143479895040\"},\"language\":{\"value\":\"fr\"}}},\"id\":\"tag:search.twitter.com,2005:372802927385403392\",\"actor\":{\"id\":\"id:twitter.com:583891967\",\"image\":{\"url\":\"https://si0.twimg.com/sticky/default_profile_images/default_profile_1_normal.png\"},\"displayName\":\"Sabine Chappuis\",\"objectType\":\"person\",\"attachments\":[],\"upstreamDuplicates\":[],\"downstreamDuplicates\":[],\"twitterTimeZone\":\"Brussels\",\"friendsCount\":6,\"favoritesCount\":0,\"link\":\"http://www.twitter.com/spoffff\",\"postedTime\":\"2012-05-18T15:14:35.000Z\",\"links\":[{\"rel\":\"me\",\"href\":null}],\"listedCount\":0,\"languages\":[\"fr\"],\"verified\":false,\"utcOffset\":\"7200\",\"followersCount\":0,\"preferredUsername\":\"spoffff\",\"statusesCount\":87},\"verb\":\"post\",\"object\":{\"id\":\"object:search.twitter.com,2005:372802927385403392\",\"summary\":\"Le Grand Schtroumpf confirme que la cascade magique n'est \\\"Plus tr?�s loin.\\\" http://t.co/Ytn45P
 bttk #SmurfsVillage @BeelineGames\",\"objectType\":\"note\",\"attachments\":[],\"upstreamDuplicates\":[],\"downstreamDuplicates\":[],\"link\":\"http://twitter.com/spoffff/statuses/372802927385403392\",\"postedTime\":\"2013-08-28T19:28:38.000Z\"},\"published\":1377718118000,\"generator\":{\"id\":\"{link}\",\"displayName\":\"Smurfs' Village on iOS\",\"attachments\":[],\"upstreamDuplicates\":[],\"downstreamDuplicates\":[],\"link\":\"https://itunes.apple.com/us/app/smurfs-village/id399648212?mt=8&uo=4\"},\"provider\":{\"id\":\"{link}\",\"displayName\":\"Twitter\",\"objectType\":\"service\",\"attachments\":[],\"upstreamDuplicates\":[],\"downstreamDuplicates\":[],\"link\":\"http://www.twitter.com\"},\"content\":\"Le Grand Schtroumpf confirme soccer que la cascade magique n'est \\\"Plus tr?�s loin.\\\" http://t.co/Ytn45Pbttk #SmurfsVillage @BeelineGames\",\"links\":[],\"guid\":\"A8fccSz7rpKfDJY078VLyw==_201308\",\"link\":\"http://twitter.com/spoffff/statuses/372802927385403392\",\"posted
 Time\":\"2013-08-28T19:28:38.000Z\",\"objectType\":\"activity\",\"twitter_filter_level\":\"medium\"}\n";
+    private static final String LINK_EXPANDER_JSON = "{\"body\":\"Analise baseball Coady W2O Lead, EMEA Twitter Linkedin\\nRyan Flinn Director, Earned Media Twitter Linkedin\\nAdam Cohen W2O Lead, Boston Twitter Linkedin\\nSarah Savage Managing Director, Healthcare Twitter Linkedin\\nCarolyn Wang Practice Lead, Healthcare Twitter Linkedin\\nKathy Keanini Group Director, Strategy Twitter Linkedin\\nRob Cronin Practice Lead, Healthcare Twitter Linkedin\\nAnalise Coady W2O Lead, EMEA Twitter Linkedin\\nRyan Flinn Director, Earned Media Twitter Linkedin\\nAdam Cohen W2O Lead, Boston Twitter Linkedin\\nSarah Savage Managing Director, Healthcare Twitter Linkedin\\nCarolyn Wang Practice Lead, Healthcare Twitter Linkedin\\nKathy Keanini Group Director, Strategy Twitter Linkedin\\nRob Cronin Practice Lead, Healthcare Twitter Linkedin\\nOur Thinkers\\nFull-on enterprise consulting, supercharged by the best analytics in the business.\\nProducts & Services\\nMedia\\nPaid. Earned. Shared. Owned.
  You name it, we either mastered it or just invented it.\\nthe w2o group\\n© W2O Group 2014\\n\",\"finalUrl\":\"http://www.wcgworld.com/\",\"locale\":null,\"twitterSite\":null,\"urlParts\":[\"www.wcgworld.com\"],\"twitterCreator\":null,\"finalStatusCode\":\"200\",\"author\":null,\"originalUrl\":\"http://www.wcgworld.com/\",\"title\":\"WCG World\",\"description\":\"Find out what makes The WCG Approach second to none\",\"redirects\":[],\"domain\":\"www.wcgworld.com\",\"wasRedirected\":false,\"facebookApp\":null,\"metaData\":{\"content-type\":\"text/html; charset=UTF-8\",\"viewport\":\"width=device-width, initial-scale=1\",\"title\":\"WCG World\",\"og:title\":\"WCG World\",\"og:description\":\"Find out what makes The WCG Approach second to none\",\"content-encoding\":\"UTF-8\",\"x-ua-compatible\":\"IE=edge\",\"og:site_name\":\"WCG World\",\"dc:title\":\"WCG World\",\"og:image\":\"http://www.wcgworld.com/assets/img/home_banner.png\",\"og:url\":\"http://www.wcgworld.com/\"},\"normalized
 Url\":\"www.wcgworld.com/\",\"keywords\":[\"keywords\",\"news_keywords\"],\"status\":\"SUCCESS\",\"isTracked\":false,\"medium\":null,\"facebookPage\":null,\"failure\":false,\"lastModifiedDate\":null,\"tookInMillis\":110,\"publishedDate\":null,\"siteStatus\":\"ERROR\",\"imageURL\":\"http://www.wcgworld.com/assets/img/home_banner.png\",\"plainText\":null}\n";
+
+    // Tag definitions loaded from /TestTags.tsv in setUpTags(); passed to every processor under test.
+    private static List<LanguageTag> tags;
+    // Jackson mapper used to deserialize fixtures into POJOs and to serialize results back to JSON.
+    private static ObjectMapper mapper;
+
+
+    /**
+     * Loads the tag definitions shared by all tests from the /TestTags.tsv resource
+     * (one tab-separated "tag, Lucene query" row per line, all with language "en")
+     * and creates the shared ObjectMapper.
+     */
+    @BeforeClass
+    public static void setUpTags() {
+        tags = Lists.newLinkedList();
+        // Decode explicitly as UTF-8 rather than with the platform default charset.
+        Scanner scanner = new Scanner(TestLucenSimpleTaggingProcessor.class.getResourceAsStream("/TestTags.tsv"), "UTF-8");
+        try {
+            while (scanner.hasNextLine()) {
+                String row = scanner.nextLine();
+                if (row.trim().length() == 0) {
+                    continue; // tolerate blank lines in the fixture
+                }
+                String[] columns = row.split("\t");
+                assertTrue("Malformed row in TestTags.tsv: " + row, columns.length >= 2);
+                tags.add(new LanguageTag(columns[0], columns[1], "en"));
+            }
+        } finally {
+            scanner.close(); // release the resource stream even if a row is malformed
+        }
+        mapper = new ObjectMapper();
+    }
+
+
+    /**
+     * Tags a raw Activity JSON string by its $.content field. That content mentions both
+     * "baseball" and "soccer", so both tags are expected in extensions.w2o.tags and
+     * extensions.w2o.contentTags, and the document must still be a String afterwards.
+     */
+    @Test
+    public void testActivityJsonString() {
+        LuceneSimpleTaggingProcessor processor = new LuceneSimpleTaggingProcessor("test", new String[] {"$.content"}, null, tags);
+        processor.prepare(null);
+        List<StreamsDatum> datums = processor.process(new StreamsDatum(ACTIVITY_JSON));
+        assertNotNull(datums);
+        assertEquals(1, datums.size());
+        StreamsDatum datum = datums.get(0);
+        assertNotNull(datum);
+        assertNotNull(datum.getDocument());
+        assertTrue(datum.getDocument() instanceof String);
+        String json = (String) datum.getDocument();
+        // Distinct local names so the static "tags" definitions field is not shadowed.
+        List<String> allTags = JsonPath.read(json, "$.extensions.w2o.tags");
+        assertEquals(2, allTags.size());
+        assertTrue(allTags.contains("baseball"));
+        assertTrue(allTags.contains("soccer"));
+        List<String> contentTags = JsonPath.read(json, "$.extensions.w2o.contentTags");
+        assertEquals(2, contentTags.size());
+        assertTrue(contentTags.contains("baseball"));
+        assertTrue(contentTags.contains("soccer"));
+    }
+
+    /**
+     * Tags a W2O-extended activity JSON string by its $.content field. That content
+     * mentions "soccer" but not "baseball", so exactly one tag is expected in both
+     * extensions.w2o.tags and extensions.w2o.contentTags.
+     */
+    @Test
+    public void testW2OActivityJsonString() {
+        LuceneSimpleTaggingProcessor processor = new LuceneSimpleTaggingProcessor("test", new String[] {"$.content"}, null, tags);
+        processor.prepare(null);
+        List<StreamsDatum> datums = processor.process(new StreamsDatum(W2O_ACTIVITY_JSON));
+        assertNotNull(datums);
+        assertEquals(1, datums.size());
+        StreamsDatum datum = datums.get(0);
+        assertNotNull(datum);
+        assertNotNull(datum.getDocument());
+        assertTrue(datum.getDocument() instanceof String);
+        String json = (String) datum.getDocument();
+        // Distinct local names so the static "tags" definitions field is not shadowed.
+        List<String> allTags = JsonPath.read(json, "$.extensions.w2o.tags");
+        assertEquals(1, allTags.size());
+        assertTrue(allTags.contains("soccer"));
+        List<String> contentTags = JsonPath.read(json, "$.extensions.w2o.contentTags");
+        assertEquals(1, contentTags.size());
+        assertTrue(contentTags.contains("soccer"));
+    }
+
+    /**
+     * Tags a link-expander JSON document by its $.body field. The body mentions
+     * "baseball" but not "soccer", so exactly one tag is expected in both
+     * extensions.w2o.tags and extensions.w2o.contentTags.
+     */
+    @Test
+    public void testLinkExpanderJsonString() {
+        LuceneSimpleTaggingProcessor processor = new LuceneSimpleTaggingProcessor("test", new String[] {"$.body"}, null, tags);
+        processor.prepare(null);
+        List<StreamsDatum> datums = processor.process(new StreamsDatum(LINK_EXPANDER_JSON));
+        assertNotNull(datums);
+        assertEquals(1, datums.size());
+        StreamsDatum datum = datums.get(0);
+        assertNotNull(datum);
+        assertNotNull(datum.getDocument());
+        assertTrue(datum.getDocument() instanceof String);
+        String json = (String) datum.getDocument();
+        // Distinct local names so the static "tags" definitions field is not shadowed.
+        List<String> allTags = JsonPath.read(json, "$.extensions.w2o.tags");
+        assertEquals(1, allTags.size());
+        assertTrue(allTags.contains("baseball"));
+        List<String> contentTags = JsonPath.read(json, "$.extensions.w2o.contentTags");
+        assertEquals(1, contentTags.size());
+        assertTrue(contentTags.contains("baseball"));
+    }
+
+    /**
+     * Same expectations as testActivityJsonString, but the fixture is first deserialized
+     * into an Activity POJO. The processed document must remain an Activity; it is
+     * serialized back to JSON only so the tags can be inspected with JsonPath.
+     */
+    @Test
+    public void testActivityObject() throws Exception {
+        LuceneSimpleTaggingProcessor processor = new LuceneSimpleTaggingProcessor("test", new String[] {"$.content"}, null, tags);
+        processor.prepare(null);
+        List<StreamsDatum> datums = processor.process(new StreamsDatum(mapper.readValue(ACTIVITY_JSON, Activity.class)));
+        assertNotNull(datums);
+        assertEquals(1, datums.size());
+        StreamsDatum datum = datums.get(0);
+        assertNotNull(datum);
+        assertNotNull(datum.getDocument());
+        assertTrue(datum.getDocument() instanceof Activity);
+        String json = mapper.writeValueAsString(datum.getDocument());
+        // Distinct local names so the static "tags" definitions field is not shadowed.
+        List<String> allTags = JsonPath.read(json, "$.extensions.w2o.tags");
+        assertEquals(2, allTags.size());
+        assertTrue(allTags.contains("baseball"));
+        assertTrue(allTags.contains("soccer"));
+        List<String> contentTags = JsonPath.read(json, "$.extensions.w2o.contentTags");
+        assertEquals(2, contentTags.size());
+        assertTrue(contentTags.contains("baseball"));
+        assertTrue(contentTags.contains("soccer"));
+    }
+
+    /**
+     * Same expectations as testW2OActivityJsonString, but the fixture is first deserialized
+     * into a W2OActivity POJO. The processed document must remain a W2OActivity; it is
+     * serialized back to JSON only so the tags can be inspected with JsonPath.
+     */
+    @Test
+    public void testW2OActivityObject() throws Exception {
+        LuceneSimpleTaggingProcessor processor = new LuceneSimpleTaggingProcessor("test", new String[] {"$.content"}, null, tags);
+        processor.prepare(null);
+        List<StreamsDatum> datums = processor.process(new StreamsDatum(mapper.readValue(W2O_ACTIVITY_JSON, W2OActivity.class)));
+        assertNotNull(datums);
+        assertEquals(1, datums.size());
+        StreamsDatum datum = datums.get(0);
+        assertNotNull(datum);
+        assertNotNull(datum.getDocument());
+        assertTrue(datum.getDocument() instanceof W2OActivity);
+        String json = mapper.writeValueAsString(datum.getDocument());
+        // Distinct local names so the static "tags" definitions field is not shadowed.
+        List<String> allTags = JsonPath.read(json, "$.extensions.w2o.tags");
+        assertEquals(1, allTags.size());
+        assertTrue(allTags.contains("soccer"));
+        List<String> contentTags = JsonPath.read(json, "$.extensions.w2o.contentTags");
+        assertEquals(1, contentTags.size());
+        assertTrue(contentTags.contains("soccer"));
+    }
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/27e67162/streams-contrib/streams-processor-lucene/src/test/resources/TestTags.tsv
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-lucene/src/test/resources/TestTags.tsv b/streams-contrib/streams-processor-lucene/src/test/resources/TestTags.tsv
new file mode 100644
index 0000000..c6ed3a7
--- /dev/null
+++ b/streams-contrib/streams-processor-lucene/src/test/resources/TestTags.tsv
@@ -0,0 +1,2 @@
+baseball	baseball OR homerun OR "home run"
+soccer	soccer
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/27e67162/streams-runtimes/streams-runtime-storm/pom.xml
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-storm/pom.xml b/streams-runtimes/streams-runtime-storm/pom.xml
index 6be2acb..dd0105a 100644
--- a/streams-runtimes/streams-runtime-storm/pom.xml
+++ b/streams-runtimes/streams-runtime-storm/pom.xml
@@ -28,6 +28,12 @@
     <modelVersion>4.0.0</modelVersion>
     <artifactId>streams-runtime-storm</artifactId>
 
+    <properties>
+        <storm.version>0.9.1-incubating</storm.version>
+        <scala.version>2.9.2</scala.version> <!-- scala-library release (this module previously pinned 2.9.2); not the Storm version -->
+        <zkclient.version>0.4</zkclient.version>
+    </properties>
+
     <dependencies>
         <dependency>
             <groupId>org.apache.streams</groupId>
@@ -56,24 +62,18 @@
         <dependency>
             <groupId>org.apache.storm</groupId>
             <artifactId>storm-core</artifactId>
+            <version>${storm.version}</version>
             <scope>provided</scope>
         </dependency>
-        <!--<dependency>-->
-            <!--<groupId>org.apache.storm</groupId>-->
-            <!--<artifactId>storm-netty</artifactId>-->
-            <!--<scope>provided</scope>-->
-        <!--</dependency>-->
         <dependency>
             <groupId>org.scala-lang</groupId>
             <artifactId>scala-library</artifactId>
-            <version>2.9.2</version>
             <scope>compile</scope>
             <type>jar</type>
         </dependency>
         <dependency>
             <groupId>com.101tec</groupId>
             <artifactId>zkclient</artifactId>
-            <version>0.3</version>
             <scope>compile</scope>
             <exclusions>
                 <exclusion>
@@ -88,6 +88,39 @@
         </dependency>
     </dependencies>
 
+    <dependencyManagement>
+        <dependencies>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-core</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-library</artifactId>
+            <version>${scala.version}</version>
+            <scope>compile</scope>
+            <type>jar</type>
+        </dependency>
+        <dependency>
+            <groupId>com.101tec</groupId>
+            <artifactId>zkclient</artifactId>
+            <version>${zkclient.version}</version>
+            <scope>compile</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        </dependencies>
+    </dependencyManagement>
+
     <build>
         <sourceDirectory>src/main/java</sourceDirectory>
         <testSourceDirectory>src/test/java</testSourceDirectory>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/27e67162/streams-runtimes/streams-runtime-storm/src/main/java/org/apache/streams/storm/PipelineGraphLauncher.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-storm/src/main/java/org/apache/streams/storm/PipelineGraphLauncher.java b/streams-runtimes/streams-runtime-storm/src/main/java/org/apache/streams/storm/PipelineGraphLauncher.java
deleted file mode 100644
index a0ad2ea..0000000
--- a/streams-runtimes/streams-runtime-storm/src/main/java/org/apache/streams/storm/PipelineGraphLauncher.java
+++ /dev/null
@@ -1,196 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.storm;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.typesafe.config.Config;
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.util.RegexUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.URISyntaxException;
-import java.net.URL;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Created with IntelliJ IDEA.
- * User: sblackmon
- * Date: 9/20/13
- * Time: 11:17 AM
- * To change this template use File | Settings | File Templates.
- */
-public class PipelineGraphLauncher {
-
-    private static final Logger log = LoggerFactory.getLogger(PipelineGraphLauncher.class);
-
-    private static Config streamsConfiguration;
-
-    private static List<Pair<String,Class>> topologies;
-
-    private static List<Pair<String,Class>> resolveClasses(List<Pair<String,String>> topologyPairs) throws IOException, ClassNotFoundException {
-
-        List<Pair<String,Class>> topologies = new ArrayList<Pair<String,Class>>();
-
-        for( Pair<String,String> pair : topologyPairs ) {
-            String topologyId = pair.getLeft();
-            Class topologyClass = Class.forName(pair.getRight());
-            topologies.add(new ImmutablePair(topologyId, topologyClass));
-        }
-
-        return topologies;
-    }
-
-    private static List<Pair<String,Class>> loadTopologiesFromPipelineTopologyListFile(File file) throws IOException, ClassNotFoundException {
-
-        List<String> lines = IOUtils.readLines(FileUtils.openInputStream(file));
-        String pattern = "^([\\w-]*)[\\s]*?([\\w.]*)$";
-
-        List<Pair<String,String>> topologyPairs = RegexUtils.getTwoMatchedGroupsList(lines, pattern);
-
-        topologies = resolveClasses(topologyPairs);
-
-        for( Pair<String,String> pair : topologyPairs ) {
-            String topologyId = pair.getLeft();
-            Class topologyClass = Class.forName(pair.getRight());
-            topologies.add(new ImmutablePair(topologyId, topologyClass));
-        }
-
-        return topologies;
-    }
-
-    private static List<Pair<String,Class>> loadTopologiesFromPipelineGraphFile(File file) throws IOException, ClassNotFoundException {
-
-        List<String> lines = IOUtils.readLines(FileUtils.openInputStream(file));
-        String pattern = "$([\\w-]*)\\s([\\w.)";
-
-        List<Pair<String,String>> topologyPairs = RegexUtils.getTwoMatchedGroupsList(lines, pattern);
-
-        topologies = resolveClasses(topologyPairs);
-
-        return topologies;
-    }
-
-    public static boolean isLocal(String[] args) {
-        if(args.length >= 1 && args[1].equals("deploy"))
-            return false;
-        else return true;
-    }
-
-    public static void main(String[] args) throws Exception {
-
-        if(args.length < 3) {
-            log.error("Not enough arguments");
-            log.error("  storm {local,deploy} <pipelinedef>");
-            return;
-        }
-        if(!args[1].equals("deploy")) {
-            log.error("Not a deploy");
-            log.error("  storm {local,deploy} <pipelinedef>");
-            return;
-        }
-
-        ObjectMapper mapper = new ObjectMapper();
-
-        URL configFileUrl = PipelineGraphLauncher.class.getResource(args[0]);
-
-        File configFile;
-        try {
-            configFile = new File(configFileUrl.toURI());
-        } catch(URISyntaxException e) {
-            configFile = new File(configFileUrl.getPath());
-        }
-
-        streamsConfiguration = StreamsConfigurator.config;
-
-//        String pipelineIdentifier = streamsConfiguration.getPipeline();
-//
-//        for( Map.Entry<String, Object> moduleConfig : streamsConfiguration.getAdditionalProperties().entrySet()) {
-//
-//        }
-
-//      each defined graph becomes a topology
-
-//
-//        URL pipelineFileUrl = PipelineGraphLauncher.class.getResource(args[1]);
-//
-//        File pipelineFile;
-//        try {
-//            pipelineFile = new File(pipelineFileUrl.toURI());
-//        } catch(URISyntaxException e) {
-//            pipelineFile = new File(pipelineFileUrl.getPath());
-//        }
-//
-//        topologies = loadTopologiesFromPipelineTopologyListFile(pipelineFile);
-//
-//        for( Pair<String,Class> topology : topologies ) {
-//            Class topologyClass = topology.getRight();
-//
-//            try {
-//                Constructor ctor = topologyClass.getDeclaredConstructor(
-//                    String.class,
-//                    StreamsConfiguration.class);
-//                ctor.setAccessible(true);
-//                Object topologyObject = ctor.newInstance(pipelineIdentifier, streamsConfiguration);
-//                Runnable runnable = (Runnable) topologyObject;
-//                runnable.run();
-//            } catch (InstantiationException x) {
-//                log.warn(x.getMessage());
-//                x.printStackTrace();
-//            } catch (IllegalAccessException x) {
-//                log.warn(x.getMessage());
-//                x.printStackTrace();
-//            } catch (InvocationTargetException x) {
-//                log.warn(x.getMessage());
-//                x.printStackTrace();
-//            } catch (NoSuchMethodException x) {
-//                log.warn(x.getMessage());
-//                x.printStackTrace();
-//
-//                try {
-//                    Constructor ctor = topologyClass.getDeclaredConstructor(
-//                            String[].class);
-//                    ctor.setAccessible(true);
-//                    Object topologyObject = ctor.newInstance(args);
-//                    Method main = topologyClass.getMethod("main", String[].class);
-//                    main.invoke(topologyObject, args);
-//                } catch (InstantiationException x2) {
-//                    log.warn(x2.getMessage());
-//                    x.printStackTrace();
-//                } catch (IllegalAccessException x2) {
-//                    log.warn(x2.getMessage());
-//                    x.printStackTrace();
-//                } catch (InvocationTargetException x2) {
-//                    log.warn(x2.getMessage());
-//                    x.printStackTrace();
-//                } catch (NoSuchMethodException x2) {
-//                    log.error(x2.getMessage());
-//                    x.printStackTrace();
-//                }
-//            }
-//        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/27e67162/streams-runtimes/streams-runtime-storm/src/main/java/org/apache/streams/storm/trident/StreamsProcessorFunction.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-storm/src/main/java/org/apache/streams/storm/trident/StreamsProcessorFunction.java b/streams-runtimes/streams-runtime-storm/src/main/java/org/apache/streams/storm/trident/StreamsProcessorFunction.java
new file mode 100644
index 0000000..b1cdba4
--- /dev/null
+++ b/streams-runtimes/streams-runtime-storm/src/main/java/org/apache/streams/storm/trident/StreamsProcessorFunction.java
@@ -0,0 +1,115 @@
+package org.apache.streams.storm.trident;
+
+import com.google.common.collect.Lists;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsPersistWriter;
+import org.apache.streams.core.StreamsProcessor;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import storm.trident.operation.Function;
+import storm.trident.operation.TridentCollector;
+import storm.trident.operation.TridentOperationContext;
+import storm.trident.tuple.TridentTuple;
+
+import java.math.BigInteger;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Trident {@link Function} that runs each tuple through a wrapped {@link StreamsProcessor}.
+ *
+ * <p>The incoming tuple is read by field name: "document" (any object), "timestamp"
+ * (read as a Long) and "sequenceid" (read as a decimal String). These are wrapped in a
+ * {@link StreamsDatum} and passed to the processor; every datum the processor returns is
+ * emitted as (timestamp, sequenceid, document), the same field order StreamsProviderSpout
+ * declares.
+ *
+ * <p>NOTE(review): the emitted timestamp and sequenceid are the DateTime / BigInteger
+ * objects carried by each result datum, whereas the input is read as Long / String.
+ * Confirm that whatever consumes this function's output expects those types.
+ *
+ * <p>Created by sblackmon on 4/6/14.
+ */
+public class StreamsProcessorFunction implements Function {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(StreamsProcessorFunction.class);
+
+    // The wrapped processor. Left package-private (and non-final) so same-package code that
+    // assigns it directly keeps working; prepare() fails fast if it was never set.
+    // NOTE(review): Trident operations are presumably serialized to workers, so the
+    // processor likely needs to be Serializable -- confirm.
+    StreamsProcessor processor;
+
+    /**
+     * Creates a function without a processor; {@link #processor} must be assigned
+     * before {@link #prepare(Map, TridentOperationContext)} is called.
+     */
+    public StreamsProcessorFunction() {
+    }
+
+    /**
+     * Creates a function that delegates every tuple to {@code processor}.
+     *
+     * @param processor the processor to run; must not be null
+     * @throws IllegalArgumentException if {@code processor} is null
+     */
+    public StreamsProcessorFunction(StreamsProcessor processor) {
+        if (processor == null) {
+            throw new IllegalArgumentException("processor must not be null");
+        }
+        this.processor = processor;
+    }
+
+    /**
+     * Converts the tuple to a {@link StreamsDatum}, runs the processor on it and emits
+     * one tuple per returned datum.
+     *
+     * @param objects          incoming tuple carrying document / timestamp / sequenceid
+     * @param tridentCollector collector that receives the processed tuples
+     */
+    @Override
+    public void execute(TridentTuple objects, TridentCollector tridentCollector) {
+        // A missing sequence id is passed on as null rather than failing in new BigInteger(null).
+        String sequenceId = objects.getStringByField("sequenceid");
+        StreamsDatum datum = new StreamsDatum(
+                objects.getValueByField("document"),
+                new DateTime(objects.getLongByField("timestamp")),
+                sequenceId == null ? null : new BigInteger(sequenceId)
+        );
+        List<StreamsDatum> results = processor.process(datum);
+        if (results == null) {
+            LOGGER.debug("Processor returned no results for sequenceid {}", sequenceId);
+            return;
+        }
+        for (StreamsDatum result : results) {
+            // Emit each processed result -- not the original input datum.
+            tridentCollector.emit(Lists.newArrayList(
+                    result.getTimestamp(),
+                    result.getSequenceid(),
+                    result.getDocument()
+            ));
+        }
+    }
+
+    /**
+     * Passes the map supplied by Trident to the wrapped processor's prepare().
+     *
+     * @throws IllegalStateException if no processor has been configured
+     */
+    @Override
+    public void prepare(Map map, TridentOperationContext tridentOperationContext) {
+        if (processor == null) {
+            throw new IllegalStateException("No StreamsProcessor configured for StreamsProcessorFunction");
+        }
+        processor.prepare(map);
+    }
+
+    /** Delegates to the wrapped processor's cleanUp(), if a processor was configured. */
+    @Override
+    public void cleanup() {
+        if (processor != null) {
+            processor.cleanUp();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/27e67162/streams-runtimes/streams-runtime-storm/src/main/java/org/apache/streams/storm/trident/StreamsProviderSpout.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-storm/src/main/java/org/apache/streams/storm/trident/StreamsProviderSpout.java b/streams-runtimes/streams-runtime-storm/src/main/java/org/apache/streams/storm/trident/StreamsProviderSpout.java
index 8aa04d8..e582e3d 100644
--- a/streams-runtimes/streams-runtime-storm/src/main/java/org/apache/streams/storm/trident/StreamsProviderSpout.java
+++ b/streams-runtimes/streams-runtime-storm/src/main/java/org/apache/streams/storm/trident/StreamsProviderSpout.java
@@ -62,6 +62,6 @@ public class StreamsProviderSpout implements IBatchSpout {
 
     @Override
     public Fields getOutputFields() {
-        return new Fields("timestamp", "sequenceid", "datum");
+        return new Fields("timestamp", "sequenceid", "document");
     }
 };
\ No newline at end of file