You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by mf...@apache.org on 2014/07/21 20:59:03 UTC

[1/2] git commit: STREAMS-133 | Refactored the way we handle SyndEntries so that the process is cleaner

Repository: incubator-streams
Updated Branches:
  refs/heads/master 9580fbc7e -> 7216a6fdf


STREAMS-133 | Refactored the way we handle SyndEntries so that the process is cleaner


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/3325c6ec
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/3325c6ec
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/3325c6ec

Branch: refs/heads/master
Commit: 3325c6ec4a69366d907329d492452ce75ced49a6
Parents: 5c9a531
Author: Robert Douglas <rd...@w2odigital.com>
Authored: Fri Jul 11 16:59:57 2014 -0500
Committer: Robert Douglas <rd...@w2odigital.com>
Committed: Fri Jul 11 16:59:57 2014 -0500

----------------------------------------------------------------------
 .../streams/rss/processor/RssTypeConverter.java |  72 +++++
 .../streams/rss/provider/RssEventProcessor.java |   8 +-
 .../serializer/SyndEntryActivitySerializer.java | 198 ++++++++++--
 .../rss/serializer/SyndEntrySerializer.java     | 308 +++++++++++++++++++
 .../streams/rss/test/RssTypeConverterTest.java  |  31 ++
 .../test/SyndEntryActivitySerizlizerTest.java   | 102 ++++++
 .../streams/rss/test/Top100FeedsTest.java       |  84 -----
 .../src/test/resources/TestSyndEntryJson.txt    |  10 +
 .../local/tasks/StreamsProcessorTask.java       |  33 +-
 9 files changed, 719 insertions(+), 127 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3325c6ec/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/processor/RssTypeConverter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/processor/RssTypeConverter.java b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/processor/RssTypeConverter.java
