You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/03/24 16:50:48 UTC

[11/13] dropping streams-pojo-extensions, because Activity can be extended without it; moving streams-cassandra under contrib; moving streams-eip-routes under runtimes and renaming it streams-runtime-webapp

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/adb43b29/streams-pojo-extensions/src/test/java/org/apache/streams/pojo/test/ActivityExtendedSerDeTest.java
----------------------------------------------------------------------
diff --git a/streams-pojo-extensions/src/test/java/org/apache/streams/pojo/test/ActivityExtendedSerDeTest.java b/streams-pojo-extensions/src/test/java/org/apache/streams/pojo/test/ActivityExtendedSerDeTest.java
deleted file mode 100644
index 5abd42d..0000000
--- a/streams-pojo-extensions/src/test/java/org/apache/streams/pojo/test/ActivityExtendedSerDeTest.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.pojo.json.test;
-
-import com.google.common.base.Joiner;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.io.input.BoundedInputStream;
-import org.apache.streams.pojo.json.ActivityExtended;
-import org.apache.streams.pojo.json.Activity;
-import org.apache.streams.pojo.json.Extensions;
-import org.codehaus.jackson.map.DeserializationConfig;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.Ignore;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.InputStream;
-import java.util.Map;
-
-/**
- * Created with IntelliJ IDEA.
- * User: sblackmon
- * Date: 8/20/13
- * Time: 5:57 PM
- * To change this template use File | Settings | File Templates.
- */
-public class ActivityExtendedSerDeTest { // Jackson (org.codehaus) deserialize/serialize tests over the /gnip_twitter_extended.json fixture
-
-    private final static Logger LOGGER = LoggerFactory.getLogger(ActivityExtendedSerDeTest.class);
-
-    private ObjectMapper mapper = new ObjectMapper(); // instance field; each test method configures it before use
-
-    @Test
-    public void TestActivity() // reads the fixture as an Activity and writes it back to JSON; passes when nothing throws
-    {
-        mapper.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, Boolean.FALSE); // tolerate JSON fields the target class does not declare
-        mapper.configure(DeserializationConfig.Feature.ACCEPT_SINGLE_VALUE_AS_ARRAY, Boolean.TRUE);
-        mapper.configure(DeserializationConfig.Feature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, Boolean.TRUE);
-
-        InputStream is = ActivityExtendedSerDeTest.class.getResourceAsStream("/gnip_twitter_extended.json"); // NOTE(review): null if the resource is missing; the stream is never closed here
-        Joiner joiner = Joiner.on(" ").skipNulls();
-        is = new BoundedInputStream(is, 10000); // caps the read at 10000 bytes
-        String json;
-        try {
-            json = joiner.join(IOUtils.readLines(is)); // collapse the fixture's lines into one space-separated string
-            LOGGER.debug(json);
-
-            Activity ser = mapper.readValue(json, Activity.class); // despite the name, 'ser' holds the deserialized object
-
-            String des = mapper.writeValueAsString(ser); // despite the name, 'des' holds the serialized JSON
-            LOGGER.debug(des);
-
-        } catch( Exception e ) { // any exception above fails the test
-            e.printStackTrace(); // the cause is only printed; it is not passed to Assert.fail()
-            Assert.fail();
-        }
-    }
-
-    @Ignore // this test is skipped by JUnit
-    @Test
-    public void TestActivityExtended() // reads the fixture as an ActivityExtended and checks its Extensions object
-    {
-        mapper.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, Boolean.FALSE); // same mapper configuration as TestActivity
-        mapper.configure(DeserializationConfig.Feature.ACCEPT_SINGLE_VALUE_AS_ARRAY, Boolean.TRUE);
-        mapper.configure(DeserializationConfig.Feature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, Boolean.TRUE);
-
-        InputStream is = ActivityExtendedSerDeTest.class.getResourceAsStream("/gnip_twitter_extended.json"); // NOTE(review): null if the resource is missing; the stream is never closed here
-        Joiner joiner = Joiner.on(" ").skipNulls();
-        is = new BoundedInputStream(is, 10000); // caps the read at 10000 bytes
-        String json;
-        try {
-            json = joiner.join(IOUtils.readLines(is)); // collapse the fixture's lines into one space-separated string
-            LOGGER.debug(json);
-
-            ActivityExtended ser = mapper.readValue(json, ActivityExtended.class);
-
-            Extensions extensions = ser.getExtensions();
-
-            String des = mapper.writeValueAsString(extensions); // only the extensions object is serialized back
-
-            Assert.assertTrue(extensions.getAdditionalProperties().size() > 0); // expects the Extensions object to carry at least one additional property
-            LOGGER.debug(des);
-
-        } catch( Exception e ) { // any exception above fails the test
-            e.printStackTrace(); // the cause is only printed; it is not passed to Assert.fail()
-            Assert.fail();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/adb43b29/streams-pojo-extensions/src/test/resources/gnip_twitter_extended.json
----------------------------------------------------------------------
diff --git a/streams-pojo-extensions/src/test/resources/gnip_twitter_extended.json b/streams-pojo-extensions/src/test/resources/gnip_twitter_extended.json
deleted file mode 100644
index ee764cb..0000000
--- a/streams-pojo-extensions/src/test/resources/gnip_twitter_extended.json
+++ /dev/null
@@ -1,146 +0,0 @@
-{
-    "gnip": {
-        "matching_rules": [
-            {
-                "tag": "cascade_CA_CA_en"
-            }
-        ],
-        "klout_score": 10,
-        "urls": [
-            {
-                "expanded_url": "https://itunes.apple.com/us/app/smurfs-village/id399648212?mt=8",
-                "url": "http://t.co/Ytn45Pbttk"
-            }
-        ],
-        "klout_profile": {
-            "topics": [],
-            "klout_user_id": "257268143479895040",
-            "link": "http://klout.com/user/id/257268143479895040"
-        },
-        "language": {
-            "value": "fr"
-        }
-    },
-    "body": "Le Grand Schtroumpf confirme que la cascade magique n'est \"Plus très loin.\" http://t.co/Ytn45Pbttk #SmurfsVillage @BeelineGames",
-    "favoritesCount": 0,
-    "link": "http://twitter.com/spoffff/statuses/372802927385403392",
-    "retweetCount": 0,
-    "twitter_lang": "fr",
-    "postedTime": "2013-08-28T19:28:38.000Z",
-    "provider": {
-        "id": "{link}",
-        "upstreamDuplicates": [],
-        "link": "http://www.twitter.com",
-        "attachments": [],
-        "displayName": "Twitter",
-        "objectType": "service",
-        "downstreamDuplicates": []
-    },
-    "links": [],
-    "actor": {
-        "twitterTimeZone": "Brussels",
-        "friendsCount": 6,
-        "favoritesCount": 0,
-        "link": "http://www.twitter.com/spoffff",
-        "image": {
-            "url": "https://si0.twimg.com/sticky/default_profile_images/default_profile_1_normal.png"
-        },
-        "postedTime": "2012-05-18T15:14:35.000Z",
-        "links": [
-            {
-                "rel": "me",
-                "href": null
-            }
-        ],
-        "listedCount": 0,
-        "downstreamDuplicates": [],
-        "id": "id:twitter.com:583891967",
-        "upstreamDuplicates": [],
-        "languages": [
-            "fr"
-        ],
-        "verified": false,
-        "utcOffset": "7200",
-        "followersCount": 0,
-        "attachments": [],
-        "displayName": "Sabine Chappuis",
-        "preferredUsername": "spoffff",
-        "statusesCount": 87,
-        "objectType": "person"
-    },
-    "object": {
-        "id": "object:search.twitter.com,2005:372802927385403392",
-        "summary": "Le Grand Schtroumpf confirme que la cascade magique n'est \"Plus très loin.\" http://t.co/Ytn45Pbttk #SmurfsVillage @BeelineGames",
-        "upstreamDuplicates": [],
-        "link": "http://twitter.com/spoffff/statuses/372802927385403392",
-        "postedTime": "2013-08-28T19:28:38.000Z",
-        "attachments": [],
-        "objectType": "note",
-        "downstreamDuplicates": []
-    },
-    "twitter_entities": {
-        "symbols": [],
-        "urls": [
-            {
-                "expanded_url": "http://bit.ly/hUmoRz",
-                "indices": [
-                    77,
-                    99
-                ],
-                "display_url": "bit.ly/hUmoRz",
-                "url": "http://t.co/Ytn45Pbttk"
-            }
-        ],
-        "hashtags": [
-            {
-                "text": "SmurfsVillage",
-                "indices": [
-                    100,
-                    114
-                ]
-            }
-        ],
-        "user_mentions": [
-            {
-                "id": 188075479,
-                "name": "Beeline Interactive",
-                "indices": [
-                    115,
-                    128
-                ],
-                "screen_name": "BeelineGames",
-                "id_str": "188075479"
-            }
-        ]
-    },
-    "extensions": {
-        "w2o": {
-            "tags": [
-                "brand-cascade",
-                "language-en",
-                "country-ca"
-            ],
-            "provider": "twitter",
-            "analyzer": "romance_analyzer",
-            "lang": {
-                "primaryLanguage": "en"
-            }
-        }
-    },
-    "twitter_filter_level": "medium",
-    "guid": "A8fccSz7rpKfDJY078VLyw==_201308",
-    "content": "Le Grand Schtroumpf confirme que la cascade magique n'est \"Plus très loin.\" http://t.co/Ytn45Pbttk #SmurfsVillage @BeelineGames",
-    "id": "tag:search.twitter.com,2005:372802927385403392",
-    "verb": "post",
-    "generator": {
-        "id": "{link}",
-        "upstreamDuplicates": [],
-        "link": "https://itunes.apple.com/us/app/smurfs-village/id399648212?mt=8&uo=4",
-        "attachments": [],
-        "displayName": "Smurfs' Village on iOS",
-        "downstreamDuplicates": []
-    },
-    "published": "2013-08-28T19:28:38Z",
-    "objectType": "activity"
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/adb43b29/streams-runtimes/pom.xml
----------------------------------------------------------------------
diff --git a/streams-runtimes/pom.xml b/streams-runtimes/pom.xml
index cf0d57f..1abe74e 100644
--- a/streams-runtimes/pom.xml
+++ b/streams-runtimes/pom.xml
@@ -35,5 +35,6 @@
     <modules>
         <module>streams-runtime-local</module>
         <module>streams-runtime-storm</module>
+        <module>streams-runtime-webapp</module>
     </modules>
 </project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/adb43b29/streams-runtimes/streams-runtime-webapp/ReadMe.txt
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-webapp/ReadMe.txt b/streams-runtimes/streams-runtime-webapp/ReadMe.txt
new file mode 100644
index 0000000..19a1d19
--- /dev/null
+++ b/streams-runtimes/streams-runtime-webapp/ReadMe.txt
@@ -0,0 +1,32 @@
+Camel Router WAR Project with Web Console and REST Support
+==========================================================
+
+This project bundles the Camel Web Console, REST API, and some
+sample routes as a WAR. You can build the WAR by running
+
+mvn install
+
+You can then run the project by dropping the WAR into your 
+favorite web container or just run
+
+mvn jetty:run
+
+to start up and deploy to Jetty.
+
+
+Web Console
+===========
+
+You can view the Web Console by pointing your browser to http://localhost:8080/
+
+You should be able to do things like
+
+    * browse the available endpoints
+    * browse the messages on an endpoint if it is a BrowsableEndpoint
+    * send a message to an endpoint
+    * create new endpoints
+
+For more help see the Apache Camel documentation
+
+    http://camel.apache.org/
+    

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/adb43b29/streams-runtimes/streams-runtime-webapp/pom.xml
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-webapp/pom.xml b/streams-runtimes/streams-runtime-webapp/pom.xml
new file mode 100644
index 0000000..8753bb7
--- /dev/null
+++ b/streams-runtimes/streams-runtime-webapp/pom.xml
@@ -0,0 +1,209 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xmlns="http://maven.apache.org/POM/4.0.0"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.streams</groupId>
+    <artifactId>streams-runtimes</artifactId>
+    <version>0.1-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>streams-runtime-webapp</artifactId>
+
+  <name>${bundle.symbolicName} [${bundle.namespace}]</name>
+
+  <properties>
+        <bundle.symbolicName>streams-eip-routes</bundle.symbolicName>
+        <bundle.namespace>org.apache.streams</bundle.namespace>
+        <jackson.old.version>1.9.11</jackson.old.version>
+    </properties>
+
+    <packaging>bundle</packaging>
+
+    <repositories>
+        <repository>
+            <id>clojars.org</id>
+            <url>http://clojars.org/repo</url>
+        </repository>
+    </repositories>
+
+    <build>
+
+        <resources>
+            <resource>
+                <directory>src/main/resources</directory>
+            </resource>
+            <!--
+             | example additional resource entries, useful when building Eclipse RCP applications
+            -->
+            <resource>
+                <directory>.</directory>
+                <includes>
+                    <include>plugin.xml</include>
+                    <include>plugin.properties</include>
+                    <include>icons/**</include>
+                    <include>META-INF/*</include>
+                </includes>
+            </resource>
+        </resources>
+        <plugins>
+            <plugin>
+                <groupId>org.ops4j</groupId>
+                <artifactId>maven-pax-plugin</artifactId>
+                <!--
+                 | enable improved OSGi compilation support for the bundle life-cycle.
+                 | to switch back to the standard bundle life-cycle, move this setting
+                 | down to the maven-bundle-plugin section
+                -->
+                <extensions>true</extensions>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.felix</groupId>
+                <artifactId>maven-bundle-plugin</artifactId>
+                <version>1.4.3</version>
+                <!--
+                 | the following instructions build a simple set of public/private classes into an OSGi bundle
+                -->
+                <configuration>
+                    <instructions>
+                        <Bundle-SymbolicName>${bundle.symbolicName}</Bundle-SymbolicName>
+                        <Bundle-Version>${project.version}</Bundle-Version>
+                        <Export-Package>${bundle.namespace};version="${project.version}",org.apache.streams.messaging.configuration,org.apache.streams.messaging.routers,org.apache.streams.messaging.rules,org.apache.streams.messaging.processors,org.apache.streams.messaging.aggregation,org.apache.streams.messaging.service, org.apache.streams.messaging.storm,org.apache.activemq,org.codehaus.jackson.*;version="${jackson.version}"</Export-Package>
+                        <Private-Package>${bundle.namespace}.messaging.routers.impl.*,${bundle.namespace}.messaging.rules.impl.*, ${bundle.namespace}.messaging.service.impl.*</Private-Package>
+                        <Import-Package>org.apache.camel.*;version="2.8.5",org.apache.streams.messaging.configuration,org.apache.activemq.camel.component,org.apache.activemq,org.apache.activemq.pool,org.apache.camel.component.jms,org.springframework.*;version="3.0.6.RELEASE",org.apache.commons.logging,org.apache.streams.*,org.apache.streams.osgi.components,org.apache.streams.osgi.components.activityconsumer,org.apache.streams.osgi.components.activityconsumer.impl,org.apache.streams.osgi.components.activitysubscriber,org.apache.streams.osgi.components.activitysubscriber.impl,org.apache.streams.messaging.processors,org.apache.streams.messaging.aggregation,javax.jms, javax.net.ssl, javax.transaction.xa, org.apache.activemq.advisory, org.apache.activemq.blob, org.apache.activemq.broker, org.apache.activemq.broker.region, org.apache.activemq.command, org.apache.activemq.filter, org.apache.activemq.jndi, org.apache.activemq.management, org.apache.activemq.selector, org.apache.activemq.state, org.apache.activemq.thread, org.apache.activemq.transaction, org.apache.activemq.transport, org.apache.activemq.transport.failover, org.apache.activemq.transport.tcp, org.apache.activemq.usage, org.apache.activemq.util, org.slf4j,org.codehaus.jackson;version="${jackson.version}",javax.xml.datatype, javax.xml.namespace, javax.xml.parsers, org.joda.time, org.joda.time.format, org.w3c.dom, org.w3c.dom.bootstrap, org.w3c.dom.ls, org.xml.sax, org.apache.rave.model, org.apache.rave.portal.model.impl, backtype.storm, backtype.storm.coordination, backtype.storm.generated, backtype.storm.spout, backtype.storm.task, backtype.storm.topology, backtype.storm.topology.base, backtype.storm.tuple, javax.annotation, backtype.storm.utils</Import-Package>
+                    </instructions>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-core</artifactId>
+            <version>2.9.0</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.activemq</groupId>
+            <artifactId>activemq-camel</artifactId>
+            <version>5.5.1</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.activemq</groupId>
+            <artifactId>activemq-pool</artifactId>
+            <version>5.5.1</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.springframework</groupId>
+            <artifactId>spring-core</artifactId>
+            <version>3.0.6.RELEASE</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.springframework</groupId>
+            <artifactId>spring-context</artifactId>
+            <version>3.0.6.RELEASE</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.codehaus.jackson</groupId>
+            <artifactId>jackson-mrbean</artifactId>
+            <version>${jackson.old.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.codehaus.jackson</groupId>
+            <artifactId>jackson-mapper-asl</artifactId>
+            <version>${jackson.old.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.osgi</groupId>
+            <artifactId>osgi_R4_core</artifactId>
+            <version>1.0</version>
+            <scope>provided</scope>
+            <optional>true</optional>
+        </dependency>
+
+        <dependency>
+            <groupId>org.osgi</groupId>
+            <artifactId>osgi_R4_compendium</artifactId>
+            <version>1.0</version>
+            <scope>provided</scope>
+            <optional>true</optional>
+        </dependency>
+
+
+        <dependency>
+            <groupId>org.apache.streams.osgi.components</groupId>
+            <artifactId>activity-registration</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.streams.osgi.components</groupId>
+            <artifactId>activity-consumer</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.streams.osgi.components</groupId>
+            <artifactId>activity-subscriber</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-cassandra</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.rave</groupId>
+            <artifactId>rave-core</artifactId>
+            <version>${rave.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.rave</groupId>
+            <artifactId>rave-core-api</artifactId>
+            <version>${rave.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>storm</groupId>
+            <artifactId>storm</artifactId>
+            <version>0.8.2</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.easymock</groupId>
+            <artifactId>easymock</artifactId>
+            <version>3.1</version>
+        </dependency>
+    </dependencies>
+
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/adb43b29/streams-runtimes/streams-runtime-webapp/src/main/java/org/apache/streams/messaging/aggregation/ActivityAggregator.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-webapp/src/main/java/org/apache/streams/messaging/aggregation/ActivityAggregator.java b/streams-runtimes/streams-runtime-webapp/src/main/java/org/apache/streams/messaging/aggregation/ActivityAggregator.java
new file mode 100644
index 0000000..dc7ba0c
--- /dev/null
+++ b/streams-runtimes/streams-runtime-webapp/src/main/java/org/apache/streams/messaging/aggregation/ActivityAggregator.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.messaging.aggregation;
+
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.streams.messaging.service.impl.CassandraActivityService;
+import org.apache.streams.osgi.components.activitysubscriber.ActivityStreamsSubscriber;
+import org.apache.streams.osgi.components.activitysubscriber.ActivityStreamsSubscriberWarehouse;
+import org.springframework.scheduling.annotation.Scheduled;
+
+import java.util.*;
+
+public class ActivityAggregator { // on a fixed schedule, delivers activities from the activity service to every subscriber in the warehouse
+
+    private ActivityStreamsSubscriberWarehouse activityStreamsSubscriberWarehouse;
+    private CassandraActivityService activityService;
+    private static final transient Log LOG = LogFactory.getLog(ActivityAggregator.class); // not used in this class; 'transient' has no effect on a static field
+
+    public void setActivityStreamsSubscriberWarehouse(ActivityStreamsSubscriberWarehouse activityStreamsSubscriberWarehouse) { // setter injection; must be called before distributeToSubscribers() runs
+        this.activityStreamsSubscriberWarehouse = activityStreamsSubscriberWarehouse;
+    }
+
+    public void setActivityService(CassandraActivityService activityService) { // setter injection; must be called before updateSubscriber() runs
+        this.activityService = activityService;
+    }
+
+    @Scheduled(fixedRate=30000) // Spring fixedRate is in milliseconds: one run every 30 seconds
+    public void distributeToSubscribers() { // refreshes each subscriber returned by the warehouse, one after another
+        for (ActivityStreamsSubscriber subscriber : activityStreamsSubscriberWarehouse.getAllSubscribers()) {
+              updateSubscriber(subscriber);
+        }
+    }
+
+    public void updateSubscriber(ActivityStreamsSubscriber subscriber){ // query by the subscriber's filters, stamp lastUpdated, then deliver
+        Set<String> activities = new TreeSet<String>(); // TreeSet drops duplicate strings and keeps them in natural order
+        activities.addAll(activityService.getActivitiesForFilters(subscriber.getActivityStreamsSubscriberConfiguration().getFilters(), subscriber.getLastUpdated())); // presumably returns activities newer than lastUpdated; verify against CassandraActivityService
+        //TODO: an activity posted in between the cql query and setting the lastUpdated field will be lost
+        subscriber.setLastUpdated(new Date()); // stamped after the query, which is the window the TODO above describes
+        subscriber.receive(new ArrayList<String>(activities)); // delivered as a list copy in the set's sorted order
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/adb43b29/streams-runtimes/streams-runtime-webapp/src/main/java/org/apache/streams/messaging/configuration/EipConfigurator.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-webapp/src/main/java/org/apache/streams/messaging/configuration/EipConfigurator.java b/streams-runtimes/streams-runtime-webapp/src/main/java/org/apache/streams/messaging/configuration/EipConfigurator.java
new file mode 100644
index 0000000..460c43a
--- /dev/null
+++ b/streams-runtimes/streams-runtime-webapp/src/main/java/org/apache/streams/messaging/configuration/EipConfigurator.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.messaging.configuration;
+
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+@Component
+public class EipConfigurator { // Spring component holding consumer/subscriber route settings injected from property placeholders, with a plain getter and setter for each
+
+
+
+    @Value("${consumer.inRouteHost}") // each @Value below is filled by Spring from the named property placeholder
+    private String consumerInRouteHost;
+
+    @Value("${consumer.inRoutePort}")
+    private String consumerInRoutePort; // kept as a String, not parsed to a number in this class
+
+    @Value("${subscriber.inRouteHost}")
+    private String subscriberInRouteHost;
+
+    @Value("${subscriber.inRoutePort}")
+    private String subscriberInRoutePort; // kept as a String, not parsed to a number in this class
+
+
+    @Value("${consumer.activityQUri}")
+    private String consumerActivityQUri;
+
+    @Value("${consumer.publisherEndpointProtocol}")
+    private String publisherEndpointProtocol; // presumably one of the ENDPOINT_PROTOCOL_* values below; verify against the route builders
+
+    @Value("${consumer.publisherEndpointUrlResource}")
+    private String publisherEndpointUrlResource;
+
+    @Value("${consumer.receiveMethod}")
+    private String consumerReceiveMethod;
+
+    @Value("${consumer.splitMethod}")
+    private String consumerSplitMethod;
+
+    @Value("${subscriber.subscriberEndpointProtocol}")
+    private String subscriberEndpointProtocol; // presumably one of the ENDPOINT_PROTOCOL_* values below; verify against the route builders
+
+    @Value("${subscriber.subscriberEndpointUrlResource}")
+    private String subscriberEndpointUrlResource;
+
+    @Value("${subscriber.receiveMethod}")
+    private String subscriberReceiveMethod;
+
+    @Value("${subscriber.postMethod}")
+    private String subscriberPostMethod;
+
+    @Value("${subscriber.getMethod}")
+    private String subscriberGetMethod;
+
+
+    @Value("${servlet.baseUrlPath}")
+    private String baseUrlPath;
+
+
+    public static String ENDPOINT_PROTOCOL_JETTY="jetty:http://"; // NOTE(review): public static but not final, so it can be reassigned
+    public static String ENDPOINT_PROTOCOL_SERVLET="servlet:///"; // NOTE(review): public static but not final, so it can be reassigned
+
+    public String getConsumerInRouteHost() { // the accessors below only read or overwrite the matching field; no validation is done
+        return consumerInRouteHost;
+    }
+
+    public String getConsumerInRoutePort() {
+        return consumerInRoutePort;
+    }
+
+    public String getConsumerActivityQUri() {
+        return consumerActivityQUri;
+    }
+
+    public void setConsumerActivityQUri(String consumerActivityQUri) {
+        this.consumerActivityQUri = consumerActivityQUri;
+    }
+
+    public void setConsumerInRoutePort(String consumerInRoutePort) {
+        this.consumerInRoutePort = consumerInRoutePort;
+    }
+
+    public void setConsumerInRouteHost(String consumerInRouteHost) {
+        this.consumerInRouteHost = consumerInRouteHost;
+    }
+
+    public String getSubscriberInRouteHost() {
+        return subscriberInRouteHost;
+    }
+
+    public void setSubscriberInRouteHost(String subscriberInRouteHost) {
+        this.subscriberInRouteHost = subscriberInRouteHost;
+    }
+
+    public String getSubscriberInRoutePort() {
+        return subscriberInRoutePort;
+    }
+
+    public void setSubscriberInRoutePort(String subscriberInRoutePort) {
+        this.subscriberInRoutePort = subscriberInRoutePort;
+    }
+
+    public String getPublisherEndpointProtocol() {
+        return publisherEndpointProtocol;
+    }
+
+    public void setPublisherEndpointProtocol(String publisherEndpointProtocol) {
+        this.publisherEndpointProtocol = publisherEndpointProtocol;
+    }
+
+    public String getPublisherEndpointUrlResource() {
+        return publisherEndpointUrlResource;
+    }
+
+    public void setPublisherEndpointUrlResource(String publisherEndpointUrlResource) {
+        this.publisherEndpointUrlResource = publisherEndpointUrlResource;
+    }
+
+    public String getConsumerReceiveMethod() {
+        return consumerReceiveMethod;
+    }
+
+    public void setConsumerReceiveMethod(String consumerReceiveMethod) {
+        this.consumerReceiveMethod = consumerReceiveMethod;
+    }
+
+    public String getConsumerSplitMethod() {
+        return consumerSplitMethod;
+    }
+
+    public void setConsumerSplitMethod(String consumerSplitMethod) {
+        this.consumerSplitMethod = consumerSplitMethod;
+    }
+
+    public String getSubscriberEndpointProtocol() {
+        return subscriberEndpointProtocol;
+    }
+
+    public void setSubscriberEndpointProtocol(String subscriberEndpointProtocol) {
+        this.subscriberEndpointProtocol = subscriberEndpointProtocol;
+    }
+
+    public String getSubscriberEndpointUrlResource() {
+        return subscriberEndpointUrlResource;
+    }
+
+    public void setSubscriberEndpointUrlResource(String subscriberEndpointUrlResource) {
+        this.subscriberEndpointUrlResource = subscriberEndpointUrlResource;
+    }
+
+    public String getSubscriberReceiveMethod() {
+        return subscriberReceiveMethod;
+    }
+
+    public void setSubscriberReceiveMethod(String subscriberReceiveMethod) {
+        this.subscriberReceiveMethod = subscriberReceiveMethod;
+    }
+
+    public String getSubscriberPostMethod() {
+        return subscriberPostMethod;
+    }
+
+    public void setSubscriberPostMethod(String subscriberPostMethod) {
+        this.subscriberPostMethod = subscriberPostMethod;
+    }
+
+    public String getSubscriberGetMethod() {
+        return subscriberGetMethod;
+    }
+
+    public void setSubscriberGetMethod(String subscriberGetMethod) {
+        this.subscriberGetMethod = subscriberGetMethod;
+    }
+
+    public String getBaseUrlPath() {
+        return baseUrlPath;
+    }
+
+    public void setBaseUrlPath(String baseUrlPath) {
+        this.baseUrlPath = baseUrlPath;
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/adb43b29/streams-runtimes/streams-runtime-webapp/src/main/java/org/apache/streams/messaging/processors/ActivityPublisherRegistrationProcessor.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-webapp/src/main/java/org/apache/streams/messaging/processors/ActivityPublisherRegistrationProcessor.java b/streams-runtimes/streams-runtime-webapp/src/main/java/org/apache/streams/messaging/processors/ActivityPublisherRegistrationProcessor.java
new file mode 100644
index 0000000..741a63c
--- /dev/null
+++ b/streams-runtimes/streams-runtime-webapp/src/main/java/org/apache/streams/messaging/processors/ActivityPublisherRegistrationProcessor.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.messaging.processors;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.streams.osgi.components.activityconsumer.ActivityConsumer;
+import org.codehaus.jackson.map.DeserializationConfig;
+import org.codehaus.jackson.map.ObjectMapper;
+
+
+public class ActivityPublisherRegistrationProcessor implements Processor{
+    private static final transient Log LOG = LogFactory.getLog(ActivityStreamsSubscriberRegistrationProcessor.class);
+    public void process(Exchange exchange){
+        //add the necessary headers to the message so that the activity registration component
+        //can do a lookup to either make a new processor and endpoint, or pass the message to the right one
+        String httpMethod = exchange.getIn().getHeader("CamelHttpMethod").toString();
+
+        if (!httpMethod.equals("POST")){
+            //reject anything that isn't a post...Camel 2.10 solves needing this check, however, SM 4.4 doesn't have the latest
+            exchange.getOut().setFault(true);
+            exchange.getOut().setHeader(Exchange.HTTP_RESPONSE_CODE,405);
+        }  else {
+
+             //for now...just expect a post with a uri in the body...should have some checking here with http response codes
+            // authentication, all that good stuff...happens in the registration module
+
+            String body = exchange.getIn().getBody(String.class);
+            ObjectMapper mapper = new ObjectMapper();
+            mapper.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES,false);
+
+            try {
+
+                // read from file, convert it to user class
+                ActivityConsumer configuration = mapper.readValue(body, ActivityConsumer.class);
+                if (configuration.getSrc()==null){
+                   LOG.info("configuration src is null");
+                   throw new Exception();
+                }
+
+                exchange.getOut().setBody(configuration);
+
+            } catch (Exception e) {
+                LOG.info("error: " + e);
+                exchange.getOut().setFault(true);
+                exchange.getOut().setHeader(Exchange.HTTP_RESPONSE_CODE,400);
+                exchange.getOut().setBody("POST should contain a valid JSON configuration for registering as an Activity Publisher (check that src element is valid).");
+            }
+        }
+
+
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/adb43b29/streams-runtimes/streams-runtime-webapp/src/main/java/org/apache/streams/messaging/processors/ActivityStreamsSubscriberRegistrationProcessor.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-webapp/src/main/java/org/apache/streams/messaging/processors/ActivityStreamsSubscriberRegistrationProcessor.java b/streams-runtimes/streams-runtime-webapp/src/main/java/org/apache/streams/messaging/processors/ActivityStreamsSubscriberRegistrationProcessor.java
new file mode 100644
index 0000000..201eebd
--- /dev/null
+++ b/streams-runtimes/streams-runtime-webapp/src/main/java/org/apache/streams/messaging/processors/ActivityStreamsSubscriberRegistrationProcessor.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.messaging.processors;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.streams.messaging.service.SubscriptionService;
+import org.apache.streams.osgi.components.activitysubscriber.ActivityStreamsSubscription;
+import org.codehaus.jackson.map.DeserializationConfig;
+import org.codehaus.jackson.map.ObjectMapper;
+
+
+public class ActivityStreamsSubscriberRegistrationProcessor implements Processor{
+    private static final transient Log LOG = LogFactory.getLog(ActivityStreamsSubscriberRegistrationProcessor.class);
+    private final SubscriptionService subscriptionService;
+
+    public ActivityStreamsSubscriberRegistrationProcessor(SubscriptionService subscriptionService){
+        this.subscriptionService = subscriptionService;
+    }
+
+    /**
+     * Validates a subscriber registration request. Non-POST requests are faulted with 405. A POST body is
+     * parsed as an ActivityStreamsSubscription: when it carries no filters they are loaded from the
+     * SubscriptionService by auth token, otherwise the supplied filters are saved. The subscription then
+     * becomes the out body; any failure while parsing or calling the service is faulted with 400.
+     *
+     * @param exchange the Camel exchange carrying the HTTP request
+     */
+    public void process(Exchange exchange){
+        LOG.info("processing the subscriber...");
+        // String.valueOf() maps a missing header to "null", so it gets a 405 instead of a NullPointerException
+        String httpMethod = String.valueOf(exchange.getIn().getHeader("CamelHttpMethod"));
+
+        if (!httpMethod.equals("POST")){
+            //reject anything that isn't a post...Camel 2.10 solves needing this check, however, SM 4.4 doesn't have the latest
+            exchange.getOut().setFault(true);
+            exchange.getOut().setHeader(Exchange.HTTP_RESPONSE_CODE,405);
+        }  else {
+
+            //for now...just expect a post with a subscription in the body...should have some checking here with
+            //http response codes, authentication, all that good stuff...happens in the registration module
+
+            String body = exchange.getIn().getBody(String.class);
+
+            // debug level only: the body carries the subscriber's auth token and should not reach normal logs
+            LOG.debug("receiving the subscriber: "+body);
+            //OAuth token? What does subscriber post to init a subscription URL?
+            //maybe its a list of URLs to subscribe to subscriptions=1,2,3,4&auth_token=XXXX
+
+            ObjectMapper mapper = new ObjectMapper();
+            mapper.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES,false);
+
+            try {
+
+                // parse the request body into the subscription; unknown properties are ignored
+                ActivityStreamsSubscription configuration = mapper.readValue(body, ActivityStreamsSubscription.class);
+                if(configuration.getFilters() == null){
+                    configuration.setFilters(subscriptionService.getFilters(configuration.getAuthToken()));
+                }else{
+                    subscriptionService.saveFilters(configuration);
+                }
+                exchange.getOut().setBody(configuration);
+
+            } catch (Exception e) {
+                LOG.info("exception" + e, e);
+                exchange.getOut().setFault(true);
+                exchange.getOut().setHeader(Exchange.HTTP_RESPONSE_CODE,400);
+                exchange.getOut().setBody("POST should contain a valid Subscription configuration object.");
+            }
+
+            //just pass this on to the route creator, body will be the dedicated URL for this subscriber
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/adb43b29/streams-runtimes/streams-runtime-webapp/src/main/java/org/apache/streams/messaging/routers/ActivityConsumerRouteBuilder.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-webapp/src/main/java/org/apache/streams/messaging/routers/ActivityConsumerRouteBuilder.java b/streams-runtimes/streams-runtime-webapp/src/main/java/org/apache/streams/messaging/routers/ActivityConsumerRouteBuilder.java
new file mode 100644
index 0000000..dea8781
--- /dev/null
+++ b/streams-runtimes/streams-runtime-webapp/src/main/java/org/apache/streams/messaging/routers/ActivityConsumerRouteBuilder.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.messaging.routers;
+
+
+
+import org.apache.camel.Exchange;
+import org.apache.streams.osgi.components.activityconsumer.ActivityConsumer;
+
+
+public interface ActivityConsumerRouteBuilder {
+
+    /** Sets up a route for activityConsumer; the ActivityConsumerRouter implementation reports the outcome through the exchange's out message. */
+    void createNewRouteForConsumer(Exchange exchange, ActivityConsumer activityConsumer);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/adb43b29/streams-runtimes/streams-runtime-webapp/src/main/java/org/apache/streams/messaging/routers/ActivityStreamsSubscriberRouteBuilder.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-webapp/src/main/java/org/apache/streams/messaging/routers/ActivityStreamsSubscriberRouteBuilder.java b/streams-runtimes/streams-runtime-webapp/src/main/java/org/apache/streams/messaging/routers/ActivityStreamsSubscriberRouteBuilder.java
new file mode 100644
index 0000000..6947722
--- /dev/null
+++ b/streams-runtimes/streams-runtime-webapp/src/main/java/org/apache/streams/messaging/routers/ActivityStreamsSubscriberRouteBuilder.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.messaging.routers;
+
+
+
+import org.apache.camel.Exchange;
+import org.apache.streams.osgi.components.activitysubscriber.ActivityStreamsSubscriber;
+
+
+public interface ActivityStreamsSubscriberRouteBuilder {
+
+    /** Sets up a route for activityStreamsSubscriber; the ActivityStreamsSubscriberRouter implementation reports the outcome through the exchange's out message. */
+    void createNewRouteForSubscriber(Exchange exchange, ActivityStreamsSubscriber activityStreamsSubscriber);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/adb43b29/streams-runtimes/streams-runtime-webapp/src/main/java/org/apache/streams/messaging/routers/impl/ActivityConsumerRouter.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-webapp/src/main/java/org/apache/streams/messaging/routers/impl/ActivityConsumerRouter.java b/streams-runtimes/streams-runtime-webapp/src/main/java/org/apache/streams/messaging/routers/impl/ActivityConsumerRouter.java
new file mode 100644
index 0000000..20b8246
--- /dev/null
+++ b/streams-runtimes/streams-runtime-webapp/src/main/java/org/apache/streams/messaging/routers/impl/ActivityConsumerRouter.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.messaging.routers.impl;
+
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.streams.messaging.routers.ActivityConsumerRouteBuilder;
+
+
+import org.apache.streams.osgi.components.activityconsumer.ActivityConsumerWarehouse;
+import org.apache.streams.osgi.components.activityconsumer.ActivityConsumer;
+import org.apache.streams.messaging.configuration.EipConfigurator;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.apache.camel.Exchange;
+import org.apache.camel.CamelContext;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.UUID;
+
+
+public class ActivityConsumerRouter extends RouteBuilder implements ActivityConsumerRouteBuilder {
+    private static final transient Log LOG = LogFactory.getLog(ActivityConsumerRouter.class);
+
+    @Autowired
+    private EipConfigurator configuration;
+
+    protected CamelContext camelContext;
+
+    private ActivityConsumerWarehouse activityConsumerWarehouse;
+
+    public void setCamelContext(CamelContext camelContext) {
+        this.camelContext = camelContext;
+    }
+
+    public void setActivityConsumerWarehouse(ActivityConsumerWarehouse activityConsumerWarehouse) {
+        this.activityConsumerWarehouse = activityConsumerWarehouse;
+    }
+
+    /**
+     * Registers an authenticated consumer and writes the URL it must post to into the out body.
+     * A consumer whose src is already in the warehouse gets the URL of its existing route; otherwise a new
+     * route with a random UUID path is added to the Camel context and the consumer is registered.
+     * Unauthenticated consumers are faulted with 401; a failure while creating the route yields a 500.
+     *
+     * @param exchange the exchange whose out message receives the URL or the error
+     * @param activityConsumer the consumer to register; its src is dereferenced and must not be null
+     */
+    public void createNewRouteForConsumer(Exchange exchange, ActivityConsumer activityConsumer){
+        //todo: make the route again if consumer exists...and context doesn't have route
+        if (!activityConsumer.isAuthenticated()){
+            exchange.getOut().setFault(true);
+            exchange.getOut().setHeader(Exchange.HTTP_RESPONSE_CODE,401);
+            exchange.getOut().setBody("Authentication failed.");
+            return;
+        }
+
+        ActivityConsumer existingConsumer = activityConsumerWarehouse.findConsumerBySrc(activityConsumer.getSrc().toASCIIString());
+        if (existingConsumer != null){
+            //answer with the same kind of URL that is handed out when a route is first created
+            exchange.getOut().setBody(publicUrlFor(existingConsumer));
+            return;
+        }
+
+        try{
+            String protocol = configuration.getPublisherEndpointProtocol();
+            if (EipConfigurator.ENDPOINT_PROTOCOL_JETTY.equals(protocol)){
+                activityConsumer.setInRoute(configuration.getConsumerInRouteHost()+ ":" + configuration.getConsumerInRoutePort() +"/" + configuration.getPublisherEndpointUrlResource() + "/" + UUID.randomUUID());
+            }else if (EipConfigurator.ENDPOINT_PROTOCOL_SERVLET.equals(protocol)){
+                activityConsumer.setInRoute( configuration.getPublisherEndpointUrlResource() + "/" + UUID.randomUUID());
+            } else{
+                throw new IllegalStateException("No supported endpoint protocol is configured.");
+            }
+            //set the body to the url the producer should post to
+            exchange.getOut().setBody(publicUrlFor(activityConsumer));
+
+            //setup a message queue for this consumer.getInRoute()
+            camelContext.addRoutes(new DynamicConsumerRouteBuilder(configuration,camelContext, protocol + activityConsumer.getInRoute(), activityConsumer));
+
+            LOG.info("all messages sent from " + activityConsumer.getSrc() + " must be posted to " + activityConsumer.getInRoute());
+            //only add the route to the warehouse after its been created in messaging system...
+            activityConsumerWarehouse.register(activityConsumer);
+        }catch (Exception e){
+            exchange.getOut().setHeader(Exchange.HTTP_RESPONSE_CODE,500);
+            exchange.getOut().setBody("error creating route: " + e);
+            //pass the throwable as well so the stack trace reaches the log
+            LOG.error("error creating route: " + e, e);
+        }
+    }
+
+    // Builds the URL a producer posts to: a jetty in-route already holds host and port and only needs the
+    // scheme; any other in-route is appended to the configured base URL path.
+    private String publicUrlFor(ActivityConsumer consumer){
+        if (EipConfigurator.ENDPOINT_PROTOCOL_JETTY.equals(configuration.getPublisherEndpointProtocol())){
+            return "http://" + consumer.getInRoute();
+        }
+        return configuration.getBaseUrlPath() + consumer.getInRoute();
+    }
+
+    public void configure() throws java.lang.Exception{
+        //nothing...set the context?
+    }
+
+    /**
+     * This route builder is a skeleton to add new routes at runtime
+     */
+    private static final class DynamicConsumerRouteBuilder extends RouteBuilder {
+        private final String from;
+        private ActivityConsumer activityConsumer;
+        private EipConfigurator configuration;
+
+        private DynamicConsumerRouteBuilder(EipConfigurator configuration, CamelContext context, String from, ActivityConsumer activityConsumer) {
+            super(context);
+            this.from = from;
+            this.activityConsumer = activityConsumer;
+            this.configuration = configuration;
+        }
+
+        @Override
+        public void configure() throws Exception {
+            from(from)
+                    .bean(activityConsumer, configuration.getConsumerReceiveMethod()).setBody(body())
+                    .split()
+                    .method(activityConsumer, configuration.getConsumerSplitMethod())
+                    .to(configuration.getConsumerActivityQUri());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/adb43b29/streams-runtimes/streams-runtime-webapp/src/main/java/org/apache/streams/messaging/routers/impl/ActivityStreamsSubscriberRouter.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-webapp/src/main/java/org/apache/streams/messaging/routers/impl/ActivityStreamsSubscriberRouter.java b/streams-runtimes/streams-runtime-webapp/src/main/java/org/apache/streams/messaging/routers/impl/ActivityStreamsSubscriberRouter.java
new file mode 100644
index 0000000..68ef0a5
--- /dev/null
+++ b/streams-runtimes/streams-runtime-webapp/src/main/java/org/apache/streams/messaging/routers/impl/ActivityStreamsSubscriberRouter.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.messaging.routers.impl;
+
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.streams.messaging.aggregation.ActivityAggregator;
+import org.apache.streams.messaging.configuration.EipConfigurator;
+import org.apache.streams.messaging.routers.ActivityStreamsSubscriberRouteBuilder;
+import org.apache.streams.osgi.components.activitysubscriber.ActivityStreamsSubscriber;
+import org.apache.streams.osgi.components.activitysubscriber.ActivityStreamsSubscriberWarehouse;
+import org.springframework.beans.factory.annotation.Autowired;
+
+import java.util.UUID;
+
+
+public class ActivityStreamsSubscriberRouter extends RouteBuilder implements ActivityStreamsSubscriberRouteBuilder {
+    private static final transient Log LOG = LogFactory.getLog(ActivityStreamsSubscriberRouter.class);
+
+    @Autowired
+    private EipConfigurator configuration;
+
+    protected CamelContext camelContext;
+
+    @Autowired
+    private ActivityAggregator activityAggregator;
+
+    private ActivityStreamsSubscriberWarehouse activityStreamsSubscriberWarehouse;
+
+    public void setCamelContext(CamelContext camelContext) {
+        this.camelContext = camelContext;
+    }
+
+    public void setActivityStreamsSubscriberWarehouse(ActivityStreamsSubscriberWarehouse activityStreamsSubscriberWarehouse) {
+        this.activityStreamsSubscriberWarehouse = activityStreamsSubscriberWarehouse;
+    }
+
+
+    public void createNewRouteForSubscriber(Exchange exchange, ActivityStreamsSubscriber activityStreamsSubscriber){
+
+        //todo: add some better scheme then getCount for URL...
+        //todo: make the route again if subscriber exists...and context doesn't have route
+        if (activityStreamsSubscriber.isAuthenticated()){
+
+            try{
+
+                if (configuration.getSubscriberEndpointProtocol().equals(EipConfigurator.ENDPOINT_PROTOCOL_JETTY)){
+                    activityStreamsSubscriber.setInRoute(configuration.getSubscriberInRouteHost()+ ":" + configuration.getSubscriberInRoutePort() +"/" + configuration.getSubscriberEndpointUrlResource() + "/" + UUID.randomUUID());
+                    //set the body to the url the producer should post to
+                    exchange.getOut().setBody("http://" + activityStreamsSubscriber.getInRoute());
+                }else if (configuration.getSubscriberEndpointProtocol().equals(EipConfigurator.ENDPOINT_PROTOCOL_SERVLET)){
+                    activityStreamsSubscriber.setInRoute( configuration.getSubscriberEndpointUrlResource() + "/" + UUID.randomUUID());
+                    //set the body to the url the producer should post to
+                    exchange.getOut().setBody(configuration.getBaseUrlPath() + activityStreamsSubscriber.getInRoute());
+                } else{
+                    throw new Exception("No supported endpoint protocol is configured.");
+                }
+
+                //setup a message queue for this consumer.getInRoute()
+                camelContext.addRoutes(new DynamicSubscriberRouteBuilder(configuration,camelContext, configuration.getSubscriberEndpointProtocol() + activityStreamsSubscriber.getInRoute(), activityStreamsSubscriber));
+
+                activityAggregator.updateSubscriber(activityStreamsSubscriber);
+                activityStreamsSubscriberWarehouse.register(activityStreamsSubscriber);
+            }catch (Exception e){
+                exchange.getOut().setHeader(Exchange.HTTP_RESPONSE_CODE,500);
+                exchange.getOut().setBody("error creating route: " + e);
+                LOG.error("error creating route: " + e);
+            }
+
+        }else{
+            exchange.getOut().setFault(true);
+            exchange.getOut().setHeader(Exchange.HTTP_RESPONSE_CODE,401);
+            exchange.getOut().setBody("Authentication failed.");
+        }
+
+    }
+
+
+
+
+    public void configure() throws Exception{
+        //nothing...set the context?
+
+    }
+
+    /**
+     * This route builder is a skeleton to add new routes at runtime
+     */
+    private static final class DynamicSubscriberRouteBuilder extends RouteBuilder {
+        private final String from;
+        private ActivityStreamsSubscriber activityStreamsSubscriber;
+
+
+        private EipConfigurator configuration;
+
+        private DynamicSubscriberRouteBuilder(EipConfigurator configuration, CamelContext context, String from, ActivityStreamsSubscriber activityStreamsSubscriber) {
+            super(context);
+            this.from = from;
+            this.activityStreamsSubscriber = activityStreamsSubscriber;
+            this.configuration = configuration;
+        }
+
+        @Override
+        public void configure() throws Exception {
+
+
+            from(from)
+                    .choice()
+                        .when(header("CamelHttpMethod").isEqualTo("POST"))
+                            //when its a post...it goes to adding a new src
+                            .bean(activityStreamsSubscriber, configuration.getSubscriberPostMethod()).setBody(body())
+                        .when(header("CamelHttpMethod").isEqualTo("GET"))
+                                // when its a GET it goes to getStream()
+                            .bean(activityStreamsSubscriber, configuration.getSubscriberGetMethod()) ;
+
+
+
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/adb43b29/streams-runtimes/streams-runtime-webapp/src/main/java/org/apache/streams/messaging/service/ActivityService.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-webapp/src/main/java/org/apache/streams/messaging/service/ActivityService.java b/streams-runtimes/streams-runtime-webapp/src/main/java/org/apache/streams/messaging/service/ActivityService.java
new file mode 100644
index 0000000..0c85974
--- /dev/null
+++ b/streams-runtimes/streams-runtime-webapp/src/main/java/org/apache/streams/messaging/service/ActivityService.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.messaging.service;
+
+import org.apache.camel.Exchange;
+
+import java.util.Date;
+import java.util.List;
+
+public interface ActivityService {
+    /** Receives a Camel exchange; what is done with it is left to the implementation. */
+    void receiveExchange(Exchange exchange);
+    /** Returns activities as strings for the given filters and lastUpdated date. NOTE(review): how lastUpdated bounds the result is implementation-defined - confirm. */
+    List<String> getActivitiesForFilters(List<String> filters, Date lastUpdated);
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/adb43b29/streams-runtimes/streams-runtime-webapp/src/main/java/org/apache/streams/messaging/service/SubscriptionService.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-webapp/src/main/java/org/apache/streams/messaging/service/SubscriptionService.java b/streams-runtimes/streams-runtime-webapp/src/main/java/org/apache/streams/messaging/service/SubscriptionService.java
new file mode 100644
index 0000000..98f585d
--- /dev/null
+++ b/streams-runtimes/streams-runtime-webapp/src/main/java/org/apache/streams/messaging/service/SubscriptionService.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.messaging.service;
+
+import org.apache.streams.osgi.components.activitysubscriber.ActivityStreamsSubscription;
+
+import java.util.List;
+
+public interface SubscriptionService {
+    // ActivityStreamsSubscriberRegistrationProcessor calls getFilters(authToken) when a posted subscription has no filters, and saveFilters when it has.
+    List<String> getFilters(String authToken);
+    void saveFilters(ActivityStreamsSubscription subscription);
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/adb43b29/streams-runtimes/streams-runtime-webapp/src/main/java/org/apache/streams/messaging/service/impl/CassandraActivityService.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-webapp/src/main/java/org/apache/streams/messaging/service/impl/CassandraActivityService.java b/streams-runtimes/streams-runtime-webapp/src/main/java/org/apache/streams/messaging/service/impl/CassandraActivityService.java
new file mode 100644
index 0000000..89f71ab
--- /dev/null
+++ b/streams-runtimes/streams-runtime-webapp/src/main/java/org/apache/streams/messaging/service/impl/CassandraActivityService.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.messaging.service.impl;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.camel.Exchange;
+import org.apache.rave.model.ActivityStreamsEntry;
+import org.apache.streams.cassandra.model.CassandraActivityStreamsEntry;
+import org.apache.streams.cassandra.repository.impl.CassandraActivityStreamsRepository;
+import org.apache.streams.messaging.service.ActivityService;
+import org.codehaus.jackson.map.DeserializationConfig;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.springframework.beans.factory.annotation.Autowired;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+
+public class CassandraActivityService implements ActivityService {
+    private static final Log LOG = LogFactory.getLog(CassandraActivityService.class);
+    private static final int MAX_ACTIVITIES = 10; // cap on the number of activities returned per query
+
+    private final CassandraActivityStreamsRepository cassandraActivityStreamsRepository;
+    private final ObjectMapper mapper;
+
+    /** Stores both collaborators and reconfigures the given mapper so unknown JSON properties are ignored. */
+    @Autowired
+    public CassandraActivityService(CassandraActivityStreamsRepository cassandraActivityStreamsRepository, ObjectMapper mapper) {
+        this.cassandraActivityStreamsRepository = cassandraActivityStreamsRepository;
+        this.mapper = mapper;
+        mapper.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+    }
+
+    /** Saves every grouped activity JSON body, stamped with the current time; unreadable JSON is logged and skipped. */
+    @Override
+    public void receiveExchange(Exchange exchange) {
+        List<Exchange> grouped = exchange.getProperty(Exchange.GROUPED_EXCHANGE, List.class);
+        if (grouped == null) {
+            // not an aggregated exchange: handle it as a batch of one instead of failing with an NPE below
+            grouped = Collections.singletonList(exchange);
+        }
+        for (Exchange e : grouped) {
+            LOG.info("Exchange: " + e);
+            LOG.info("About to preform the translation to JSON Object");
+            String activityJson = e.getIn().getBody(String.class);
+            try {
+                ActivityStreamsEntry streamsEntry = mapper.readValue(activityJson, CassandraActivityStreamsEntry.class);
+                streamsEntry.setPublished(new Date());
+                cassandraActivityStreamsRepository.save(streamsEntry);
+            } catch (IOException err) {
+                LOG.error("there was an error while converting the json string to an object and saving to the database", err);
+            }
+        }
+    }
+
+    /** Sorts the repository result in place into reverse natural order and returns the first MAX_ACTIVITIES entries as JSON. */
+    @Override
+    public List<String> getActivitiesForFilters(List<String> filters, Date lastUpdated) {
+        List<CassandraActivityStreamsEntry> activityObjects = cassandraActivityStreamsRepository.getActivitiesForFilters(filters, lastUpdated);
+        Collections.sort(activityObjects, Collections.reverseOrder());
+        //TODO: make the number of streams returned configurable
+        return getJsonList(activityObjects.subList(0, Math.min(activityObjects.size(), MAX_ACTIVITIES)));
+    }
+
+    /** Serializes each entry to a JSON string; entries that fail to serialize are logged and left out of the result. */
+    private List<String> getJsonList(List<CassandraActivityStreamsEntry> activities) {
+        List<String> jsonList = new ArrayList<String>();
+        for (ActivityStreamsEntry entry : activities) {
+            try {
+                jsonList.add(mapper.writeValueAsString(entry));
+            } catch (IOException e) {
+                LOG.error("There was an error while trying to convert the java object to a string: " + entry, e);
+            }
+        }
+        return jsonList;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/adb43b29/streams-runtimes/streams-runtime-webapp/src/main/java/org/apache/streams/messaging/service/impl/CassandraSubscriptionService.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-webapp/src/main/java/org/apache/streams/messaging/service/impl/CassandraSubscriptionService.java b/streams-runtimes/streams-runtime-webapp/src/main/java/org/apache/streams/messaging/service/impl/CassandraSubscriptionService.java
new file mode 100644
index 0000000..8972d1e
--- /dev/null
+++ b/streams-runtimes/streams-runtime-webapp/src/main/java/org/apache/streams/messaging/service/impl/CassandraSubscriptionService.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.messaging.service.impl;
+
+import org.apache.streams.cassandra.repository.impl.CassandraSubscriptionRepository;
+import org.apache.streams.messaging.service.SubscriptionService;
+import org.apache.streams.osgi.components.activitysubscriber.ActivityStreamsSubscription;
+
+import java.util.Arrays;
+import java.util.List;
+
+public class CassandraSubscriptionService implements SubscriptionService {
+
+    private CassandraSubscriptionRepository repository;
+
+    public CassandraSubscriptionService(CassandraSubscriptionRepository repository){
+        this.repository = repository;
+    }
+
+    public List<String> getFilters(String authToken){
+          return Arrays.asList(repository.getFilters(authToken).split(" "));
+    }
+
+    public void saveFilters(ActivityStreamsSubscription subscription){
+          repository.save(subscription);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/adb43b29/streams-runtimes/streams-runtime-webapp/src/main/resources/META-INF/spring/propertiesLoader.xml
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-webapp/src/main/resources/META-INF/spring/propertiesLoader.xml b/streams-runtimes/streams-runtime-webapp/src/main/resources/META-INF/spring/propertiesLoader.xml
new file mode 100644
index 0000000..60a3f1f
--- /dev/null
+++ b/streams-runtimes/streams-runtime-webapp/src/main/resources/META-INF/spring/propertiesLoader.xml
@@ -0,0 +1,35 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+
+<!--<beans-->
+        <!--xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"-->
+        <!--xmlns="http://www.springframework.org/schema/beans"-->
+        <!--xmlns:context="http://www.springframework.org/schema/context"-->
+        <!--xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd-->
+        <!--http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">-->
+
+
+
+    <!--<context:component-scan base-package="org.apache.streams.messaging" annotation-config="true"/>-->
+
+    <!--<context:property-placeholder location="/META-INF/streams.properties"/>-->
+
+    <!--<bean id="configuration" class="org.apache.streams.messaging.configuration.EipConfigurator" />-->
+
+
+<!--</beans>-->
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/adb43b29/streams-runtimes/streams-runtime-webapp/src/main/resources/META-INF/spring/streams-eip-applicationContext.xml
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-webapp/src/main/resources/META-INF/spring/streams-eip-applicationContext.xml b/streams-runtimes/streams-runtime-webapp/src/main/resources/META-INF/spring/streams-eip-applicationContext.xml
new file mode 100644
index 0000000..a9b97a7
--- /dev/null
+++ b/streams-runtimes/streams-runtime-webapp/src/main/resources/META-INF/spring/streams-eip-applicationContext.xml
@@ -0,0 +1,113 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+
+<beans
+        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+        xmlns="http://www.springframework.org/schema/beans"
+        xmlns:context="http://www.springframework.org/schema/context"
+        xmlns:task="http://www.springframework.org/schema/task"
+        xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
+        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">
+
+    <!-- Router bean called by the "direct:add-publisher-route" route; "context" is the camelContext of streamsCamelContext.xml. -->
+    <bean id="activityConsumerRouter" class="org.apache.streams.messaging.routers.impl.ActivityConsumerRouter">
+        <property name="activityConsumerWarehouse" ref="activityConsumerWarehouse"/>
+        <property name="camelContext" ref="context"/>
+    </bean>
+    <!-- First step of the publisher registration route. -->
+    <bean id="activityRegistrationProcessor"
+          class="org.apache.streams.messaging.processors.ActivityPublisherRegistrationProcessor"/>
+
+    <!-- Router bean called by the "direct:add-subscriber-route" route. -->
+    <bean id="activityStreamsSubscriberRouter"
+          class="org.apache.streams.messaging.routers.impl.ActivityStreamsSubscriberRouter">
+        <property name="activityStreamsSubscriberWarehouse" ref="activityStreamsSubscriberWarehouse"/>
+        <property name="camelContext" ref="context"/>
+    </bean>
+    <!-- First step of the subscriber registration route; constructed with the subscription service. -->
+    <bean id="subscriberRegistrationProcessor"
+          class="org.apache.streams.messaging.processors.ActivityStreamsSubscriberRegistrationProcessor">
+        <constructor-arg ref="subscriptionService"/>
+    </bean>
+    <!-- Cassandra keyspace and repositories; the "cassandraConfig" bean they reference is not defined in this file. -->
+    <bean id="cassandraKeyspace" class="org.apache.streams.cassandra.repository.impl.CassandraKeyspace">
+        <constructor-arg ref="cassandraConfig"/>
+    </bean>
+
+    <bean id="cassandraActivityStreamsRepository"
+          class="org.apache.streams.cassandra.repository.impl.CassandraActivityStreamsRepository">
+        <constructor-arg ref="cassandraKeyspace"/>
+        <constructor-arg ref="cassandraConfig"/>
+    </bean>
+
+    <bean id="cassandraSubscriptionRepository"
+          class="org.apache.streams.cassandra.repository.impl.CassandraSubscriptionRepository">
+        <constructor-arg ref="cassandraKeyspace"/>
+        <constructor-arg ref="cassandraConfig"/>
+    </bean>
+    <!-- Jackson mapper passed to activityService, whose constructor switches off FAIL_ON_UNKNOWN_PROPERTIES on it. -->
+    <bean id="objectMapper" class="org.codehaus.jackson.map.ObjectMapper"/>
+    <!-- Services built on top of the two repositories. -->
+    <bean id="subscriptionService" class="org.apache.streams.messaging.service.impl.CassandraSubscriptionService">
+        <constructor-arg ref="cassandraSubscriptionRepository"/>
+    </bean>
+
+    <bean id="activityService" class="org.apache.streams.messaging.service.impl.CassandraActivityService">
+        <constructor-arg ref="cassandraActivityStreamsRepository"/>
+        <constructor-arg ref="objectMapper"/>
+    </bean>
+    <!-- Storm-based aggregator beans, currently commented out. -->
+    <!--<bean id="stromActivityAggregator" class="org.apache.streams.messaging.storm.StormActivityAggregator">-->
+        <!--<constructor-arg ref="bolt"/>-->
+        <!--<constructor-arg ref="spout"/>-->
+    <!--</bean>-->
+
+    <!--<bean id="bolt" class="org.apache.streams.messaging.storm.StormSubscriberBolt"/>-->
+    <!--<bean id="spout" class="org.apache.streams.messaging.storm.StormSubscriberSpout"/>-->
+
+    <bean id="activityAggregator" class="org.apache.streams.messaging.aggregation.ActivityAggregator">
+        <property name="activityService" ref="activityService"/>
+        <property name="activityStreamsSubscriberWarehouse" ref="activityStreamsSubscriberWarehouse"/>
+    </bean>
+    <!-- Turns on Spring's annotation-driven task support (@Scheduled / @Async). -->
+    <task:annotation-driven/>
+    <!-- ActiveMQ: plain connection factory, wrapped in a pool, handed to Camel through the JMS configuration. -->
+    <bean id="jmsConnectionFactory"
+          class="org.apache.activemq.ActiveMQConnectionFactory">
+        <property name="brokerURL" value="${activemq.jmsConnectionFactoryUrl}"/>
+    </bean>
+    <!-- Pool of at most 8 connections; start/stop are tied to the bean's init and destroy callbacks. -->
+    <bean id="pooledConnectionFactory"
+          class="org.apache.activemq.pool.PooledConnectionFactory" init-method="start" destroy-method="stop">
+        <property name="maxConnections" value="8"/>
+        <property name="connectionFactory" ref="jmsConnectionFactory"/>
+    </bean>
+    <!-- JMS settings for the "activemq" component: pooled connections and 10 concurrent consumers. -->
+    <bean id="jmsConfig"
+          class="org.apache.camel.component.jms.JmsConfiguration">
+        <property name="connectionFactory" ref="pooledConnectionFactory"/>
+        <property name="concurrentConsumers" value="10"/>
+    </bean>
+    <!-- Camel component behind the "activemq:" endpoint URIs used in streamsCamelContext.xml. -->
+    <bean id="activemq"
+          class="org.apache.activemq.camel.component.ActiveMQComponent">
+        <property name="configuration" ref="jmsConfig"/>
+    </bean>
+
+
+</beans>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/adb43b29/streams-runtimes/streams-runtime-webapp/src/main/resources/META-INF/spring/streams-eip-osgi-component-import.xml
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-webapp/src/main/resources/META-INF/spring/streams-eip-osgi-component-import.xml b/streams-runtimes/streams-runtime-webapp/src/main/resources/META-INF/spring/streams-eip-osgi-component-import.xml
new file mode 100644
index 0000000..9066206
--- /dev/null
+++ b/streams-runtimes/streams-runtime-webapp/src/main/resources/META-INF/spring/streams-eip-osgi-component-import.xml
@@ -0,0 +1,38 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+
+<beans xmlns="http://www.springframework.org/schema/beans"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xmlns:osgi="http://www.springframework.org/schema/osgi"
+       xsi:schemaLocation="http://www.springframework.org/schema/beans
+	http://www.springframework.org/schema/beans/spring-beans.xsd
+    http://www.springframework.org/schema/osgi
+    http://www.springframework.org/schema/osgi/spring-osgi.xsd">
+
+    <!-- OSGi services used on the publisher side: the "direct:publisher-register" route and activityConsumerRouter. -->
+    <osgi:reference id="activityPublisherRegistration" interface="org.apache.streams.osgi.components.ActivityPublisherRegistration" />
+    <osgi:reference id="activityConsumerWarehouse" interface="org.apache.streams.osgi.components.activityconsumer.ActivityConsumerWarehouse" />
+    <!-- OSGi services used on the subscriber side: the "direct:subscriber-register" route, the subscriber router and activityAggregator. -->
+    <osgi:reference id="activityStreamsSubscriberRegistration" interface="org.apache.streams.osgi.components.ActivityStreamsSubscriberRegistration" />
+    <osgi:reference id="activityStreamsSubscriberWarehouse" interface="org.apache.streams.osgi.components.activitysubscriber.ActivityStreamsSubscriberWarehouse" />
+
+
+
+
+</beans>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/adb43b29/streams-runtimes/streams-runtime-webapp/src/main/resources/META-INF/spring/streamsCamelContext.xml
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-webapp/src/main/resources/META-INF/spring/streamsCamelContext.xml b/streams-runtimes/streams-runtime-webapp/src/main/resources/META-INF/spring/streamsCamelContext.xml
new file mode 100644
index 0000000..4360f39
--- /dev/null
+++ b/streams-runtimes/streams-runtime-webapp/src/main/resources/META-INF/spring/streamsCamelContext.xml
@@ -0,0 +1,96 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+
+<beans xmlns="http://www.springframework.org/schema/beans"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xmlns:osgi="http://www.springframework.org/schema/osgi"
+       xsi:schemaLocation="
+       http://www.springframework.org/schema/beans
+       http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
+       http://camel.apache.org/schema/spring
+       http://camel.apache.org/schema/spring/camel-spring-2.0.0.xsd
+       http://www.springframework.org/schema/osgi
+       http://www.springframework.org/schema/osgi/spring-osgi.xsd">
+
+
+    <!-- Camel context referenced as "context" by the router beans in streams-eip-applicationContext.xml. -->
+    <!-- NOTE(review): the ${...} URIs are keys of META-INF/streams.properties; no property placeholder is set up in this file, confirm where it is configured. -->
+    <camelContext id="context" xmlns="http://camel.apache.org/schema/spring">
+
+        <endpoint id="consumerRegistrationEndpoint" uri="${consumer.registrationEndpoint}"/>
+        <endpoint id="subscriberRegistrationEndpoint" uri="${subscriber.registrationEndpoint}"/>
+        <!--publisher registration route setup -->
+        <route>
+            <from uri="ref:consumerRegistrationEndpoint"/>
+                <bean ref="activityRegistrationProcessor" />
+            <to uri="direct:publisher-register"/>
+        </route>
+        <!-- register the publisher through the OSGi-imported activityPublisherRegistration service -->
+        <route>
+            <from uri="direct:publisher-register"/>
+                <bean ref="activityPublisherRegistration" method="register"/>
+            <to uri="direct:add-publisher-route"/>
+        </route>
+        <!-- pass the exchange to the consumer router bean, then to the ExampleLog log endpoint -->
+        <route>
+            <from uri="direct:add-publisher-route"/>
+                <bean ref="activityConsumerRouter" method="createNewRouteForConsumer"/>
+            <to uri="log:ExampleLog"/>
+        </route>
+
+        <!-- put activities on the ActiveMQ queue "activities" (one-way send) until the aggregation route below consumes them -->
+        <route>
+            <from uri="direct:activityQ"/>
+            <inOnly uri="activemq:queue:activities"/>
+        </route>
+        <!-- constant correlation key: everything arriving in each 500 ms interval becomes one grouped exchange for activityService.receiveExchange -->
+        <route>
+            <from uri="activemq:queue:activities"/>
+            <aggregate completionInterval="500" groupExchanges="true">
+                <correlationExpression>
+                    <constant>true</constant>
+                </correlationExpression>
+                <bean ref="activityService" method="receiveExchange"/>
+            </aggregate>
+        </route>
+
+
+        <!-- register as a subscriber - returned the endpoint to poll and add to subscription sources - GET/POST -->
+        <route>
+            <from uri="ref:subscriberRegistrationEndpoint"/>
+                <bean ref="subscriberRegistrationProcessor" />
+            <to uri="direct:subscriber-register"/>
+        </route>
+        <!-- register the subscriber through the OSGi-imported activityStreamsSubscriberRegistration service -->
+        <route>
+            <from uri="direct:subscriber-register"/>
+                <bean ref="activityStreamsSubscriberRegistration" method="register"/>
+            <to uri="direct:add-subscriber-route"/>
+        </route>
+        <!-- pass the exchange to the subscriber router bean, then to the ExampleLog log endpoint -->
+        <route>
+            <from uri="direct:add-subscriber-route"/>
+            <bean ref="activityStreamsSubscriberRouter" method="createNewRouteForSubscriber"/>
+            <to uri="log:ExampleLog"/>
+        </route>
+
+
+    </camelContext>
+
+</beans>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/adb43b29/streams-runtimes/streams-runtime-webapp/src/main/resources/META-INF/streams.properties
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-webapp/src/main/resources/META-INF/streams.properties b/streams-runtimes/streams-runtime-webapp/src/main/resources/META-INF/streams.properties
new file mode 100644
index 0000000..b7bbfce
--- /dev/null
+++ b/streams-runtimes/streams-runtime-webapp/src/main/resources/META-INF/streams.properties
@@ -0,0 +1,41 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+servlet.baseUrlPath=http://localhost:8080/streams-web/
+# Publisher ("consumer") side settings.
+consumer.inRouteHost=localhost
+consumer.inRoutePort=8000
+consumer.activityQUri = direct:activityQ
+# direct:activityQ is the entry of the Camel route that forwards activities to the ActiveMQ queue "activities".
+consumer.publisherEndpointProtocol=jetty:http://
+consumer.publisherEndpointUrlResource=streams/publish
+consumer.receiveMethod=receive
+consumer.splitMethod=split
+# URI of the consumerRegistrationEndpoint declared in streamsCamelContext.xml.
+consumer.registrationEndpoint=jetty:http://localhost:8000/streams/publisher/register
+# Subscriber side settings; subscriber.registrationEndpoint is the URI of subscriberRegistrationEndpoint in streamsCamelContext.xml.
+subscriber.inRouteHost=localhost
+subscriber.inRoutePort=8000
+subscriber.subscriberEndpointUrlResource=streams/subscriber
+subscriber.receiveMethod=receive
+subscriber.postMethod=updateActivityStreamsSubscriberConfiguration
+subscriber.getMethod=getStream
+subscriber.registrationEndpoint=jetty:http://localhost:8000/streams/subscriber/register
+subscriber.subscriberEndpointProtocol=jetty:http://
+# Broker URL injected into the jmsConnectionFactory bean in streams-eip-applicationContext.xml.
+activemq.jmsConnectionFactoryUrl=tcp://localhost:61616
\ No newline at end of file