new file mode 100644
index 0000000..339b922
--- /dev/null
+++ b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/processor/RssTypeConverter.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.rss.processor;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Lists;
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.rss.serializer.SyndEntryActivitySerializer;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * Converts ObjectNode representations of Rome SyndEntries to activities.
+ */
+public class RssTypeConverter implements StreamsProcessor{
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(RssTypeConverter.class);
+
+    private SyndEntryActivitySerializer serializer;
+    private int successCount = 0;
+    private int failCount = 0;
+
+    @Override
+    public List<StreamsDatum> process(StreamsDatum datum) {
+        List<StreamsDatum> datums = Lists.newLinkedList();
+        if(datum.getDocument() instanceof ObjectNode) {
+            Activity activity = this.serializer.deserialize((ObjectNode) datum.getDocument());
+            datums.add(new StreamsDatum(activity, activity.getId(), DateTime.now().withZone(DateTimeZone.UTC)));
+            successCount ++;
+        } else {
+            failCount ++;
+            throw new NotImplementedException("Not implemented for class type : "+ datum.getDocument().getClass().toString());
+
+        }
+        LOGGER.debug("Processor current success count: {} and current fail: {}", successCount, failCount);
+
+        return datums;
+    }
+
+    @Override
+    public void prepare(Object o) {
+        this.serializer = new SyndEntryActivitySerializer();
+    }
+
+    @Override
+    public void cleanUp() {
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3325c6ec/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssEventProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssEventProcessor.java b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssEventProcessor.java
index 94b04ae..75d275d 100644
--- a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssEventProcessor.java
+++ b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssEventProcessor.java
@@ -19,19 +19,18 @@
 package org.apache.streams.rss.provider;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.sun.syndication.feed.synd.SyndEntry;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.pojo.json.Activity;
 import org.apache.streams.rss.serializer.SyndEntryActivitySerializer;
+import org.apache.streams.rss.serializer.SyndEntrySerializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Queue;
 import java.util.Random;
 
-/**
- * Created by sblackmon on 12/10/13.
- */
 public class RssEventProcessor implements Runnable {
 
     private final static Logger LOGGER = LoggerFactory.getLogger(RssEventProcessor.class);
@@ -45,6 +44,7 @@ public class RssEventProcessor implements Runnable {
     private Class outClass;
 
     private SyndEntryActivitySerializer syndEntryActivitySerializer = new SyndEntryActivitySerializer();
+    private SyndEntrySerializer syndEntrySerializer = new SyndEntrySerializer();
 
     public final static String TERMINATE = new String("TERMINATE");
 
@@ -87,7 +87,7 @@ public class RssEventProcessor implements Runnable {
                     // convert to desired format
                     SyndEntry entry = (SyndEntry)item;
                     if( entry != null ) {
-                        Activity out = syndEntryActivitySerializer.deserialize((SyndEntry)item);
+                        Activity out = syndEntryActivitySerializer.deserialize(this.syndEntrySerializer.deserialize((SyndEntry)item));
 
                         if( out != null )
                             outQueue.offer(new StreamsDatum(out));

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3325c6ec/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/SyndEntryActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/SyndEntryActivitySerializer.java b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/SyndEntryActivitySerializer.java
index 52c0f04..ace10dc 100644
--- a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/SyndEntryActivitySerializer.java
+++ b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/SyndEntryActivitySerializer.java
@@ -18,18 +18,49 @@
 
 package org.apache.streams.rss.serializer;
 
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
-import com.sun.syndication.feed.synd.SyndEntry;
 import org.apache.streams.data.ActivitySerializer;
+import org.apache.streams.data.util.RFC3339Utils;
+import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.pojo.json.*;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.List;
+import java.util.Map;
+
+import static org.apache.streams.data.util.ActivityUtil.ensureExtensions;
+
+public class SyndEntryActivitySerializer implements ActivitySerializer<ObjectNode> {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(SyndEntryActivitySerializer.class);
+
+    private boolean includeRomeExtension;
+
+    public SyndEntryActivitySerializer() {
+        this(true);
+    }
+
+    public SyndEntryActivitySerializer(boolean includeRomeExtension) {
+        this.includeRomeExtension = includeRomeExtension;
+    }
 
-/**
- * Deserializes the Rome SyndEntry POJO and converts it to an instance of {@link Activity}
- */
-public class SyndEntryActivitySerializer implements ActivitySerializer<SyndEntry> {
+
+    @Override
+    public List<Activity> deserializeAll(List<ObjectNode> objectNodes) {
+        List<Activity> result = Lists.newLinkedList();
+        for (ObjectNode node : objectNodes) {
+            result.add(deserialize(node));
+        }
+        return result;
+    }
 
     @Override
     public String serializationFormat() {
@@ -37,46 +68,149 @@ public class SyndEntryActivitySerializer implements ActivitySerializer<SyndEntry
     }
 
     @Override
-    public SyndEntry serialize(Activity deserialized) {
+    public ObjectNode serialize(Activity deserialized) {
         throw new UnsupportedOperationException("Cannot currently serialize to Rome");
     }
 
     @Override
-    public Activity deserialize(SyndEntry serialized) {
-        Preconditions.checkNotNull(serialized);
+    public Activity deserialize(ObjectNode syndEntry) {
+        return deserializeWithRomeExtension(syndEntry, this.includeRomeExtension);
+    }
+
+    public Activity deserializeWithRomeExtension(ObjectNode entry, boolean withExtension) {
+        Preconditions.checkNotNull(entry);
+
         Activity activity = new Activity();
-        Provider provider = new Provider();
-        if( serialized.getSource() != null )
-            if( serialized.getSource().getUri() != null )
-                provider.setId("rss:"+serialized.getSource().getUri());
-        else
-            provider.setId("rss:unknown");
-        Actor actor = new Actor();
+        Provider provider = buildProvider(entry);
+        Actor actor = buildActor(entry);
+        ActivityObject activityObject = buildActivityObject(entry);
+
+        activityObject.setUrl(provider.getUrl());
+        activityObject.setAuthor(actor.getAuthor());
+
+        activity.setUrl(provider.getUrl());
+        activity.setProvider(provider);
+        activity.setActor(actor);
+        activity.setVerb("post");
+        activity.setId("id:rss:post:" + activity.getUrl());
+
+        JsonNode published = entry.get("publishedDate");
+        if (published != null) {
+            try {
+                activity.setPublished(RFC3339Utils.parseToUTC(published.textValue()));
+            } catch (Exception e) {
+                LOGGER.warn("Failed to parse date : {}", published.textValue());
+
+                DateTime now = DateTime.now().withZone(DateTimeZone.UTC);
+                activity.setPublished(now);
+            }
+        }
+
+        activity.setUpdated(activityObject.getUpdated());
+        activity.setObject(activityObject);
+
+        if (withExtension) {
+            activity = addRomeExtension(activity, entry);
+        }
+
+        return activity;
+    }
+
+    /**
+     * Given an RSS entry, extra out the author and actor information and return it
+     * in an actor object
+     *
+     * @param entry
+     * @return
+     */
+    private Actor buildActor(ObjectNode entry) {
         Author author = new Author();
-        if( serialized.getAuthor() != null ) {
-            author.setId(serialized.getAuthor());
-            author.setDisplayName(serialized.getAuthor());
+        Actor actor = new Actor();
+
+        if (entry.get("author") != null) {
+            author.setId(entry.get("author").textValue());
+            author.setDisplayName(entry.get("author").textValue());
+
             actor.setAuthor(author);
+            String uriToSet = entry.get("rssFeed") != null ? entry.get("rssFeed").asText() : null;
+
+            actor.setId("id:rss:" + uriToSet + ":" + author.getId());
+            actor.setDisplayName(author.getDisplayName());
         }
-        activity.setActor(actor);
-        activity.setVerb("blog");
-        activity.setProvider(provider);
+
+        return actor;
+    }
+
+    /**
+     * Given an RSS object, build the ActivityObject
+     *
+     * @param entry
+     * @return
+     */
+    private ActivityObject buildActivityObject(ObjectNode entry) {
         ActivityObject activityObject = new ActivityObject();
-        activityObject.setSummary(serialized.getTitle());
-        activityObject.setUrl(serialized.getLink());
-        activity.setObject(activityObject);
-        activity.setId(serialized.getLink());
-        return activity;
+
+        JsonNode summary = entry.get("description");
+        if (summary != null)
+            activityObject.setSummary(summary.textValue());
+        else if((summary = entry.get("title")) != null) {
+            activityObject.setSummary(summary.textValue());
+        }
+
+        return activityObject;
     }
 
-    @Override
-    public List<Activity> deserializeAll(List<SyndEntry> serializedList) {
-        List<Activity> activityList = Lists.newArrayList();
-        for(SyndEntry entry : serializedList) {
-            activityList.add(deserialize(entry));
+    /**
+     * Given an RSS object, build and return the Provider object
+     *
+     * @param entry
+     * @return
+     */
+    private Provider buildProvider(ObjectNode entry) {
+        Provider provider = new Provider();
+
+        String link = null;
+        String uri = null;
+        String resourceLocation = null;
+
+        if (entry.get("link") != null)
+            link = entry.get("link").textValue();
+        if (entry.get("uri") != null)
+            uri = entry.get("uri").textValue();
+
+        if (uri != null) {
+            if((uri.contains("http") || uri.contains("www")) || (link == null || !(link.contains("http") || link.contains("www")))) {
+                resourceLocation = uri;
+            } else {
+                resourceLocation = link;
+            }
         }
-        return activityList;
+
+        provider.setId("id:providers:rss");
+        provider.setUrl(resourceLocation);
+        provider.setDisplayName("RSS");
+
+        return provider;
     }
 
+    /**
+     * Given an RSS object and an existing activity,
+     * add the Rome extension to that activity and return it
+     *
+     * @param activity
+     * @param entry
+     * @return
+     */
+    private Activity addRomeExtension(Activity activity, ObjectNode entry) {
+        ObjectMapper mapper = new StreamsJacksonMapper();
+        ObjectNode activityRoot = mapper.convertValue(activity, ObjectNode.class);
+        ObjectNode extensions = JsonNodeFactory.instance.objectNode();
+
+        extensions.put("rome", entry);
+        activityRoot.put("extensions", extensions);
 
+        activity = mapper.convertValue(activityRoot, Activity.class);
+
+        return activity;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3325c6ec/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/SyndEntrySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/SyndEntrySerializer.java b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/SyndEntrySerializer.java
new file mode 100644
index 0000000..1135172
--- /dev/null
+++ b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/SyndEntrySerializer.java
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.rss.serializer;
+
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Lists;
+import com.sun.syndication.feed.module.Module;
+import com.sun.syndication.feed.rss.Category;
+import com.sun.syndication.feed.rss.Content;
+import com.sun.syndication.feed.rss.Enclosure;
+import com.sun.syndication.feed.synd.*;
+import org.joda.time.format.DateTimeFormatter;
+import org.joda.time.format.ISODateTimeFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Date;
+import java.util.List;
+
+/**
+ * Since SyndEntry is not Serializable, we cannot emit them from any StreamOperation.  So the CommunityRssProvider
+ * converts the SyndEntries to ObjectNodes using this class.
+ */
+public class SyndEntrySerializer {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(SyndEntrySerializer.class);
+
+    public ObjectNode deserialize(SyndEntry entry) {
+        return deserializeRomeEntry(entry);
+    }
+
+
+    public List<ObjectNode> deserializeAll(Collection<SyndEntry> entries) {
+        List<ObjectNode> result = Lists.newLinkedList();
+        for(SyndEntry entry : entries) {
+            result.add(deserialize(entry));
+        }
+        return result;
+    }
+
+
+
+    private ObjectNode deserializeRomeEntry(SyndEntry entry) {
+        JsonNodeFactory factory = JsonNodeFactory.instance;
+        ObjectNode root = factory.objectNode();
+
+        serializeString(entry.getAuthor(), "author", root);
+        serializeListOfStrings(entry.getAuthors(), "authors", root, factory);
+        serializeCategories(root, factory, entry.getCategories());
+        serializeContents(root, factory, entry.getContents());
+        serializeListOfStrings(entry.getContributors(), "contributors", root, factory);
+        serializeDescription(root, factory, entry.getDescription());
+        serializeEnclosures(root, factory, entry.getEnclosures());
+        serializeForeignMarkUp(root, factory, entry.getForeignMarkup());
+        serializeString(entry.getLink(), "link", root);
+        serializeLinks(root, factory, entry.getLinks());
+        serializeModules(root, factory, entry.getModules());
+        serializeDate(root, entry.getPublishedDate(), "publishedDate");
+        serializeSource(root, factory, entry.getSource());
+        serializeString(entry.getTitle(), "title", root);
+        serializeDate(root, entry.getUpdatedDate(), "updateDate");
+        serializeString(entry.getUri(), "uri", root);
+
+        return root;
+    }
+
+
+    private void serializeCategories(ObjectNode root, JsonNodeFactory factory, List categories) {
+        if(categories == null || categories.size() == 0)
+            return;
+        ArrayNode cats = factory.arrayNode();
+        for(Object obj : categories) {
+            if(obj instanceof Category) {
+                ObjectNode catNode = factory.objectNode();
+                Category category = (Category) obj;
+                if(category.getDomain() != null)
+                    catNode.put("domain", category.getDomain());
+                if(category.getValue() != null)
+                    catNode.put("value", category.getValue());
+                cats.add(catNode);
+            }
+            else if(obj instanceof com.sun.syndication.feed.atom.Category) {
+                com.sun.syndication.feed.atom.Category category = (com.sun.syndication.feed.atom.Category) obj;
+                ObjectNode catNode = factory.objectNode();
+                if(category.getLabel() != null)
+                    catNode.put("label", category.getLabel());
+                if(category.getScheme() != null)
+                    catNode.put("scheme", category.getScheme());
+                if(category.getSchemeResolved() != null)
+                    catNode.put("schemeResolved", category.getSchemeResolved());
+                if(category.getTerm() != null )
+                    catNode.put("term", category.getTerm());
+                cats.add(catNode);
+            }
+        }
+        root.put("categories", cats);
+    }
+
+    private void serializeContents(ObjectNode root, JsonNodeFactory factory, List contents) {
+        if(contents == null || contents.size() == 0)
+            return;
+        ArrayNode contentsArray = factory.arrayNode();
+        for(Object obj : contents) {
+            ObjectNode content = factory.objectNode();
+            if(obj instanceof Content) {
+                Content rssContent = (Content) obj;
+                content.put("type", rssContent.getType());
+                content.put("value", rssContent.getValue());
+            }
+            if(obj instanceof com.sun.syndication.feed.atom.Content) {
+                com.sun.syndication.feed.atom.Content atomContent = (com.sun.syndication.feed.atom.Content) obj;
+                content.put("type", atomContent.getType());
+                content.put("value", atomContent.getValue());
+                content.put("mode", atomContent.getMode());
+                content.put("src", atomContent.getSrc());
+            }
+            contentsArray.add(content);
+        }
+        root.put("contents", contentsArray);
+    }
+
+    private void serializeDate(ObjectNode root, Date date, String key) {
+        DateTimeFormatter formatter = ISODateTimeFormat.dateTime();
+        if(date == null)
+            return;
+        root.put(key, formatter.print(date.getTime()));
+    }
+
+    private void serializeDescription(ObjectNode root, JsonNodeFactory factory, SyndContent synd) {
+        if(synd == null)
+            return;
+        ObjectNode content = factory.objectNode();
+        if(synd.getValue() != null)
+            content.put("value", synd.getValue());
+        if(synd.getMode() != null)
+            content.put("mode", synd.getMode());
+        if(synd.getType() != null)
+            content.put("type", synd.getType());
+        root.put("description", content);
+    }
+
+    private void serializeEnclosures(ObjectNode root, JsonNodeFactory factory, List enclosures) {
+        if(enclosures == null || enclosures.size() == 0)
+            return;
+        ArrayNode encls = factory.arrayNode();
+        for(Object obj : enclosures) {
+            if(obj instanceof Enclosure){
+                Enclosure enclosure = (Enclosure) obj;
+                ObjectNode encl = factory.objectNode();
+                if(enclosure.getType() != null)
+                    encl.put("type", enclosure.getType());
+                if(enclosure.getUrl() != null)
+                    encl.put("url", enclosure.getUrl());
+                encl.put("length", enclosure.getLength());
+                encls.add(encl);
+            } else if(obj instanceof SyndEnclosure) {
+                SyndEnclosure enclosure = (SyndEnclosure) obj;
+                ObjectNode encl = factory.objectNode();
+                if(enclosure.getType() != null)
+                    encl.put("type", enclosure.getType());
+                if(enclosure.getUrl() != null)
+                    encl.put("url", enclosure.getUrl());
+                encl.put("length", enclosure.getLength());
+                encls.add(encl);
+            } else {
+                LOGGER.warn("serializeEnclosures does not handle type : {}", obj.getClass().toString());
+            }
+        }
+        root.put("enclosures", encls);
+    }
+
+    private void serializeForeignMarkUp(ObjectNode root, JsonNodeFactory factory, Object foreignMarkUp) {
+        if(foreignMarkUp == null)
+            return;
+        if(foreignMarkUp instanceof String) {
+            root.put("foreignEnclosures", (String) foreignMarkUp);
+        } else if (foreignMarkUp instanceof List) {
+            List foreignList = (List) foreignMarkUp;
+            if(foreignList.size() == 0)
+                return;
+            if(foreignList.get(0) instanceof String) {
+                serializeListOfStrings(foreignList, "foreignEnclosures", root, factory);
+            } else {
+                LOGGER.debug("SyndEntry.getForeignMarkUp is not of type String. Need to handle the case of class : {}", ((List)foreignMarkUp).get(0).getClass().toString());
+            }
+        } else {
+            LOGGER.debug("SyndEntry.getForeignMarkUp is not of an expected type. Need to handle the case of class : {}", foreignMarkUp.getClass().toString());
+        }
+    }
+
+    private void serializeImage(ObjectNode root, JsonNodeFactory factory, SyndImage image) {
+        if(image == null)
+            return;
+        ObjectNode imageNode = factory.objectNode();
+        serializeString(image.getDescription(), "description", imageNode);
+        serializeString(image.getLink(), "link", imageNode);
+        serializeString(image.getUrl(), "url", imageNode);
+        serializeString(image.getTitle(), "title", imageNode);
+        root.put("image", imageNode);
+    }
+
+    private void serializeListOfStrings(List toSerialize, String key, ObjectNode node, JsonNodeFactory factory) {
+        if(toSerialize == null || toSerialize.size() == 0)
+            return;
+        ArrayNode keyNode = factory.arrayNode();
+        for(Object obj : toSerialize) {
+            if(obj instanceof String) {
+                keyNode.add((String) obj);
+            } else {
+                LOGGER.debug("Array at Key:{} was expecting item types of String. Received class : {}", key, obj.getClass().toString());
+            }
+        }
+        node.put(key, keyNode);
+    }
+
+    private void serializeLinks(ObjectNode root, JsonNodeFactory factory, List links) {
+        if(links == null || links.size() == 0) {
+            return;
+        } else if(links.get(0) instanceof String) {
+            serializeListOfStrings(links, "links", root, factory);
+        } else if(links.get(0) instanceof SyndLinkImpl) {
+            ArrayNode linksArray = factory.arrayNode();
+            SyndLinkImpl syndLink;
+            ObjectNode linkNode;
+            for(Object obj : links) {
+                linkNode = factory.objectNode();
+                syndLink = (SyndLinkImpl) obj;
+                linkNode.put("rel", syndLink.getRel());
+                linkNode.put("href", syndLink.getHref());
+                linkNode.put("type", syndLink.getType());
+                linkNode.put("length", syndLink.getLength());
+                linkNode.put("hrefLang", syndLink.getHreflang());
+                linkNode.put("title", syndLink.getTitle());
+                linksArray.add(linkNode);
+            }
+            root.put("links", linksArray);
+        } else {
+            LOGGER.error("No implementation for handling links of class : {}", links.get(0).getClass().toString());
+        }
+    }
+
+    private void serializeModules(ObjectNode root, JsonNodeFactory factory, List modules) {
+        if(modules == null || modules.size() == 0)
+            return;
+        ArrayNode modulesArray = factory.arrayNode();
+        for(Object obj : modules) {
+            if(obj instanceof Module) {
+                Module mod = (Module) obj;
+                if(mod.getUri() != null)
+                    modulesArray.add(mod.getUri());
+            } else {
+                LOGGER.debug("SyndEntry.getModules() items are not of type Module. Need to handle the case of class : {}", obj.getClass().toString());
+            }
+        }
+        root.put("modules", modulesArray);
+    }
+
+    private void serializeSource(ObjectNode root, JsonNodeFactory factory, SyndFeed source) {
+        if(source == null)
+            return;
+        ObjectNode sourceNode = factory.objectNode();
+        serializeString(source.getAuthor(), "author", sourceNode);
+        serializeListOfStrings(source.getAuthors(), "authors", sourceNode, factory);
+        serializeCategories(sourceNode, factory, source.getCategories());
+        serializeString(source.getCopyright(), "copyright", sourceNode);
+        serializeListOfStrings(source.getContributors(), "contributors", sourceNode, factory);
+        serializeString(source.getDescription(), "description", sourceNode);
+        serializeDescription(sourceNode, factory, source.getDescriptionEx());
+        // source.getEntries(); wtf?
+        serializeString(source.getFeedType(), "feedType", sourceNode);
+        serializeImage(sourceNode, factory, source.getImage());
+        serializeForeignMarkUp(sourceNode, factory, source.getForeignMarkup());
+        serializeString(source.getLanguage(), "language", sourceNode);
+        serializeString(source.getLink(), "link", sourceNode);
+        serializeListOfStrings(source.getLinks(), "links", sourceNode, factory);
+        serializeModules(sourceNode, factory, source.getModules());
+        serializeDate(sourceNode, source.getPublishedDate(), "publishedDate");
+        serializeString(source.getTitle(), "title", sourceNode);
+        serializeString(source.getUri(), "uri", sourceNode);
+
+        root.put("source", sourceNode);
+    }
+
+    private void serializeString(String string, String key, ObjectNode node) {
+        if(string != null && !string.equals(""))
+            node.put(key, string);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3325c6ec/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/test/RssTypeConverterTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/test/RssTypeConverterTest.java b/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/test/RssTypeConverterTest.java
new file mode 100644
index 0000000..ed62ad7
--- /dev/null
+++ b/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/test/RssTypeConverterTest.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.rss.test;
+
+import org.apache.commons.lang3.SerializationUtils;
+import org.apache.streams.rss.processor.RssTypeConverter;
+import org.junit.Test;
+
+public class RssTypeConverterTest {
+    @Test
+    public void testSerializability() {
+        RssTypeConverter converter = new RssTypeConverter();
+        RssTypeConverter clone = SerializationUtils.clone(converter);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3325c6ec/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/test/SyndEntryActivitySerizlizerTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/test/SyndEntryActivitySerizlizerTest.java b/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/test/SyndEntryActivitySerizlizerTest.java
new file mode 100644
index 0000000..776f4ed
--- /dev/null
+++ b/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/test/SyndEntryActivitySerizlizerTest.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.rss.test;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Lists;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.pojo.json.Actor;
+import org.apache.streams.pojo.json.Author;
+import org.apache.streams.pojo.json.Provider;
+import org.apache.streams.rss.serializer.SyndEntryActivitySerializer;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.Scanner;
+
+import static org.junit.Assert.assertEquals;
+
+public class SyndEntryActivitySerizlizerTest {
+
+    private static ObjectMapper mapper = new StreamsJacksonMapper();
+
+    @Test
+    public void testJsonData() throws Exception {
+        Scanner scanner = new Scanner(this.getClass().getResourceAsStream("/TestSyndEntryJson.txt"));
+        List<Activity> activities = Lists.newLinkedList();
+        List<ObjectNode> objects = Lists.newLinkedList();
+
+        SyndEntryActivitySerializer serializer = new SyndEntryActivitySerializer();
+
+        while(scanner.hasNext()) {
+            String line = scanner.nextLine();
+            System.out.println(line);
+            ObjectNode node = (ObjectNode) mapper.readTree(line);
+
+            objects.add(node);
+            activities.add(serializer.deserialize(node));
+        }
+
+        assertEquals(10, activities.size());
+
+        for(int x = 0; x < activities.size(); x ++) {
+            ObjectNode n = objects.get(x);
+            Activity a = activities.get(x);
+
+            testActor(n.get("author").asText(), a.getActor());
+            testAuthor(n.get("author").asText(), a.getObject().getAuthor());
+            testProvider("id:providers:rss", "RSS", a.getProvider());
+            testVerb("post", a.getVerb());
+            testPublished(n.get("publishedDate").asText(), a.getPublished());
+            testUrl(n.get("uri").asText(), a);
+        }
+    }
+
+    public void testVerb(String expected, String verb) {
+        assertEquals(expected, verb);
+    }
+
+    public void testPublished(String expected, DateTime published) {
+        assertEquals(new DateTime(expected, DateTimeZone.UTC), published);
+    }
+
+    public void testActor(String expected, Actor actor) {
+        assertEquals("id:rss:null" + ":" + expected, actor.getId());
+        assertEquals(expected, actor.getDisplayName());
+    }
+
+    public void testAuthor(String expected, Author author) {
+        assertEquals(expected, author.getDisplayName());
+        assertEquals(expected, author.getId());
+    }
+
+    public void testProvider(String expectedId, String expectedDisplay, Provider provider) {
+        assertEquals(expectedId, provider.getId());
+        assertEquals(expectedDisplay, provider.getDisplayName());
+    }
+
+    public void testUrl(String expected, Activity activity) {
+        assertEquals(expected, activity.getUrl());
+        assertEquals(expected, activity.getObject().getUrl());
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3325c6ec/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/test/Top100FeedsTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/test/Top100FeedsTest.java b/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/test/Top100FeedsTest.java
deleted file mode 100644
index 81fff6d..0000000
--- a/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/test/Top100FeedsTest.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.rss.test;
-
-import com.google.common.collect.Lists;
-import org.apache.commons.lang.StringUtils;
-import org.apache.streams.pojo.json.Activity;
-import org.apache.streams.rss.FeedDetails;
-import org.apache.streams.rss.RssStreamConfiguration;
-import org.apache.streams.rss.provider.RssStreamProvider;
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedReader;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.util.List;
-
-import static org.hamcrest.number.OrderingComparison.greaterThan;
-
-/**
- * Created by sblackmon on 2/5/14.
- */
-public class Top100FeedsTest{
-
-    private final static Logger LOGGER = LoggerFactory.getLogger(Top100FeedsTest.class);
-
-    @Test
-    public void Tests()
-    {
-        InputStream is = Top100FeedsTest.class.getResourceAsStream("/top100.txt");
-        InputStreamReader isr = new InputStreamReader(is);
-        BufferedReader br = new BufferedReader(isr);
-
-        RssStreamConfiguration configuration = new RssStreamConfiguration();
-        List<FeedDetails> feeds = Lists.newArrayList();
-        try {
-            while (br.ready()) {
-                String line = br.readLine();
-                if(!StringUtils.isEmpty(line))
-                {
-                    feeds.add(new FeedDetails().withUrl(line).withPollIntervalMillis(5000l));
-                }
-            }
-        } catch( Exception e ) {
-            System.out.println(e);
-            e.printStackTrace();
-            Assert.fail();
-        }
-
-        Assert.assertThat(feeds.size(), greaterThan(70));
-
-        configuration.setFeeds(feeds);
-
-        RssStreamProvider provider = new RssStreamProvider(configuration, Activity.class);
-        provider.prepare(configuration);
-        provider.startStream();
-
-        try {
-            Thread.sleep(10000);
-        } catch (InterruptedException e) {}
-
-        Assert.assertThat(provider.getProviderQueue().size(), greaterThan(0));
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3325c6ec/streams-contrib/streams-provider-rss/src/test/resources/TestSyndEntryJson.txt
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-rss/src/test/resources/TestSyndEntryJson.txt b/streams-contrib/streams-provider-rss/src/test/resources/TestSyndEntryJson.txt
new file mode 100644
index 0000000..e559b3b
--- /dev/null
+++ b/streams-contrib/streams-provider-rss/src/test/resources/TestSyndEntryJson.txt
@@ -0,0 +1,10 @@
+{"author":"Michael Vizard","categories":[],"contents":[{}],"description":{"value":"<img src=\"http://blog.programmableweb.com/wp-content/Screen-Shot-2014-05-02-at-12.05.39-PM-150x26.png\" alt=\"\" title=\"\" width=\"150\" height=\"26\" img align=\"right\" size-thumbnail wp-image-83714\" />As much as mobile backend-as-a-service (BaaS) platforms free up developers from having to worry about the infrastructure that support their applications, they do generally come with a downside; most of them lock the developer into a particular cloud platform. With that issue in mind, FeedHenry created a namesake BaaS platform that took advantage of <a href=\"http://nodejs.org/\">Node.js</a> to create a BaaS platform that developers could deploy and ultimately move to any cloud platform of their choice.","type":"text/html"},"link":"http://feedproxy.google.com/~r/ProgrammableWeb/~3/qHpel755J7w/","modules":["http://purl.org/dc/elements/1.1/"],"publishedDate":"2014-05-02T11:07:06.000-05:00","title":"Fe
 edHenry Extends Reach of BaaS Platform","uri":"http://blog.programmableweb.com/?p=83638"}
+{"author":"Michael Vizard","categories":[],"contents":[{}],"description":{"value":"<img align=\"right\" size-thumbnail wp-image-83711\" src=\"http://blog.programmableweb.com/wp-content/Screen-Shot-2014-05-02-at-11.05.53-AM-150x30.png\" alt=\"\" width=\"150\" height=\"30\" />This week at the <a href=\"http://www-01.ibm.com/software/websphere/events/impact/\">IBM Impact 2014 conference</a>, IBM previewed a significant extension to its application development tool portfolio in the form of a new class of rapid application development tools running on top of the <a href=\"http://blog.programmableweb.com/2014/02/24/ibm-extends-api-reach-across-emerging-cloud-ecosystem/\">IBM BlueMix cloud integration service</a>.","type":"text/html"},"link":"http://feedproxy.google.com/~r/ProgrammableWeb/~3/ZLxc4F-uDW0/","modules":["http://purl.org/dc/elements/1.1/"],"publishedDate":"2014-05-02T10:06:57.000-05:00","title":"IBM Previews RapidApps Builder Development Tools","uri":"http://blog.programmablew
 eb.com/?p=83690"}
+{"author":"Greg Bates","categories":[],"contents":[{}],"description":{"value":"<a href=\"http://www.programmableweb.com/api/undata\"><img class=\"imgRight\" src=\"http://www.programmableweb.com/images/apis/at11719.png\" alt=\"UNdata\" /></a>3Scale builds an API for UNData. Xignite creates TypeAhead API for finance industry. Plus: Zapier’s OneNote integration, and to API or not to API.","type":"text/html"},"link":"http://feedproxy.google.com/~r/ProgrammableWeb/~3/betRfB4P6p4/","modules":["http://purl.org/dc/elements/1.1/"],"publishedDate":"2014-05-01T16:00:56.000-05:00","title":"Today in APIs: 3Scale Creates an API for UNData","uri":"http://blog.programmableweb.com/?p=83646"}
+{"author":"Eric Carter","categories":[],"contents":[{}],"description":{"value":"<a href=\"http://www.programmableweb.com/api/heroku\"><img class=\"imgRight\" src=\"http://www.programmableweb.com/images/apis/at2228.png\" alt=\"Heroku\" /></a>Today, <a href=\"https://www.heroku.com/home\">Heroku</a> makes a huge announcement around its support for PHP. PHP is currently one of the most popular languages driving the web. Although PHP drives the likes of some of the webs giants (e.g. Facebook, Etsy, etc.), PHP has traditionally lacked some qualities (e.g. runtime, management, infrastructure elements) that peers like Ruby on Rails, Python with Django, and Node have delivered for quite some time. On April 30, at <a href=\"https://www.fbf8.com/\">Facebook's F8</a>, a new PHP is announced with full Heroku support. On the evening of the announcement, <em>ProgrammableWeb</em> caught up with Heroku's Adam Gross.","type":"text/html"},"link":"http://feedproxy.google.com/~r/ProgrammableWeb/~3/kbKy
 izaYqgA/","modules":["http://purl.org/dc/elements/1.1/"],"publishedDate":"2014-05-01T14:00:46.000-05:00","title":"Heroku Announces Support for the New PHP","uri":"http://blog.programmableweb.com/?p=83572"}
+{"author":"Patricio Robles","categories":[],"contents":[{}],"description":{"value":"<img align=\"right\" size-full wp-image-53858\" title=\"bitcoing\" src=\"http://blog.programmableweb.com/wp-content/bitcoing.jpg\" alt=\"\" width=\"65\" height=\"68\" />The rise of Bitcoin has resulted in the development of a variety of services that enable individuals and companies to track Bitcoin, create and manage Bitcoin wallets, and perform transactions using the cryptocurrency.","type":"text/html"},"link":"http://feedproxy.google.com/~r/ProgrammableWeb/~3/OcabUbq4pVA/","modules":["http://purl.org/dc/elements/1.1/"],"publishedDate":"2014-05-01T12:32:34.000-05:00","title":"How To Build a Bitcoin Currency Converter Using the Blockchain.info APIs","uri":"http://blog.programmableweb.com/?p=82906"}
+{"author":"Michael Vizard","categories":[],"contents":[{}],"description":{"value":"<img src=\"http://blog.programmableweb.com/wp-content/Screen-Shot-2014-04-29-at-10.55.52-AM-150x36.png\" alt=\"\" width=\"150\" height=\"36\" align=\"right\" />After being a provider of integration software primarily used on-premise to integrate enterprise applications, <a href=\"http://www.jitterbit.com/\">Jitterbit</a> today unfurled a cloud integration platform. Andrew Leigh, vice president of products and alliances for Jitterbit, says The Harmony Cloud Integration Platform from Jitterbit offers all the same features of the on-premise edition of Jitterbit integration software in a way that can be easily extended to social, mobile and Internet of Things (IoT) applications using connectors in as little as 10 minutes.","type":"text/html"},"link":"http://feedproxy.google.com/~r/ProgrammableWeb/~3/kXHalQf44cg/","modules":["http://purl.org/dc/elements/1.1/"],"publishedDate":"2014-04-29T09:57:16.000-05:0
 0","title":"Jitterbit Unfurls Harmony Cloud Integration Platform","uri":"http://blog.programmableweb.com/?p=83557"}
+{"author":"Michael Vizard","categories":[],"contents":[{}],"description":{"value":"<img src=\"http://blog.programmableweb.com/wp-content/Screen-Shot-2014-04-29-at-10.30.57-AM-150x18.png\" alt=\"\" width=\"150\" height=\"18\" align=\"right\" /><a href=\"http://www.appdynamics.com/\">AppDynamics</a> is moving to turn its application performance management (APM) software into an application intelligence platform. With the unveiling today of AppDynamics Application Intelligence Platform, AppDynamics is layering application intelligence applications on top of its service while at the same time inviting third-party organizations to build complementary applications that leverage AppDynamics data that can be accessed via RESTful APIs.","type":"text/html"},"link":"http://feedproxy.google.com/~r/ProgrammableWeb/~3/Vo4mV_yuq3E/","modules":["http://purl.org/dc/elements/1.1/"],"publishedDate":"2014-04-29T09:32:28.000-05:00","title":"AppDynamics Exposes App Intelligence via RESTful APIs","uri":"h
 ttp://blog.programmableweb.com/?p=83550"}
+{"author":"Greg Bates","categories":[],"contents":[{}],"description":{"value":"<a href=\"http://www.programmableweb.com/api/change.org\"><img class=\"imgRight\" src=\"http://www.programmableweb.com/images/apis/at8881.png\" alt=\"Change.org\" /></a>HackSummit features a partnership with The Evolve Organisation focused on two problems for democracy. Kings of Code Hack Battle's brilliant hacks. Plus: Yodlee links incubators for fintech innovation, LinkedIn launches Sponsored Updates API, and 10 new APIs.","type":"text/html"},"link":"http://feedproxy.google.com/~r/ProgrammableWeb/~3/12JNO5FUkCE/","modules":["http://purl.org/dc/elements/1.1/"],"publishedDate":"2014-04-28T16:01:00.000-05:00","title":"Today in APIs: HackSummit Partners with Evolve, and 10 New APIs","uri":"http://blog.programmableweb.com/?p=83441"}
+{"author":"Guest Author","categories":[],"contents":[{}],"description":{"value":"<img src=\"http://blog.programmableweb.com/wp-content/Screen-Shot-2014-03-07-at-4.51.06-PM5-150x58.png\" alt=\"\" width=\"150\" height=\"58\" align=\"right\" />So you’ve built a stellar API. Now what? How do you get the word out? First, throw everything you know about marketing out the window. Indeed, the old style of marketing doesn’t apply when it comes to APIs. The notion of building a marketing campaign replete with ads and PR fluff is virtually non-existent in the world of API marketing. Why? When you create an API, you are creating a community. This community of users will drive your marketing in a highly decentralized way.","type":"text/html"},"link":"http://feedproxy.google.com/~r/ProgrammableWeb/~3/O2Ol3kBFpqk/","modules":["http://purl.org/dc/elements/1.1/"],"publishedDate":"2014-04-28T15:00:07.000-05:00","title":"10 Ways To Market Your API (And Why Old-School Marketing Won’t Work)","uri"
 :"http://blog.programmableweb.com/?p=83440"}
+{"author":"Michael Vizard","categories":[],"contents":[{}],"description":{"value":"<img src=\"http://blog.programmableweb.com/wp-content/Screen-Shot-2014-04-28-at-2.25.35-PM-150x61.png\" alt=\"\" width=\"150\" height=\"61\" align=\"right\" />As a first step toward enabling new economic models for software consumption, <a href=\"http://www-03.ibm.com/press/us/en/pressrelease/43723.wss\">today IBM opened an IBM Cloud marketplace</a>, through which it and its partners will make software available to enterprise IT customers. In addition, IBM also announced that it is launching BlueMix Garages.","type":"text/html"},"link":"http://feedproxy.google.com/~r/ProgrammableWeb/~3/-Fot4vkTNi0/","modules":["http://purl.org/dc/elements/1.1/"],"publishedDate":"2014-04-28T13:26:34.000-05:00","title":"IBM Opens Marketplace to Help Drive Emerging API Economy","uri":"http://blog.programmableweb.com/?p=83495"}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3325c6ec/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java
index 0d3f54a..c5aae8c 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java
@@ -18,8 +18,9 @@
 
 package org.apache.streams.local.tasks;
 
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.core.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.LinkedList;
 import java.util.List;
@@ -30,7 +31,9 @@ import java.util.concurrent.atomic.AtomicBoolean;
 /**
  *
  */
-public class StreamsProcessorTask extends BaseStreamsTask {
+public class StreamsProcessorTask extends BaseStreamsTask implements DatumStatusCountable {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(StreamsProcessorTask.class);
 
 
     private StreamsProcessor processor;
@@ -40,6 +43,13 @@ public class StreamsProcessorTask extends BaseStreamsTask {
     private Queue<StreamsDatum> inQueue;
     private AtomicBoolean isRunning;
 
+    private DatumStatusCounter statusCounter = new DatumStatusCounter();
+
+    @Override
+    public DatumStatusCounter getDatumStatusCounter() {
+        return this.statusCounter;
+    }
+
     /**
      * Default constructor, uses default sleep time of 500ms when inbound queue is empty
      * @param processor process to run in task
@@ -87,11 +97,20 @@ public class StreamsProcessorTask extends BaseStreamsTask {
             StreamsDatum datum = this.inQueue.poll();
             while(this.keepRunning.get()) {
                 if(datum != null) {
-                    List<StreamsDatum> output = this.processor.process(datum);
-                    if(output != null) {
-                        for(StreamsDatum outDatum : output) {
-                            super.addToOutgoingQueue(outDatum);
+                    try {
+                        List<StreamsDatum> output = this.processor.process(datum);
+
+                        if(output != null) {
+                            for(StreamsDatum outDatum : output) {
+                                super.addToOutgoingQueue(outDatum);
+                                statusCounter.incrementStatus(DatumStatus.SUCCESS);
+                            }
                         }
+                    } catch (Throwable e) {
+                        LOGGER.error("Throwable Streams Processor {}", e);
+                        e.printStackTrace();
+                        statusCounter.incrementStatus(DatumStatus.FAIL);
+                        throw new RuntimeException(e);
                     }
                 }
                 else {


[2/2] git commit: Merge PR#55 from 'robdouglas/STREAMS-133'

Posted by mf...@apache.org.
Merge PR#55 from 'robdouglas/STREAMS-133'


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/7216a6fd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/7216a6fd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/7216a6fd

Branch: refs/heads/master
Commit: 7216a6fdf9aed7503764d281bcb4ecd5cd19a419
Parents: 9580fbc 3325c6e
Author: mfranklin <mf...@apache.org>
Authored: Mon Jul 21 14:47:10 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Mon Jul 21 14:47:10 2014 -0400

----------------------------------------------------------------------
 .../streams/rss/processor/RssTypeConverter.java |  72 +++++
 .../streams/rss/provider/RssEventProcessor.java |   8 +-
 .../serializer/SyndEntryActivitySerializer.java | 198 ++++++++++--
 .../rss/serializer/SyndEntrySerializer.java     | 308 +++++++++++++++++++
 .../streams/rss/test/RssTypeConverterTest.java  |  31 ++
 .../test/SyndEntryActivitySerizlizerTest.java   | 102 ++++++
 .../streams/rss/test/Top100FeedsTest.java       |  84 -----
 .../src/test/resources/TestSyndEntryJson.txt    |  10 +
 .../local/tasks/StreamsProcessorTask.java       |  33 +-
 9 files changed, 719 insertions(+), 127 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7216a6fd/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java
----------------------------------------------------------------------